incorporate timeout into download process
diff --git a/bundle.go b/bundle.go index 6f0c208..95fb3f6 100644 --- a/bundle.go +++ b/bundle.go
@@ -18,10 +18,10 @@ ) var ( - bundleRetryDelay time.Duration = time.Second - bundleDownloadTimeout time.Duration = 10 * time.Minute - downloadQueue = make(chan *DownloadRequest, downloadQueueSize) - workerQueue = make(chan chan *DownloadRequest, concurrentDownloads) + bundleRetryDelay = time.Second + bundleDownloadTimeout = 10 * time.Minute + downloadQueue = make(chan *DownloadRequest, downloadQueueSize) + workerQueue = make(chan chan *DownloadRequest, concurrentDownloads) ) // simple doubling back-off @@ -55,42 +55,23 @@ retryIn := bundleRetryDelay maxBackOff := 5 * time.Minute + timeoutAfter := time.Now().Add(bundleDownloadTimeout) req := &DownloadRequest{ - dep: dep, - hashWriter: hashWriter, - bundleFile: getBundleFile(dep), - backoffFunc: createBackoff(retryIn, maxBackOff), + dep: dep, + hashWriter: hashWriter, + bundleFile: getBundleFile(dep), + backoffFunc: createBackoff(retryIn, maxBackOff), + timeoutAfter: timeoutAfter, } downloadQueue <- req - - // timeout and mark deployment failed (but retries will continue) - timeout := time.NewTimer(bundleDownloadTimeout) - go func() { - <-timeout.C - log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI) - var errMessage string - if err != nil { - errMessage = fmt.Sprintf("bundle download failed: %s", err) - } else { - errMessage = "bundle download failed" - } - setDeploymentResults(apiDeploymentResults{ - { - ID: dep.ID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: ERROR_CODE_TODO, - Message: errMessage, - }, - }) - }() - } type DownloadRequest struct { - dep DataDeployment - hashWriter hash.Hash - bundleFile string - backoffFunc func() + dep DataDeployment + hashWriter hash.Hash + bundleFile string + backoffFunc func() + timeoutAfter time.Time } func (r *DownloadRequest) downloadBundle() { @@ -104,6 +85,8 @@ return } + r.checkTimeout() + r.hashWriter.Reset() tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum) @@ -137,6 +120,25 @@ deploymentsChanged <- dep.ID } +func (r *DownloadRequest) checkTimeout() { + + if !r.timeoutAfter.IsZero() { + if time.Now().After(r.timeoutAfter) { + r.timeoutAfter = time.Time{} + log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", + r.dep.ID, r.dep.BundleURI) + setDeploymentResults(apiDeploymentResults{ + { + ID: r.dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: ERROR_CODE_TODO, + Message: "bundle download failed", + }, + }) + } + } +} + func getBundleFile(dep DataDeployment) string { // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI
diff --git a/bundle_test.go b/bundle_test.go index 009eba6..cb76219 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -5,6 +5,9 @@ "net/url" "time" + "net/http" + "net/http/httptest" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) @@ -13,11 +16,26 @@ Context("download", func() { - It("should timeout, mark status as failed, then finish", func() { + FIt("should timeout, mark status as failed, then finish", func() { + + proceed := make(chan bool) + failedOnce := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if failedOnce { + proceed <- true + time.Sleep(bundleDownloadTimeout) + w.Write([]byte("/bundles/longfail")) + } else { + failedOnce = true + time.Sleep(bundleDownloadTimeout) + w.WriteHeader(500) + } + })) + defer ts.Close() deploymentID := "bundle_download_fail" - uri, err := url.Parse(testServer.URL) + uri, err := url.Parse(ts.URL) Expect(err).ShouldNot(HaveOccurred()) uri.Path = "/bundles/longfail" @@ -63,8 +81,7 @@ queueDownloadRequest(dep) - // give download time to timeout - time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond)) + <-proceed // get error state deployment deployments, err := getDeployments("WHERE id=$1", deploymentID)