add more test cases
diff --git a/bundle.go b/bundle.go index 62478fa..b0dc434 100644 --- a/bundle.go +++ b/bundle.go
@@ -148,7 +148,11 @@ log.Debugf("starting bundle download attempt for blobId=%s", r.blobId) - r.checkTimeout() + if r.checkTimeout() { + return &timeoutError{ + markFailedAt: r.markFailedAt, + } + } downloadedFile, err := downloadFromURI(r.blobServerURL, r.blobId, r.connTimeout) @@ -173,13 +177,15 @@ return nil } -func (r *DownloadRequest) checkTimeout() { +func (r *DownloadRequest) checkTimeout() bool { if !r.markFailedAt.IsZero() && time.Now().After(r.markFailedAt) { r.markFailedAt = time.Time{} log.Debugf("bundle download timeout. blobId=", r.blobId) + // TODO notify gateway of this failure + return true } - + return false } func getBlobFilePath(blobId string) string { @@ -284,6 +290,10 @@ log.Debugf("starting download blobId=%s", req.blobId) err := req.downloadBundle() if err != nil { + // timeout + if _, ok := err.(*timeoutError); ok { + continue + } go func() { req.backoffFunc() w.bm.enqueueRequest(req) @@ -305,3 +315,11 @@ } } } + +type timeoutError struct { + markFailedAt time.Time +} + +func (e *timeoutError) Error() string { + return fmt.Sprintf("Timeout. markFailedAt=%v", e.markFailedAt) +}
diff --git a/bundle_test.go b/bundle_test.go index 2528d71..fa44b03 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -23,6 +23,7 @@ . "github.com/onsi/gomega" "io" "strings" + "sync/atomic" "time" ) @@ -48,6 +49,9 @@ blobServer = &dummyBlobServer{ serverEndpoint: blobStoreUri, signedEndpoint: dummySignedEndpoint, + signedTimeout: new(int32), + blobTimeout: new(int32), + resetTimeout: true, } blobServer.start() } @@ -67,7 +71,7 @@ apiMan: dummyApiMan, concurrentDownloads: concurrentDownloads, markDeploymentFailedAfter: 5 * time.Second, - bundleDownloadConnTimeout: 5 * time.Second, + bundleDownloadConnTimeout: time.Second, bundleRetryDelay: time.Second, bundleCleanupDelay: 5 * time.Second, downloadQueue: make(chan *DownloadRequest, downloadQueueSize), @@ -85,11 +89,46 @@ }) It("should download blob according to id", func() { + // download blob id := GenerateUUID() testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) received := <-dummyDbMan.fileResponse Expect(received).Should(Equal(id)) }) + + It("should timeout connection and retry", func() { + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.bundleDownloadConnTimeout = 500 * time.Millisecond + testBundleMan.bundleRetryDelay = 50 * time.Millisecond + + // download blobs + id := GenerateUUID() + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) + received := <-dummyDbMan.fileResponse + Expect(received).Should(Equal(id)) + + }, 4) + + It("should mark as failure according to markDeploymentFailedAfter", func() { + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.bundleDownloadConnTimeout = 100 * time.Millisecond + testBundleMan.bundleRetryDelay = 100 * time.Millisecond + testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond + + // download blobs + id := GenerateUUID() + req := testBundleMan.makeDownloadRequest(id) + Expect(req.markFailedAt.After(time.Now())).Should(BeTrue()) + testBundleMan.enqueueRequest(req) + + // should fail + time.Sleep(time.Second) + Expect(req.markFailedAt.IsZero()).Should(BeTrue()) + }, 4) }) type dummyApiManager struct { @@ -101,6 +140,9 @@ type dummyBlobServer struct { serverEndpoint string signedEndpoint string + signedTimeout *int32 + blobTimeout *int32 + resetTimeout bool } func (b *dummyBlobServer) start() { @@ -110,6 +152,13 @@ // send a dummy uri as response func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + if atomic.LoadInt32(b.signedTimeout) == int32(1) { + if b.resetTimeout { + atomic.StoreInt32(b.signedTimeout, 0) + } + time.Sleep(time.Second) + } vars := mux.Vars(r) blobId := vars["blobId"] @@ -122,6 +171,13 @@ // send blobId back as response func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + if atomic.LoadInt32(b.blobTimeout) == int32(1) { + if b.resetTimeout { + atomic.StoreInt32(b.blobTimeout, 0) + } + time.Sleep(time.Second) + } vars := mux.Vars(r) blobId := vars["blobId"] log.Debug("dummyBlobServer returnBlob id=" + blobId)