Add bundle retry forever code - with a timeout to mark deployment as failed, update readme
diff --git a/README.md b/README.md index c5bf99a..168daed 100644 --- a/README.md +++ b/README.md
@@ -1,4 +1,4 @@ -# apidVerifyAPIKey +# apidGatewayDeploy This core plugin for [apid](http://github.com/30x/apid) responds to [apidApigeeSync](https://github.com/30x/apidApigeeSync) events and publishes an API that allows clients to @@ -13,6 +13,26 @@ See [apidGatewayDeploy-api.yaml]() for full spec. +## Configuration + +#### gatewaydeploy_debounce_duration +Window of time during which deployment changes are gathered before sending to client. +Default: "1s" + +#### gatewaydeploy_bundle_cleanup_delay +Duration between deleting a deployment and deleting its bundles on disk. +Default: "1m" + +#### gatewaydeploy_bundle_download_timeout +Duration before bundle download marks deployment as failed (will continue retries regardless). +Default: "5m" + +#### gatewaydeploy_bundle_dir +Relative location from local_storage_path in which to store local bundle files. +Default: "bundles" + +(Note: for the duration string format, see: https://golang.org/pkg/time/#ParseDuration) + ## Building and running standalone First, install prerequisites:
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index dffb117..33d3cf3 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -42,18 +42,22 @@ debounceDuration = time.Millisecond bundleCleanupDelay = time.Millisecond + bundleRetryDelay = 10 * time.Millisecond + bundleDownloadTimeout = 50 * time.Millisecond router := apid.API().Router() // fake an unreliable bundle repo - backOffMultiplier = time.Millisecond count := 1 router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) { count++ vars := apid.API().Vars(req) - if count % 2 == 0 || vars["id"] == "alwaysfail" { + if count % 2 == 0 { w.WriteHeader(500) return } + if vars["id"] == "longfail" { + time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond)) + } w.Write([]byte("/bundles/" + vars["id"])) }) testServer = httptest.NewServer(router)
diff --git a/bundle.go b/bundle.go index 3e018d0..cdba445 100644 --- a/bundle.go +++ b/bundle.go
@@ -17,17 +17,12 @@ "encoding/hex" ) -const ( - DOWNLOAD_ATTEMPTS = 3 -) - -var ( - backOffMultiplier = 10 * time.Second -) +var bundleRetryDelay time.Duration = time.Second +var bundleDownloadTimeout time.Duration = 10 * time.Minute func downloadBundle(dep DataDeployment) { - log.Debugf("starting bundle download process: %s", dep.BundleURI) + log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI) hashWriter, err := getHashWriter(dep.BundleChecksumType) if err != nil { @@ -44,9 +39,35 @@ return } - // todo: do forever with backoff - note: we'll want to abort if deployment is deleted, however - // todo: also, we'll still mark deployment result as "failed" - after some timeout - for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ { + // simple doubling back-off + retryIn := bundleRetryDelay + maxBackOff := 5 * time.Minute + backOff := func() { + log.Debugf("will retry failed download in %s: %v", retryIn, err) + time.Sleep(retryIn) + retryIn = retryIn * time.Duration(2) + if retryIn > maxBackOff { + retryIn = maxBackOff + } + } + + // timeout and mark deployment failed + timeout := time.NewTimer(bundleDownloadTimeout) + go func() { + <- timeout.C + log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI) + setDeploymentResults(apiDeploymentResults{ + { + ID: dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: ERROR_CODE_TODO, + Message: fmt.Sprintf("bundle download failed: %s", err), + }, + }) + }() + + // todo: we'll want to abort download if deployment is deleted + for { var tempFile, bundleFile string tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum) @@ -66,31 +87,16 @@ err = updateLocalBundleURI(dep.ID, bundleFile) } + // success! 
if err == nil { break } - // simple back-off, we could potentially be more sophisticated - retryIn := time.Duration(i) * backOffMultiplier - log.Debugf("will retry failed download in %s: %v", retryIn, err) - time.Sleep(retryIn) + backOff() hashWriter.Reset() } - if err != nil { - log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS) - } - if err != nil { - setDeploymentResults(apiDeploymentResults{ - { - ID: dep.ID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: ERROR_CODE_TODO, - Message: fmt.Sprintf("bundle download failed: %s", err), - }, - }) - return - } + log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI) // send deployments to client deploymentsChanged<- dep.ID
diff --git a/bundle_test.go b/bundle_test.go index 2d1645c..1c959d9 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -1,28 +1,29 @@ package apiGatewayDeploy import ( + "encoding/json" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "net/url" - "encoding/json" + "time" ) var _ = Describe("bundle", func() { Context("download", func() { - It("should mark the status as failed if download fails", func() { + It("should timeout, mark status as failed, then finish", func() { deploymentID := "bundle_download_fail" uri, err := url.Parse(testServer.URL) Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundles/alwaysfail" + uri.Path = "/bundles/longfail" bundleUri := uri.String() bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, + Name: uri.Path, + URI: bundleUri, ChecksumType: "crc-32", } bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) @@ -33,23 +34,23 @@ Expect(err).ShouldNot(HaveOccurred()) dep := DataDeployment{ - ID: deploymentID, - BundleConfigID: deploymentID, - ApidClusterID: deploymentID, - DataScopeID: deploymentID, - BundleConfigJSON: string(bundleJson), - ConfigJSON: string(bundleJson), - Created: "", - CreatedBy: "", - Updated: "", - UpdatedBy: "", - BundleName: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, + ID: deploymentID, + BundleConfigID: deploymentID, + ApidClusterID: deploymentID, + DataScopeID: deploymentID, + BundleConfigJSON: string(bundleJson), + ConfigJSON: string(bundleJson), + Created: "", + CreatedBy: "", + Updated: "", + UpdatedBy: "", + BundleName: deploymentID, + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, BundleChecksumType: bundle.ChecksumType, - LocalBundleURI: "", - DeployStatus: "", - DeployErrorCode: 0, + LocalBundleURI: "", + DeployStatus: "", + DeployErrorCode: 0, DeployErrorMessage: "", } @@ -59,12 +60,12 @@ err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) - var listener = make(chan string) - addSubscriber <- listener + go downloadBundle(dep) - downloadBundle(dep) + // give download time to timeout + time.Sleep(bundleDownloadTimeout + (100 * 
time.Millisecond)) - // get deployment + // get error state deployment deployments, err := getDeployments("WHERE id=$1", deploymentID) Expect(err).ShouldNot(HaveOccurred()) @@ -72,10 +73,28 @@ d := deployments[0] Expect(d.ID).To(Equal(deploymentID)) - Expect(d.LocalBundleURI).To(BeEmpty()) Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL)) Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO)) Expect(d.DeployErrorMessage).ToNot(BeEmpty()) + Expect(d.LocalBundleURI).To(BeEmpty()) + + var listener = make(chan string) + addSubscriber <- listener + <-listener + + // get finished deployment + // still in error state (let client update), but with valid local bundle + deployments, err = getDeployments("WHERE id=$1", deploymentID) + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d = deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL)) + Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO)) + Expect(d.DeployErrorMessage).ToNot(BeEmpty()) + Expect(d.LocalBundleURI).To(BeAnExistingFile()) }) }) })
diff --git a/init.go b/init.go index 70cdae2..4d32964 100644 --- a/init.go +++ b/init.go
@@ -8,9 +8,10 @@ ) const ( - configBundleDirKey = "gatewaydeploy_bundle_dir" - configDebounceDuration = "gatewaydeploy_debounce_duration" - configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" + configBundleDirKey = "gatewaydeploy_bundle_dir" + configDebounceDuration = "gatewaydeploy_debounce_duration" + configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" + configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout" ) var ( @@ -35,6 +36,7 @@ config.SetDefault(configBundleDirKey, "bundles") config.SetDefault(configDebounceDuration, time.Second) config.SetDefault(configBundleCleanupDelay, time.Minute) + config.SetDefault(configBundleDownloadTimeout, 5 * time.Minute) debounceDuration = config.GetDuration(configDebounceDuration) if debounceDuration < time.Millisecond { @@ -46,6 +48,11 @@ log.Panicf("%s must be a positive duration", configBundleCleanupDelay) } + bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout) + if bundleDownloadTimeout < time.Millisecond { + log.Panicf("%s must be a positive duration", configBundleDownloadTimeout) + } + data = services.Data() relativeBundlePath := config.GetString(configBundleDirKey)