Merge pull request #13 from 30x/XAPID-671
regressions tests for download bundle connection timeout and initial startup processing
diff --git a/README.md b/README.md
index 168daed..9691d3a 100644
--- a/README.md
+++ b/README.md
@@ -23,9 +23,13 @@
Duration between deleting a deployment and deleting it's bundles on disk.
Default: "1s"
-#### gatewaydeploy_bundle_download_timeout
-Duration before bundle download marks deployment as failed (will continue retries regardless).
-Default: "1m"
+#### gatewaydeploy_download_connection_timeout
+Duration before a bundle download connection terminates
+Default: "5m"
+
+#### gatewaydeploy_deployment_timeout
+Duration before bundle download marks deployment as failed (will continue download retries regardless).
+Default: "10m"
#### gatewaydeploy_bundle_dir
Relative location from local_storage_path in which to store local bundle files.
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index ad253e0..ccb41b2 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -50,7 +50,7 @@
debounceDuration = time.Millisecond
bundleCleanupDelay = time.Millisecond
bundleRetryDelay = 10 * time.Millisecond
- bundleDownloadTimeout = 50 * time.Millisecond
+ markDeploymentFailedAfter = 50 * time.Millisecond
concurrentDownloads = 1
downloadQueueSize = 1
@@ -76,7 +76,7 @@
return
}
if vars["id"] == "longfail" {
- time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond))
+ time.Sleep(markDeploymentFailedAfter + (250 * time.Millisecond))
}
w.Write([]byte("/bundles/" + vars["id"]))
diff --git a/bundle.go b/bundle.go
index 1163946..1105606 100644
--- a/bundle.go
+++ b/bundle.go
@@ -18,10 +18,11 @@
)
var (
- bundleRetryDelay = time.Second
- bundleDownloadTimeout = 10 * time.Minute
- downloadQueue = make(chan *DownloadRequest, downloadQueueSize)
- workerQueue = make(chan chan *DownloadRequest, concurrentDownloads)
+ markDeploymentFailedAfter time.Duration
+ bundleDownloadConnTimeout time.Duration
+ bundleRetryDelay = time.Second
+ downloadQueue = make(chan *DownloadRequest, downloadQueueSize)
+ workerQueue = make(chan chan *DownloadRequest, concurrentDownloads)
)
// simple doubling back-off
@@ -55,13 +56,13 @@
retryIn := bundleRetryDelay
maxBackOff := 5 * time.Minute
- timeoutAfter := time.Now().Add(bundleDownloadTimeout)
+ markFailedAt := time.Now().Add(markDeploymentFailedAfter)
req := &DownloadRequest{
dep: dep,
hashWriter: hashWriter,
bundleFile: getBundleFile(dep),
backoffFunc: createBackoff(retryIn, maxBackOff),
- timeoutAfter: timeoutAfter,
+ markFailedAt: markFailedAt,
}
downloadQueue <- req
}
@@ -71,7 +72,7 @@
hashWriter hash.Hash
bundleFile string
backoffFunc func()
- timeoutAfter time.Time
+ markFailedAt time.Time
}
func (r *DownloadRequest) downloadBundle() {
@@ -122,9 +123,9 @@
func (r *DownloadRequest) checkTimeout() {
- if !r.timeoutAfter.IsZero() {
- if time.Now().After(r.timeoutAfter) {
- r.timeoutAfter = time.Time{}
+ if !r.markFailedAt.IsZero() {
+ if time.Now().After(r.markFailedAt) {
+ r.markFailedAt = time.Time{}
log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s",
r.dep.ID, r.dep.BundleURI)
setDeploymentResults(apiDeploymentResults{
@@ -209,7 +210,10 @@
}
// GET the contents at uriString
- res, err := http.Get(uriString)
+ client := http.Client{
+ Timeout: bundleDownloadConnTimeout,
+ }
+ res, err := client.Get(uriString)
if err != nil {
return nil, err
}
diff --git a/bundle_test.go b/bundle_test.go
index 796be9a..84989ea 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -16,18 +16,73 @@
Context("download", func() {
- It("should timeout, mark status as failed, then finish", func() {
+ It("should timeout connection and retry", func() {
+ defer func() {
+ bundleDownloadConnTimeout = time.Second
+ }()
+ bundleDownloadConnTimeout = 100 * time.Millisecond
+ firstTime := true
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if firstTime {
+ firstTime = false
+ time.Sleep(1 * time.Second)
+ w.WriteHeader(500)
+ } else {
+ //proceed <- true
+ w.Write([]byte("/bundles/longfail"))
+ }
+ }))
+ defer ts.Close()
+
+ uri, err := url.Parse(ts.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+ uri.Path = "/bundles/longfail"
+
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ deploymentID := "bundle_download_fail"
+ dep := DataDeployment{
+ ID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleURI: uri.String(),
+ BundleChecksum: testGetChecksum("crc-32", uri.String()),
+ BundleChecksumType: "crc-32",
+ }
+
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ queueDownloadRequest(dep)
+
+ var listener = make(chan string)
+ addSubscriber <- listener
+ <-listener
+
+ getReadyDeployments()
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d := deployments[0]
+ Expect(d.ID).To(Equal(deploymentID))
+ })
+
+ It("should timeout deployment, mark status as failed, then finish", func() {
proceed := make(chan bool)
failedOnce := false
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if failedOnce {
proceed <- true
- time.Sleep(bundleDownloadTimeout)
+ time.Sleep(markDeploymentFailedAfter)
w.Write([]byte("/bundles/longfail"))
} else {
failedOnce = true
- time.Sleep(bundleDownloadTimeout)
+ time.Sleep(markDeploymentFailedAfter)
w.WriteHeader(500)
}
}))
@@ -234,7 +289,7 @@
queueDownloadRequest(dep)
// give download time to finish
- time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
+ time.Sleep(markDeploymentFailedAfter + (100 * time.Millisecond))
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())
diff --git a/init.go b/init.go
index f507357..ff39a3e 100644
--- a/init.go
+++ b/init.go
@@ -14,7 +14,8 @@
configBundleDirKey = "gatewaydeploy_bundle_dir"
configDebounceDuration = "gatewaydeploy_debounce_duration"
configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
- configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout"
+ configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout"
+ configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout"
configApiServerBaseURI = "apigeesync_proxy_server_base"
configApidInstanceID = "apigeesync_apid_instance_id"
configApidClusterID = "apigeesync_cluster_id"
@@ -69,7 +70,8 @@
config.SetDefault(configBundleDirKey, "bundles")
config.SetDefault(configDebounceDuration, time.Second)
config.SetDefault(configBundleCleanupDelay, time.Minute)
- config.SetDefault(configBundleDownloadTimeout, 5*time.Minute)
+ config.SetDefault(configMarkDeployFailedAfter, 10*time.Minute)
+ config.SetDefault(configDownloadConnTimeout, 5*time.Minute)
config.SetDefault(configConcurrentDownloads, 15)
config.SetDefault(configDownloadQueueSize, 2000)
@@ -83,9 +85,14 @@
return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
}
- bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout)
- if bundleDownloadTimeout < time.Millisecond {
- return pluginData, fmt.Errorf("%s must be a positive duration", configBundleDownloadTimeout)
+ markDeploymentFailedAfter = config.GetDuration(configMarkDeployFailedAfter)
+ if markDeploymentFailedAfter < time.Millisecond {
+ return pluginData, fmt.Errorf("%s must be a positive duration", configMarkDeployFailedAfter)
+ }
+
+ bundleDownloadConnTimeout = config.GetDuration(configDownloadConnTimeout)
+ if bundleDownloadConnTimeout < time.Millisecond {
+ return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout)
}
data = services.Data()
diff --git a/listener.go b/listener.go
index 0f5c493..938b9d2 100644
--- a/listener.go
+++ b/listener.go
@@ -119,6 +119,7 @@
if err != nil {
log.Panicf("unable to query database for unready deployments: %v", err)
}
+ log.Debugf("Queuing %d deployments for bundle download", len(deployments))
for _, dep := range deployments {
queueDownloadRequest(dep)
}
diff --git a/listener_test.go b/listener_test.go
index dbee310..40c9ce4 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -66,6 +66,73 @@
close(done)
})
+
+ It("should set DB and process unready on startup event", func(done Done) {
+
+ deploymentID := "startup_test"
+
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ }
+
+ // init without info == startup on existing DB
+ var snapshot = common.Snapshot{
+ SnapshotInfo: "test",
+ Tables: []common.Table{},
+ }
+
+ db, err := data.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
+
+ err = InitDB(db)
+ if err != nil {
+ log.Panicf("Unable to initialize database: %v", err)
+ }
+
+ tx, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ var listener = make(chan string)
+ addSubscriber <- listener
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
+
+ id := <-listener
+ Expect(id).To(Equal(deploymentID))
+
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d := deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ close(done)
+ })
})
Context("ApigeeSync change event", func() {