Merge pull request #6 from 30x/XAPID-586
Xapid 586
diff --git a/README.md b/README.md
index c5bf99a..168daed 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# apidVerifyAPIKey
+# apidGatewayDeploy
This core plugin for [apid](http://github.com/30x/apid) responds to
[apidApigeeSync](https://github.com/30x/apidApigeeSync) events and publishes an API that allows clients to
@@ -13,6 +13,26 @@
See [apidGatewayDeploy-api.yaml]() for full spec.
+## Configuration
+
+#### gatewaydeploy_debounce_duration
+Window of time during which deployment changes are gathered before sending to client.
+Default: "1s"
+
+#### gatewaydeploy_bundle_cleanup_delay
+Duration between deleting a deployment and deleting its bundles on disk.
+Default: "1m"
+
+#### gatewaydeploy_bundle_download_timeout
+Duration before bundle download marks deployment as failed (will continue retries regardless).
+Default: "5m"
+
+#### gatewaydeploy_bundle_dir
+Relative location from local_storage_path in which to store local bundle files.
+Default: "bundles"
+
+(Note: duration values use Go's duration syntax, e.g. "1s" or "5m"; see https://golang.org/pkg/time/#ParseDuration)
+
## Building and running standalone
First, install prerequisites:
diff --git a/api.go b/api.go
index 8a54a31..fec8239 100644
--- a/api.go
+++ b/api.go
@@ -6,7 +6,10 @@
"fmt"
"io/ioutil"
"net/http"
+ "net/url"
"strconv"
+ "sync"
+ "sync/atomic"
"time"
)
@@ -21,6 +24,7 @@
var (
deploymentsChanged = make(chan string)
addSubscriber = make(chan chan string)
+ eTag int64
)
type errorResponse struct {
@@ -58,7 +62,7 @@
func InitAPI() {
services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
- services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("POST")
+ services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -82,25 +86,45 @@
func distributeEvents() {
subscribers := make(map[chan string]struct{})
- for {
+ mut := sync.Mutex{}
+ msg := ""
+ debouncer := func() {
select {
- case msg := <-deploymentsChanged:
- // todo: add a debounce w/ timeout to avoid sending on every single deployment?
+ case <-time.After(debounceDuration):
+ mut.Lock()
subs := subscribers
- incrementETag() // todo: do this elsewhere? check error?
subscribers = make(map[chan string]struct{})
- log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs))
+ m := msg
+ msg = ""
+ incrementETag()
+ mut.Unlock()
+ log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
for subscriber := range subs {
select {
- case subscriber <- msg:
- log.Debugf("Handling deploy response for: %s", msg)
+ case subscriber <- m:
+ log.Debugf("Handling deploy response for: %s", m)
+ log.Debugf("delivering TO: %v", subscriber)
default:
log.Debugf("listener too far behind, message dropped")
}
}
+ }
+ }
+ for {
+ select {
+ case newMsg := <-deploymentsChanged:
+ mut.Lock()
+ log.Debug("deploymentsChanged")
+ if msg == "" {
+ go debouncer()
+ }
+ msg = newMsg
+ mut.Unlock()
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %v", subscriber)
+ mut.Lock()
subscribers[subscriber] = struct{}{}
+ mut.Unlock()
}
}
}
@@ -132,11 +156,7 @@
log.Debugf("if-none-match: %s", ifNoneMatch)
// send unmodified if matches prior eTag and no timeout
- eTag, err := getETag()
- if err != nil {
- writeDatabaseError(w)
- return
- }
+ eTag := getETag()
if eTag == ifNoneMatch && timeout == 0 {
w.WriteHeader(http.StatusNotModified)
return
@@ -189,16 +209,16 @@
for _, d := range dataDeps {
apiDeps = append(apiDeps, ApiDeployment{
- ID: d.ID,
- ScopeId: d.DataScopeID,
- Created: d.Created,
- CreatedBy: d.CreatedBy,
- Updated: d.Updated,
- UpdatedBy: d.UpdatedBy,
- BundleConfigJson: []byte(d.BundleConfigJSON),
- ConfigJson: []byte(d.ConfigJSON),
- DisplayName: d.BundleName,
- URI: d.LocalBundleURI,
+ ID: d.ID,
+ ScopeId: d.DataScopeID,
+ Created: d.Created,
+ CreatedBy: d.CreatedBy,
+ Updated: d.Updated,
+ UpdatedBy: d.UpdatedBy,
+ BundleConfigJson: []byte(d.BundleConfigJSON),
+ ConfigJson: []byte(d.ConfigJSON),
+ DisplayName: d.BundleName,
+ URI: d.LocalBundleURI,
})
}
@@ -228,35 +248,93 @@
// validate the results
// todo: these errors to the client should be standardized
var errs bytes.Buffer
- for i, rsp := range results {
- if rsp.ID == "" {
+ var validResults apiDeploymentResults
+ for i, result := range results {
+ valid := true
+ if result.ID == "" {
errs.WriteString(fmt.Sprintf("Missing id at %d\n", i))
}
- if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL {
- errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
+ if result.Status != RESPONSE_STATUS_SUCCESS && result.Status != RESPONSE_STATUS_FAIL {
+ errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n",
+ RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
}
- if rsp.Status == RESPONSE_STATUS_FAIL {
- if rsp.ErrorCode == 0 {
+ if result.Status == RESPONSE_STATUS_FAIL {
+ if result.ErrorCode == 0 {
errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i))
}
- if rsp.Message == "" {
+ if result.Message == "" {
errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i))
}
}
+
+ if valid {
+ validResults = append(validResults, result)
+ }
}
+
if errs.Len() > 0 {
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, errs.String())
+ return
}
- err = setDeploymentResults(results)
+ if len(validResults) > 0 {
+ go transmitDeploymentResultsToServer(validResults)
+ setDeploymentResults(validResults)
+ }
+
+ w.Write([]byte("OK"))
+}
+
+func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error {
+
+ retryIn := bundleRetryDelay
+ maxBackOff := 5 * time.Minute
+ backOffFunc := createBackoff(retryIn, maxBackOff)
+
+ uri, err := url.Parse(apiServerBaseURI.String())
if err != nil {
- writeDatabaseError(w)
+ log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err)
+ return err
+ }
+ uri.Path = fmt.Sprintf("/clusters/%s/apids/%s/deployments", apidClusterID, apidInstanceID)
+
+ resultJSON, err := json.Marshal(validResults)
+ if err != nil {
+ log.Errorf("unable to marshal deployment results %v: %v", validResults, err)
+ return err
}
- // todo: transmit to server (API TBD)
- //err = transmitDeploymentResultsToServer()
+ for {
+ log.Debugf("transmitting deployment results to tracker: %s", string(resultJSON))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(resultJSON))
+ req.Header.Add("Content-Type", "application/json")
- return
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
+ if err != nil {
+ log.Errorf("failed to communicate with tracking service: %v", err)
+ } else {
+ b, _ := ioutil.ReadAll(resp.Body)
+ log.Errorf("tracking service call failed. code: %d, body: %s", resp.StatusCode, string(b))
+ }
+ backOffFunc()
+ resp.Body.Close()
+ continue
+ }
+
+ resp.Body.Close()
+ return nil
+ }
+}
+
+// call whenever the list of deployments changes
+func incrementETag() {
+ atomic.AddInt64(&eTag, 1)
+}
+
+func getETag() string {
+ e := atomic.LoadInt64(&eTag)
+ return strconv.FormatInt(e, 10)
}
diff --git a/api_test.go b/api_test.go
index cb8ec38..1f35233 100644
--- a/api_test.go
+++ b/api_test.go
@@ -14,11 +14,6 @@
var _ = Describe("api", func() {
- BeforeEach(func() {
- _, err := getDB().Exec("DELETE FROM deployments")
- Expect(err).ShouldNot(HaveOccurred())
- })
-
Context("GET /deployments", func() {
It("should get an empty array if no deployments", func() {
@@ -32,6 +27,17 @@
Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
})
+ It("should debounce requests", func() {
+ var listener = make(chan string)
+ addSubscriber <- listener
+
+ deploymentsChanged <- "x"
+ deploymentsChanged <- "y"
+
+ id := <-listener
+ Expect(id).To(Equal("y"))
+ })
+
It("should get current deployments", func() {
deploymentID := "api_get_current"
@@ -102,7 +108,6 @@
defer res.Body.Close()
Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
})
It("should get new deployment after blocking", func(done Done) {
@@ -114,6 +119,7 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
+ eTag := res.Header.Get("etag")
deploymentID = "api_get_current_blocking2"
go func() {
@@ -124,7 +130,7 @@
uri.RawQuery = query.Encode()
req, err := http.NewRequest("GET", uri.String(), nil)
req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", res.Header.Get("etag"))
+ req.Header.Add("If-None-Match", eTag)
res, err := http.DefaultClient.Do(req)
Expect(err).ShouldNot(HaveOccurred())
@@ -176,7 +182,7 @@
})
})
- Context("POST /deployments", func() {
+ Context("PUT /deployments", func() {
It("should return BadRequest for invalid request", func() {
@@ -190,7 +196,7 @@
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -215,7 +221,7 @@
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -242,7 +248,7 @@
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -265,7 +271,7 @@
uri, err := url.Parse(testServer.URL)
uri.Path = deploymentsEndpoint
- deploymentResult := apiDeploymentResults{
+ deploymentResults := apiDeploymentResults{
apiDeploymentResult{
ID: deploymentID,
Status: RESPONSE_STATUS_FAIL,
@@ -273,10 +279,10 @@
Message: "Some error message",
},
}
- payload, err := json.Marshal(deploymentResult)
+ payload, err := json.Marshal(deploymentResults)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -294,6 +300,29 @@
Expect(deploy_error_code).Should(Equal(100))
Expect(deploy_error_message).Should(Equal("Some error message"))
})
+
+ It("should communicate status to tracking server", func() {
+ deploymentResults := apiDeploymentResults{
+ apiDeploymentResult{
+ ID: "deploymentID",
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: 100,
+ Message: "Some error message",
+ },
+ }
+
+ err := transmitDeploymentResultsToServer(deploymentResults)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(testLastTrackerVars["clusterID"]).To(Equal("CLUSTER_ID"))
+ Expect(testLastTrackerVars["instanceID"]).To(Equal("INSTANCE_ID"))
+ Expect(testLastTrackerBody).ToNot(BeEmpty())
+
+ var uploaded apiDeploymentResults
+ json.Unmarshal(testLastTrackerBody, &uploaded)
+
+ Expect(uploaded).To(Equal(deploymentResults))
+ })
})
})
@@ -323,16 +352,18 @@
DataScopeID: deploymentID,
BundleConfigJSON: string(bundleJson),
ConfigJSON: string(bundleJson),
- Status: "",
Created: "",
CreatedBy: "",
Updated: "",
UpdatedBy: "",
BundleName: deploymentID,
- BundleURI: "",
- BundleChecksum: "",
- BundleChecksumType: "",
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
LocalBundleURI: "x",
+ DeployStatus: "",
+ DeployErrorCode: 0,
+ DeployErrorMessage: "",
}
err = InsertDeployment(tx, dep)
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml
index 916c081..c26d47f 100644
--- a/apidGatewayDeploy-api.yaml
+++ b/apidGatewayDeploy-api.yaml
@@ -118,7 +118,7 @@
$ref: '#/definitions/DeploymentResponse'
'304':
description: Deployment not modified.
- post:
+ put:
description: Save results of deployment
parameters:
- name: _
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 3ce174c..061879d 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -17,8 +17,10 @@
)
var (
- tmpDir string
- testServer *httptest.Server
+ tmpDir string
+ testServer *httptest.Server
+ testLastTrackerVars map[string]string
+ testLastTrackerBody []byte
)
var _ = BeforeSuite(func() {
@@ -31,6 +33,9 @@
Expect(err).NotTo(HaveOccurred())
config.Set("local_storage_path", tmpDir)
+ config.Set(configApidInstanceID, "INSTANCE_ID")
+ config.Set(configApidClusterID, "CLUSTER_ID")
+ config.Set(configApiServerBaseURI, "http://localhost")
apid.InitializePlugins()
@@ -40,20 +45,48 @@
Expect(err).NotTo(HaveOccurred())
SetDB(db)
+ debounceDuration = time.Millisecond
+ bundleCleanupDelay = time.Millisecond
+ bundleRetryDelay = 10 * time.Millisecond
+ bundleDownloadTimeout = 50 * time.Millisecond
+
router := apid.API().Router()
// fake an unreliable bundle repo
- backOffMultiplier = 10 * time.Millisecond
- count := 0
+ count := 1
router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
count++
+ vars := apid.API().Vars(req)
if count % 2 == 0 {
w.WriteHeader(500)
return
}
- vars := apid.API().Vars(req)
+ if vars["id"] == "longfail" {
+ time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond))
+ }
w.Write([]byte("/bundles/" + vars["id"]))
- })
+
+ }).Methods("GET")
+
+ // fake an unreliable APID tracker
+ router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
+ func(w http.ResponseWriter, req *http.Request) {
+ count++
+ if count % 2 == 0 {
+ w.WriteHeader(500)
+ return
+ }
+
+ testLastTrackerVars = apid.API().Vars(req)
+ testLastTrackerBody, err = ioutil.ReadAll(req.Body)
+ Expect(err).ToNot(HaveOccurred())
+
+ w.Write([]byte("OK"))
+
+ }).Methods("PUT")
testServer = httptest.NewServer(router)
+
+ apiServerBaseURI, err = url.Parse(testServer.URL)
+ Expect(err).NotTo(HaveOccurred())
})
var _ = AfterSuite(func() {
@@ -64,6 +97,12 @@
os.RemoveAll(tmpDir)
})
+var _ = BeforeEach(func() {
+ _, err := getDB().Exec("DELETE FROM deployments")
+ Expect(err).ShouldNot(HaveOccurred())
+ _, err = getDB().Exec("UPDATE etag SET value=1")
+})
+
func TestApidGatewayDeploy(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ApidGatewayDeploy Suite")
diff --git a/bundle.go b/bundle.go
index 4762165..44f9a9c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -1,80 +1,115 @@
package apiGatewayDeploy
import (
+ "crypto/md5"
+ "encoding/base64"
+ "encoding/hex"
+ "errors"
"fmt"
+ "hash"
+ "hash/crc32"
"io"
+ "io/ioutil"
"net/http"
"net/url"
"os"
"path"
"time"
- "encoding/base64"
- "io/ioutil"
- "hash/crc32"
- "errors"
- "crypto/md5"
- "hash"
- "encoding/hex"
)
-const (
- DOWNLOAD_ATTEMPTS = 3
-)
+var bundleRetryDelay time.Duration = time.Second
+var bundleDownloadTimeout time.Duration = 10 * time.Minute
-var (
- backOffMultiplier = 10 * time.Second
-)
+// simple doubling back-off
+func createBackoff(retryIn, maxBackOff time.Duration) func() {
+ return func() {
+ log.Debugf("backoff called. will retry in %s.", retryIn)
+ time.Sleep(retryIn)
+ retryIn = retryIn * time.Duration(2)
+ if retryIn > maxBackOff {
+ retryIn = maxBackOff
+ }
+ }
+}
-func downloadBundle(dep DataDeployment) error {
+func downloadBundle(dep DataDeployment) {
- log.Debugf("starting bundle download process: %s", dep.BundleURI)
+ log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
hashWriter, err := getHashWriter(dep.BundleChecksumType)
if err != nil {
- log.Errorf("invalid checksum type: %v", err)
- return err
+ msg := fmt.Sprintf("invalid bundle checksum type: %v", dep.BundleChecksumType)
+ log.Error(msg)
+ setDeploymentResults(apiDeploymentResults{
+ {
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: msg,
+ },
+ })
+ return
}
- // retry
- var tempFile string
- for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
+ retryIn := bundleRetryDelay
+ maxBackOff := 5 * time.Minute
+ backOffFunc := createBackoff(retryIn, maxBackOff)
+
+ // timeout and mark deployment failed
+ timeout := time.NewTimer(bundleDownloadTimeout)
+ go func() {
+ <-timeout.C
+ log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI)
+ var errMessage string
+ if err != nil {
+ errMessage = fmt.Sprintf("bundle download failed: %s", err)
+ } else {
+ errMessage = "bundle download failed"
+ }
+ setDeploymentResults(apiDeploymentResults{
+ {
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: errMessage,
+ },
+ })
+ }()
+
+ // todo: we'll want to abort download if deployment is deleted
+ for {
+ var tempFile, bundleFile string
tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+
+ if err == nil {
+ bundleFile = getBundleFile(dep)
+ err = os.Rename(tempFile, bundleFile)
+ if err != nil {
+ log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
+ }
+ }
+
+ if tempFile != "" {
+ go safeDelete(tempFile)
+ }
+
+ if err == nil {
+ err = updateLocalBundleURI(dep.ID, bundleFile)
+ }
+
+ // success!
if err == nil {
break
}
- if tempFile != "" {
- os.Remove(tempFile)
- }
- // simple back-off, we could potentially be more sophisticated
- retryIn := time.Duration(i) * backOffMultiplier
- log.Debugf("will retry failed download in %s: %v", retryIn, err)
- time.Sleep(retryIn)
+ backOffFunc()
hashWriter.Reset()
}
- if err != nil {
- log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS)
- return err
- }
-
- bundleFile := getBundleFile(dep)
- err = os.Rename(tempFile, bundleFile)
- if err != nil {
- log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
- os.Remove(tempFile)
- return err
- }
-
- err = updateLocalURI(dep.ID, bundleFile)
- if err != nil {
- return err
- }
+ log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
// send deployments to client
- deploymentsChanged<- dep.ID
-
- return nil
+ deploymentsChanged <- dep.ID
}
func getBundleFile(dep DataDeployment) string {
diff --git a/bundle_test.go b/bundle_test.go
new file mode 100644
index 0000000..1c959d9
--- /dev/null
+++ b/bundle_test.go
@@ -0,0 +1,100 @@
+package apiGatewayDeploy
+
+import (
+ "encoding/json"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "net/url"
+ "time"
+)
+
+var _ = Describe("bundle", func() {
+
+ Context("download", func() {
+
+ It("should timeout, mark status as failed, then finish", func() {
+
+ deploymentID := "bundle_download_fail"
+
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/longfail"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundleJson, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ BundleConfigID: deploymentID,
+ ApidClusterID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleConfigJSON: string(bundleJson),
+ ConfigJSON: string(bundleJson),
+ Created: "",
+ CreatedBy: "",
+ Updated: "",
+ UpdatedBy: "",
+ BundleName: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ LocalBundleURI: "",
+ DeployStatus: "",
+ DeployErrorCode: 0,
+ DeployErrorMessage: "",
+ }
+
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ go downloadBundle(dep)
+
+ // give download time to timeout
+ time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
+
+ // get error state deployment
+ deployments, err := getDeployments("WHERE id=$1", deploymentID)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d := deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+ Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+ Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+ Expect(d.LocalBundleURI).To(BeEmpty())
+
+ var listener = make(chan string)
+ addSubscriber <- listener
+ <-listener
+
+ // get finished deployment
+ // still in error state (let client update), but with valid local bundle
+ deployments, err = getDeployments("WHERE id=$1", deploymentID)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d = deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+ Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+ Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+ Expect(d.LocalBundleURI).To(BeAnExistingFile())
+ })
+ })
+})
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go
index d52314f..46e75d4 100644
--- a/cmd/apidGatewayDeploy/main.go
+++ b/cmd/apidGatewayDeploy/main.go
@@ -1,14 +1,14 @@
package main
import (
+ "encoding/json"
"flag"
"github.com/30x/apid"
"github.com/30x/apid/factory"
+ "github.com/30x/apidGatewayDeploy"
_ "github.com/30x/apidGatewayDeploy"
"io/ioutil"
- "github.com/30x/apidGatewayDeploy"
"os"
- "encoding/json"
)
func main() {
@@ -33,7 +33,6 @@
defer os.RemoveAll(tmpDir)
configService.Set("data_path", tmpDir)
- configService.Set("gatewaydeploy_bundle_dir", tmpDir) // todo: legacy?
if deploymentsFile != "" {
bytes, err := ioutil.ReadFile(deploymentsFile)
@@ -103,7 +102,6 @@
DataScopeID: ad.ScopeId,
BundleConfigJSON: string(ad.BundleConfigJson),
ConfigJSON: string(ad.ConfigJson),
- Status: "",
Created: "",
CreatedBy: "",
Updated: "",
@@ -115,7 +113,6 @@
LocalBundleURI: ad.URI,
}
-
err = apiGatewayDeploy.InsertDeployment(tx, dep)
if err != nil {
log.Error("Unable to insert deployment")
@@ -132,4 +129,4 @@
apiGatewayDeploy.InitAPI()
return nil
-}
\ No newline at end of file
+}
diff --git a/data.go b/data.go
index 45ad641..dc46ab7 100644
--- a/data.go
+++ b/data.go
@@ -19,16 +19,18 @@
DataScopeID string
BundleConfigJSON string
ConfigJSON string
- Status string
Created string
CreatedBy string
Updated string
UpdatedBy string
BundleName string
BundleURI string
+ LocalBundleURI string
BundleChecksum string
BundleChecksumType string
- LocalBundleURI string
+ DeployStatus string
+ DeployErrorCode int
+ DeployErrorMessage string
}
type SQLExec interface {
@@ -37,10 +39,6 @@
func InitDB(db apid.DB) error {
_, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS etag (
- value integer
- );
- INSERT INTO etag VALUES (1);
CREATE TABLE IF NOT EXISTS deployments (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
@@ -48,7 +46,6 @@
data_scope_id varchar(36) NOT NULL,
bundle_config_json text NOT NULL,
config_json text NOT NULL,
- status text NOT NULL,
created timestamp without time zone,
created_by text,
updated timestamp without time zone,
@@ -56,6 +53,8 @@
bundle_name text,
bundle_uri text,
local_bundle_uri text,
+ bundle_checksum text,
+ bundle_checksum_type text,
deploy_status string,
deploy_error_code int,
deploy_error_message text,
@@ -77,49 +76,12 @@
return db
}
+// caller is responsible for calling dbMux.Lock() and dbMux.Unlock()
func SetDB(db apid.DB) {
- dbMux.Lock()
if unsafeDB == nil { // init API when DB is initialized
go InitAPI()
}
unsafeDB = db
- dbMux.Unlock()
-}
-
-// call whenever the list of deployments changes
-func incrementETag() error {
-
- stmt, err := getDB().Prepare("UPDATE etag SET value = value+1;")
- if err != nil {
- log.Errorf("prepare update etag failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- _, err = stmt.Exec()
- if err != nil {
- log.Errorf("update etag failed: %v", err)
- return err
- }
-
- log.Debugf("etag incremented")
- return err
-}
-
-func getETag() (string, error) {
-
- var eTag string
- db := getDB()
- row := db.QueryRow("SELECT value FROM etag")
- err := row.Scan(&eTag)
- //err := getDB().QueryRow("SELECT value FROM etag").Scan(&eTag)
- if err != nil {
- log.Errorf("select etag failed: %v", err)
- return "", err
- }
-
- log.Debugf("etag queried: %v", eTag)
- return eTag, err
}
func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
@@ -129,10 +91,11 @@
stmt, err := tx.Prepare(`
INSERT INTO deployments
(id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, status, created,
- created_by, updated, updated_by, bundle_name,
- bundle_uri, local_bundle_uri)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14);
+ bundle_config_json, config_json, created, created_by,
+ updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+ bundle_checksum, bundle_checksum_type, deploy_status,
+ deploy_error_code, deploy_error_message)
+ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
`)
if err != nil {
log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
@@ -142,9 +105,10 @@
_, err = stmt.Exec(
dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
- dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created,
- dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName,
- dep.BundleURI, dep.LocalBundleURI)
+ dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+ dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
+ dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+ dep.DeployErrorCode, dep.DeployErrorMessage)
if err != nil {
log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
return err
@@ -179,35 +143,57 @@
// getReadyDeployments() returns array of deployments that are ready to deploy
func getReadyDeployments() (deployments []DataDeployment, err error) {
+ return getDeployments("WHERE local_bundle_uri != $1", "")
+}
+// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy
+func getUnreadyDeployments() (deployments []DataDeployment, err error) {
+ return getDeployments("WHERE local_bundle_uri = $1 and deploy_status = $2", "", "")
+}
+
+// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments
+func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) {
db := getDB()
- rows, err := db.Query(`
+
+ var stmt *sql.Stmt
+ stmt, err = db.Prepare(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, status, created,
- created_by, updated, updated_by, bundle_name,
- bundle_uri, local_bundle_uri
+ bundle_config_json, config_json, created, created_by,
+ updated, updated_by, bundle_name, bundle_uri,
+ local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
+ deploy_error_code, deploy_error_message
FROM deployments
- WHERE local_bundle_uri != ""
- `)
+ ` + where)
+ if err != nil {
+ return
+ }
+ var rows *sql.Rows
+ rows, err = stmt.Query(a...)
if err != nil {
if err == sql.ErrNoRows {
- return deployments, nil
+ return
}
log.Errorf("Error querying deployments: %v", err)
return
}
defer rows.Close()
+ deployments = dataDeploymentsFromRows(rows)
+
+ return
+}
+
+func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) {
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
- &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created,
- &dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName,
- &dep.BundleURI, &dep.LocalBundleURI,
+ &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy,
+ &dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI,
+ &dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus,
+ &dep.DeployErrorCode, &dep.DeployErrorMessage,
)
deployments = append(deployments, dep)
}
-
return
}
@@ -252,18 +238,11 @@
return err
}
-func updateLocalURI(depID, localBundleUri string) error {
+func updateLocalBundleURI(depID, localBundleUri string) error {
- tx, err := getDB().Begin()
+ stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
if err != nil {
- log.Errorf("begin updateLocalURI failed: %v", err)
- return err
- }
- defer tx.Rollback()
-
- stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
- if err != nil {
- log.Errorf("prepare updateLocalURI failed: %v", err)
+ log.Errorf("prepare updateLocalBundleURI failed: %v", err)
return err
}
defer stmt.Close()
@@ -274,13 +253,16 @@
return err
}
- err = tx.Commit()
- if err != nil {
- log.Errorf("commit updateLocalURI failed: %v", err)
- return err
- }
-
log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
return nil
}
+
+func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
+
+ err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
+ if err == sql.ErrNoRows {
+ err = nil
+ }
+ return
+}
diff --git a/init.go b/init.go
index 12f1339..49d4912 100644
--- a/init.go
+++ b/init.go
@@ -1,20 +1,34 @@
package apiGatewayDeploy
import (
+ "fmt"
"github.com/30x/apid"
+ "net/url"
"os"
"path"
+ "time"
)
const (
- configBundleDirKey = "gatewaydeploy_bundle_dir"
+ configBundleDirKey = "gatewaydeploy_bundle_dir"
+ configDebounceDuration = "gatewaydeploy_debounce_duration"
+ configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
+ configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout"
+ configApiServerBaseURI = "apigeesync_proxy_server_base"
+ configApidInstanceID = "apigeesync_apid_instance_id"
+ configApidClusterID = "apigeesync_cluster_id"
)
var (
- services apid.Services
- log apid.LogService
- data apid.DataService
- bundlePath string
+ services apid.Services
+ log apid.LogService
+ data apid.DataService
+ bundlePath string
+ debounceDuration time.Duration
+ bundleCleanupDelay time.Duration
+ apiServerBaseURI *url.URL
+ apidInstanceID string
+ apidClusterID string
)
func init() {
@@ -27,7 +41,45 @@
log.Debug("start init")
config := services.Config()
+
+ if !config.IsSet(configApiServerBaseURI) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI)
+ }
+ var err error
+ apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI))
+ if err != nil {
+ return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err)
+ }
+
+ if !config.IsSet(configApidInstanceID) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID)
+ }
+ apidInstanceID = config.GetString(configApidInstanceID)
+
+ if !config.IsSet(configApidClusterID) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID)
+ }
+ apidClusterID = config.GetString(configApidClusterID)
+
config.SetDefault(configBundleDirKey, "bundles")
+ config.SetDefault(configDebounceDuration, time.Second)
+ config.SetDefault(configBundleCleanupDelay, time.Minute)
+ config.SetDefault(configBundleDownloadTimeout, 5*time.Minute)
+
+ debounceDuration = config.GetDuration(configDebounceDuration)
+ if debounceDuration < time.Millisecond {
+ return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration)
+ }
+
+ bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay)
+ if bundleCleanupDelay < time.Millisecond {
+ return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
+ }
+
+ bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout)
+ if bundleDownloadTimeout < time.Millisecond {
+ return pluginData, fmt.Errorf("%s must be a positive duration", configBundleDownloadTimeout)
+ }
data = services.Data()
@@ -35,7 +87,7 @@
storagePath := config.GetString("local_storage_path")
bundlePath = path.Join(storagePath, relativeBundlePath)
if err := os.MkdirAll(bundlePath, 0700); err != nil {
- log.Panicf("Failed bundle directory creation: %v", err)
+ return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err)
}
log.Infof("Bundle directory path is %s", bundlePath)
diff --git a/listener.go b/listener.go
index 3b57542..24f3d96 100644
--- a/listener.go
+++ b/listener.go
@@ -5,6 +5,8 @@
"encoding/json"
"github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
+ "os"
+ "time"
)
const (
@@ -60,17 +62,18 @@
log.Panicf("Error starting transaction: %v", err)
}
+ // ensure that no new database updates are made on old database
+ dbMux.Lock()
+ defer dbMux.Unlock()
+
defer tx.Rollback()
for _, table := range snapshot.Tables {
var err error
switch table.Name {
case DEPLOYMENT_TABLE:
log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
- if len(table.Rows) == 0 {
- return
- }
for _, row := range table.Rows {
- addDeployment(tx, row)
+ err = addDeployment(tx, row)
+ // stop at the first failure so a later successful row cannot overwrite err
+ if err != nil {
+ break
+ }
}
}
if err != nil {
@@ -84,6 +87,18 @@
}
SetDB(db)
+
+ // if no tables, this is a startup event for an existing DB, start bundle downloads that didn't finish
+ if len(snapshot.Tables) == 0 {
+ deployments, err := getUnreadyDeployments()
+ if err != nil {
+ log.Panicf("unable to query database for unready deployments: %v", err)
+ }
+ for _, dep := range deployments {
+ go downloadBundle(dep)
+ }
+ }
+
log.Debug("Snapshot processed")
}
@@ -94,6 +109,12 @@
log.Panicf("Error processing ChangeList: %v", err)
}
defer tx.Rollback()
+
+ // ensure bundle download and delete updates aren't attempted while in process
+ dbMux.Lock()
+ defer dbMux.Unlock()
+
+ var bundlesToDelete []string
for _, change := range changes.Changes {
var err error
switch change.Table {
@@ -103,10 +124,11 @@
err = addDeployment(tx, change.NewRow)
case common.Delete:
var id string
- err = change.OldRow.Get("id", &id)
+ // declared up front so the lookup below assigns the enclosing err;
+ // using := inside this case clause would shadow it and hide failures
+ var localBundleUri string
+ change.OldRow.Get("id", &id)
+ localBundleUri, err = getLocalBundleURI(tx, id)
if err == nil {
+ bundlesToDelete = append(bundlesToDelete, localBundleUri)
err = deleteDeployment(tx, id)
- // todo: delete downloaded bundle file
}
default:
log.Errorf("unexpected operation: %s", change.Operation)
@@ -120,58 +142,36 @@
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
+
+ // clean up old bundles
+ if len(bundlesToDelete) > 0 {
+ log.Debugf("will delete %d old bundles", len(bundlesToDelete))
+ go func() {
+ // give clients time to avoid conflicts (the wait is the configured bundleCleanupDelay)
+ time.Sleep(bundleCleanupDelay)
+ for _, b := range bundlesToDelete {
+ log.Debugf("removing old bundle: %v", b)
+ safeDelete(b)
+ }
+ }()
+ }
}
-func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+// dataDeploymentFromRow builds a DataDeployment from a replicated row: it copies
+// the row's columns into d, decodes the bundle_config_json column and fills the
+// bundle fields of d from it. A JSON decoding failure is logged and returned.
+func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) {
- d := DataDeployment{}
- err = row.Get("id", &d.ID)
- if err != nil {
- return
- }
- err = row.Get("bundle_config_id", &d.BundleConfigID)
- if err != nil {
- return
- }
- err = row.Get("apid_cluster_id", &d.ApidClusterID)
- if err != nil {
- return
- }
- err = row.Get("data_scope_id", &d.DataScopeID)
- if err != nil {
- return
- }
- err = row.Get("bundle_config_json", &d.BundleConfigJSON)
- if err != nil {
- return
- }
- err = row.Get("config_json", &d.ConfigJSON)
- if err != nil {
- return
- }
- err = row.Get("status", &d.Status)
- if err != nil {
- return
- }
- err = row.Get("created", &d.Created)
- if err != nil {
- return
- }
- err = row.Get("created_by", &d.CreatedBy)
- if err != nil {
- return
- }
- err = row.Get("updated", &d.Updated)
- if err != nil {
- return
- }
- err = row.Get("updated_by", &d.UpdatedBy)
- if err != nil {
- return
- }
+ // NOTE(review): the results of row.Get are discarded here; presumably a column
+ // missing from the row should leave the field at its zero value - confirm.
+ row.Get("id", &d.ID)
+ row.Get("bundle_config_id", &d.BundleConfigID)
+ row.Get("apid_cluster_id", &d.ApidClusterID)
+ row.Get("data_scope_id", &d.DataScopeID)
+ row.Get("bundle_config_json", &d.BundleConfigJSON)
+ row.Get("config_json", &d.ConfigJSON)
+ row.Get("created", &d.Created)
+ row.Get("created_by", &d.CreatedBy)
+ row.Get("updated", &d.Updated)
+ row.Get("updated_by", &d.UpdatedBy)
var bc bundleConfigJson
err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
if err != nil {
log.Errorf("JSON decoding Manifest failed: %v", err)
return
@@ -182,6 +182,17 @@
d.BundleChecksumType = bc.ChecksumType
d.BundleChecksum = bc.Checksum
+ return
+}
+
+func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+
+ var d DataDeployment
+ d, err = dataDeploymentFromRow(row)
+ if err != nil {
+ return
+ }
+
err = InsertDeployment(tx, d)
if err != nil {
return
@@ -191,3 +202,9 @@
go downloadBundle(d)
return
}
+
+// safeDelete removes file from disk. A file that is already absent is not
+// treated as a problem; any other removal failure is logged as a warning.
+func safeDelete(file string) {
+ err := os.Remove(file)
+ if err == nil || os.IsNotExist(err) {
+ return
+ }
+ log.Warnf("unable to delete file %s: %v", file, err)
+}
diff --git a/listener_test.go b/listener_test.go
index ec4b7f2..1b52f7f 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -11,11 +11,6 @@
var _ = Describe("listener", func() {
- BeforeEach(func() {
- _, err := getDB().Exec("DELETE FROM deployments")
- Expect(err).ShouldNot(HaveOccurred())
- })
-
Context("ApigeeSync snapshot event", func() {
It("should set DB and process", func(done Done) {