regressions tests for download bundle connection timeout and initial startup processing
diff --git a/README.md b/README.md index 168daed..9691d3a 100644 --- a/README.md +++ b/README.md
@@ -23,9 +23,13 @@ Duration between deleting a deployment and deleting it's bundles on disk. Default: "1s" -#### gatewaydeploy_bundle_download_timeout -Duration before bundle download marks deployment as failed (will continue retries regardless). -Default: "1m" +#### gatewaydeploy_download_connection_timeout +Duration before a bundle download connection terminates +Default: "5m" + +#### gatewaydeploy_deployment_timeout +Duration before bundle download marks deployment as failed (will continue download retries regardless). +Default: "10m" #### gatewaydeploy_bundle_dir Relative location from local_storage_path in which to store local bundle files.
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index ad253e0..ccb41b2 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -50,7 +50,7 @@ debounceDuration = time.Millisecond bundleCleanupDelay = time.Millisecond bundleRetryDelay = 10 * time.Millisecond - bundleDownloadTimeout = 50 * time.Millisecond + markDeploymentFailedAfter = 50 * time.Millisecond concurrentDownloads = 1 downloadQueueSize = 1 @@ -76,7 +76,7 @@ return } if vars["id"] == "longfail" { - time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond)) + time.Sleep(markDeploymentFailedAfter + (250 * time.Millisecond)) } w.Write([]byte("/bundles/" + vars["id"]))
diff --git a/bundle.go b/bundle.go index 1163946..1105606 100644 --- a/bundle.go +++ b/bundle.go
@@ -18,10 +18,11 @@ ) var ( - bundleRetryDelay = time.Second - bundleDownloadTimeout = 10 * time.Minute - downloadQueue = make(chan *DownloadRequest, downloadQueueSize) - workerQueue = make(chan chan *DownloadRequest, concurrentDownloads) + markDeploymentFailedAfter time.Duration + bundleDownloadConnTimeout time.Duration + bundleRetryDelay = time.Second + downloadQueue = make(chan *DownloadRequest, downloadQueueSize) + workerQueue = make(chan chan *DownloadRequest, concurrentDownloads) ) // simple doubling back-off @@ -55,13 +56,13 @@ retryIn := bundleRetryDelay maxBackOff := 5 * time.Minute - timeoutAfter := time.Now().Add(bundleDownloadTimeout) + markFailedAt := time.Now().Add(markDeploymentFailedAfter) req := &DownloadRequest{ dep: dep, hashWriter: hashWriter, bundleFile: getBundleFile(dep), backoffFunc: createBackoff(retryIn, maxBackOff), - timeoutAfter: timeoutAfter, + markFailedAt: markFailedAt, } downloadQueue <- req } @@ -71,7 +72,7 @@ hashWriter hash.Hash bundleFile string backoffFunc func() - timeoutAfter time.Time + markFailedAt time.Time } func (r *DownloadRequest) downloadBundle() { @@ -122,9 +123,9 @@ func (r *DownloadRequest) checkTimeout() { - if !r.timeoutAfter.IsZero() { - if time.Now().After(r.timeoutAfter) { - r.timeoutAfter = time.Time{} + if !r.markFailedAt.IsZero() { + if time.Now().After(r.markFailedAt) { + r.markFailedAt = time.Time{} log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", r.dep.ID, r.dep.BundleURI) setDeploymentResults(apiDeploymentResults{ @@ -209,7 +210,10 @@ } // GET the contents at uriString - res, err := http.Get(uriString) + client := http.Client{ + Timeout: bundleDownloadConnTimeout, + } + res, err := client.Get(uriString) if err != nil { return nil, err }
diff --git a/bundle_test.go b/bundle_test.go index 796be9a..84989ea 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -16,18 +16,73 @@ Context("download", func() { - It("should timeout, mark status as failed, then finish", func() { + It("should timeout connection and retry", func() { + defer func() { + bundleDownloadConnTimeout = time.Second + }() + bundleDownloadConnTimeout = 100 * time.Millisecond + firstTime := true + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if firstTime { + firstTime = false + time.Sleep(1 * time.Second) + w.WriteHeader(500) + } else { + //proceed <- true + w.Write([]byte("/bundles/longfail")) + } + })) + defer ts.Close() + + uri, err := url.Parse(ts.URL) + Expect(err).ShouldNot(HaveOccurred()) + uri.Path = "/bundles/longfail" + + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + + deploymentID := "bundle_download_fail" + dep := DataDeployment{ + ID: deploymentID, + DataScopeID: deploymentID, + BundleURI: uri.String(), + BundleChecksum: testGetChecksum("crc-32", uri.String()), + BundleChecksumType: "crc-32", + } + + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + queueDownloadRequest(dep) + + var listener = make(chan string) + addSubscriber <- listener + <-listener + + getReadyDeployments() + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d := deployments[0] + Expect(d.ID).To(Equal(deploymentID)) + }) + + It("should timeout deployment, mark status as failed, then finish", func() { proceed := make(chan bool) failedOnce := false ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if failedOnce { proceed <- true - time.Sleep(bundleDownloadTimeout) + time.Sleep(markDeploymentFailedAfter) w.Write([]byte("/bundles/longfail")) } else { failedOnce = true - time.Sleep(bundleDownloadTimeout) + time.Sleep(markDeploymentFailedAfter) w.WriteHeader(500) } })) @@ -234,7 +289,7 @@ queueDownloadRequest(dep) // give download time to finish - time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond)) + time.Sleep(markDeploymentFailedAfter + (100 * time.Millisecond)) deployments, err := getReadyDeployments() Expect(err).ShouldNot(HaveOccurred())
diff --git a/init.go b/init.go index f507357..ff39a3e 100644 --- a/init.go +++ b/init.go
@@ -14,7 +14,8 @@ configBundleDirKey = "gatewaydeploy_bundle_dir" configDebounceDuration = "gatewaydeploy_debounce_duration" configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" - configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout" + configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout" + configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout" configApiServerBaseURI = "apigeesync_proxy_server_base" configApidInstanceID = "apigeesync_apid_instance_id" configApidClusterID = "apigeesync_cluster_id" @@ -69,7 +70,8 @@ config.SetDefault(configBundleDirKey, "bundles") config.SetDefault(configDebounceDuration, time.Second) config.SetDefault(configBundleCleanupDelay, time.Minute) - config.SetDefault(configBundleDownloadTimeout, 5*time.Minute) + config.SetDefault(configMarkDeployFailedAfter, 10*time.Minute) + config.SetDefault(configDownloadConnTimeout, 5*time.Minute) config.SetDefault(configConcurrentDownloads, 15) config.SetDefault(configDownloadQueueSize, 2000) @@ -83,9 +85,14 @@ return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay) } - bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout) - if bundleDownloadTimeout < time.Millisecond { - return pluginData, fmt.Errorf("%s must be a positive duration", configBundleDownloadTimeout) + markDeploymentFailedAfter = config.GetDuration(configMarkDeployFailedAfter) + if markDeploymentFailedAfter < time.Millisecond { + return pluginData, fmt.Errorf("%s must be a positive duration", configMarkDeployFailedAfter) + } + + bundleDownloadConnTimeout = config.GetDuration(configDownloadConnTimeout) + if bundleDownloadConnTimeout < time.Millisecond { + return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout) } data = services.Data()
diff --git a/listener.go b/listener.go index 0f5c493..938b9d2 100644 --- a/listener.go +++ b/listener.go
@@ -119,6 +119,7 @@ if err != nil { log.Panicf("unable to query database for unready deployments: %v", err) } + log.Debugf("Queuing %d deployments for bundle download", len(deployments)) for _, dep := range deployments { queueDownloadRequest(dep) }
diff --git a/listener_test.go b/listener_test.go index dbee310..40c9ce4 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -66,6 +66,73 @@ close(done) }) + + It("should set DB and process unready on startup event", func(done Done) { + + deploymentID := "startup_test" + + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc-32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + + dep := DataDeployment{ + ID: deploymentID, + DataScopeID: deploymentID, + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, + BundleChecksumType: bundle.ChecksumType, + } + + // init without info == startup on existing DB + var snapshot = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{}, + } + + db, err := data.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } + + err = InitDB(db) + if err != nil { + log.Panicf("Unable to initialize database: %v", err) + } + + tx, err := db.Begin() + Expect(err).ShouldNot(HaveOccurred()) + + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + var listener = make(chan string) + addSubscriber <- listener + + apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) + + id := <-listener + Expect(id).To(Equal(deploymentID)) + + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d := deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + close(done) + }) }) Context("ApigeeSync change event", func() {