Add bundle retry forever code - with a timeout to mark deployment as failed, update readme
diff --git a/README.md b/README.md
index c5bf99a..168daed 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# apidVerifyAPIKey
+# apidGatewayDeploy
This core plugin for [apid](http://github.com/30x/apid) responds to
[apidApigeeSync](https://github.com/30x/apidApigeeSync) events and publishes an API that allows clients to
@@ -13,6 +13,26 @@
See [apidGatewayDeploy-api.yaml]() for full spec.
+## Configuration
+
+#### gatewaydeploy_debounce_duration
+Window of time during which deployment changes are gathered before sending to client.
+Default: "bundles"
+
+#### gatewaydeploy_bundle_cleanup_delay
+Duration between deleting a deployment and deleting it's bundles on disk.
+Default: "1s"
+
+#### gatewaydeploy_bundle_download_timeout
+Duration before bundle download marks deployment as failed (will continue retries regardless).
+Default: "1m"
+
+#### gatewaydeploy_bundle_dir
+Relative location from local_storage_path in which to store local bundle files.
+Default: "5m"
+
+(durations note, see: https://golang.org/pkg/time/#ParseDuration)
+
## Building and running standalone
First, install prerequisites:
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index dffb117..33d3cf3 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -42,18 +42,22 @@
debounceDuration = time.Millisecond
bundleCleanupDelay = time.Millisecond
+ bundleRetryDelay = 10 * time.Millisecond
+ bundleDownloadTimeout = 50 * time.Millisecond
router := apid.API().Router()
// fake an unreliable bundle repo
- backOffMultiplier = time.Millisecond
count := 1
router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
count++
vars := apid.API().Vars(req)
- if count % 2 == 0 || vars["id"] == "alwaysfail" {
+ if count % 2 == 0 {
w.WriteHeader(500)
return
}
+ if vars["id"] == "longfail" {
+ time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond))
+ }
w.Write([]byte("/bundles/" + vars["id"]))
})
testServer = httptest.NewServer(router)
diff --git a/bundle.go b/bundle.go
index 3e018d0..cdba445 100644
--- a/bundle.go
+++ b/bundle.go
@@ -17,17 +17,12 @@
"encoding/hex"
)
-const (
- DOWNLOAD_ATTEMPTS = 3
-)
-
-var (
- backOffMultiplier = 10 * time.Second
-)
+var bundleRetryDelay time.Duration = time.Second
+var bundleDownloadTimeout time.Duration = 10 * time.Minute
func downloadBundle(dep DataDeployment) {
- log.Debugf("starting bundle download process: %s", dep.BundleURI)
+ log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
hashWriter, err := getHashWriter(dep.BundleChecksumType)
if err != nil {
@@ -44,9 +39,35 @@
return
}
- // todo: do forever with backoff - note: we'll want to abort if deployment is deleted, however
- // todo: also, we'll still mark deployment result as "failed" - after some timeout
- for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
+ // simple doubling back-off
+ retryIn := bundleRetryDelay
+ maxBackOff := 5 * time.Minute
+ backOff := func() {
+ log.Debugf("will retry failed download in %s: %v", retryIn, err)
+ time.Sleep(retryIn)
+ retryIn = retryIn * time.Duration(2)
+ if retryIn > maxBackOff {
+ retryIn = maxBackOff
+ }
+ }
+
+ // timeout and mark deployment failed
+ timeout := time.NewTimer(bundleDownloadTimeout)
+ go func() {
+ <- timeout.C
+ log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI)
+ setDeploymentResults(apiDeploymentResults{
+ {
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: ERROR_CODE_TODO,
+ Message: fmt.Sprintf("bundle download failed: %s", err),
+ },
+ })
+ }()
+
+ // todo: we'll want to abort download if deployment is deleted
+ for {
var tempFile, bundleFile string
tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
@@ -66,31 +87,16 @@
err = updateLocalBundleURI(dep.ID, bundleFile)
}
+ // success!
if err == nil {
break
}
- // simple back-off, we could potentially be more sophisticated
- retryIn := time.Duration(i) * backOffMultiplier
- log.Debugf("will retry failed download in %s: %v", retryIn, err)
- time.Sleep(retryIn)
+ backOff()
hashWriter.Reset()
}
- if err != nil {
- log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS)
- }
- if err != nil {
- setDeploymentResults(apiDeploymentResults{
- {
- ID: dep.ID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: ERROR_CODE_TODO,
- Message: fmt.Sprintf("bundle download failed: %s", err),
- },
- })
- return
- }
+ log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
// send deployments to client
deploymentsChanged<- dep.ID
diff --git a/bundle_test.go b/bundle_test.go
index 2d1645c..1c959d9 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -1,28 +1,29 @@
package apiGatewayDeploy
import (
+ "encoding/json"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"net/url"
- "encoding/json"
+ "time"
)
var _ = Describe("bundle", func() {
Context("download", func() {
- It("should mark the status as failed if download fails", func() {
+ It("should timeout, mark status as failed, then finish", func() {
deploymentID := "bundle_download_fail"
uri, err := url.Parse(testServer.URL)
Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundles/alwaysfail"
+ uri.Path = "/bundles/longfail"
bundleUri := uri.String()
bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
+ Name: uri.Path,
+ URI: bundleUri,
ChecksumType: "crc-32",
}
bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
@@ -33,23 +34,23 @@
Expect(err).ShouldNot(HaveOccurred())
dep := DataDeployment{
- ID: deploymentID,
- BundleConfigID: deploymentID,
- ApidClusterID: deploymentID,
- DataScopeID: deploymentID,
- BundleConfigJSON: string(bundleJson),
- ConfigJSON: string(bundleJson),
- Created: "",
- CreatedBy: "",
- Updated: "",
- UpdatedBy: "",
- BundleName: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
+ ID: deploymentID,
+ BundleConfigID: deploymentID,
+ ApidClusterID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleConfigJSON: string(bundleJson),
+ ConfigJSON: string(bundleJson),
+ Created: "",
+ CreatedBy: "",
+ Updated: "",
+ UpdatedBy: "",
+ BundleName: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
BundleChecksumType: bundle.ChecksumType,
- LocalBundleURI: "",
- DeployStatus: "",
- DeployErrorCode: 0,
+ LocalBundleURI: "",
+ DeployStatus: "",
+ DeployErrorCode: 0,
DeployErrorMessage: "",
}
@@ -59,12 +60,12 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- var listener = make(chan string)
- addSubscriber <- listener
+ go downloadBundle(dep)
- downloadBundle(dep)
+ // give download time to timeout
+ time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
- // get deployment
+ // get error state deployment
deployments, err := getDeployments("WHERE id=$1", deploymentID)
Expect(err).ShouldNot(HaveOccurred())
@@ -72,10 +73,28 @@
d := deployments[0]
Expect(d.ID).To(Equal(deploymentID))
- Expect(d.LocalBundleURI).To(BeEmpty())
Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+ Expect(d.LocalBundleURI).To(BeEmpty())
+
+ var listener = make(chan string)
+ addSubscriber <- listener
+ <-listener
+
+ // get finished deployment
+ // still in error state (let client update), but with valid local bundle
+ deployments, err = getDeployments("WHERE id=$1", deploymentID)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d = deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+ Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+ Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+ Expect(d.LocalBundleURI).To(BeAnExistingFile())
})
})
})
diff --git a/init.go b/init.go
index 70cdae2..4d32964 100644
--- a/init.go
+++ b/init.go
@@ -8,9 +8,10 @@
)
const (
- configBundleDirKey = "gatewaydeploy_bundle_dir"
- configDebounceDuration = "gatewaydeploy_debounce_duration"
- configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
+ configBundleDirKey = "gatewaydeploy_bundle_dir"
+ configDebounceDuration = "gatewaydeploy_debounce_duration"
+ configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
+ configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout"
)
var (
@@ -35,6 +36,7 @@
config.SetDefault(configBundleDirKey, "bundles")
config.SetDefault(configDebounceDuration, time.Second)
config.SetDefault(configBundleCleanupDelay, time.Minute)
+ config.SetDefault(configBundleDownloadTimeout, 5 * time.Minute)
debounceDuration = config.GetDuration(configDebounceDuration)
if debounceDuration < time.Millisecond {
@@ -46,6 +48,11 @@
log.Panicf("%s must be a positive duration", configBundleCleanupDelay)
}
+ bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout)
+ if bundleDownloadTimeout < time.Millisecond {
+ log.Panicf("%s must be a positive duration", configBundleDownloadTimeout)
+ }
+
data = services.Data()
relativeBundlePath := config.GetString(configBundleDirKey)