Limit number of concurrent bundle downloads to 15
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 7ae2a6c..7411e6f 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -4,16 +4,17 @@ . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/30x/apid-core" - "github.com/30x/apid-core/factory" + "encoding/hex" "io/ioutil" "net/http" "net/http/httptest" + "net/url" + "os" "testing" "time" - "net/url" - "encoding/hex" - "os" + + "github.com/30x/apid-core" + "github.com/30x/apid-core/factory" ) var ( @@ -53,10 +54,21 @@ router := apid.API().Router() // fake an unreliable bundle repo count := 1 + failedOnce := false + router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) { + if failedOnce { + vars := apid.API().Vars(req) + w.Write([]byte("/bundles/" + vars["id"])) + } else { + failedOnce = true + w.WriteHeader(500) + } + }).Methods("GET") + router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) { count++ vars := apid.API().Vars(req) - if count % 2 == 0 { + if count%2 == 0 { w.WriteHeader(500) return } @@ -71,7 +83,7 @@ router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments", func(w http.ResponseWriter, req *http.Request) { count++ - if count % 2 == 0 { + if count%2 == 0 { w.WriteHeader(500) return } @@ -82,7 +94,7 @@ w.Write([]byte("OK")) - }).Methods("PUT") + }).Methods("PUT") testServer = httptest.NewServer(router) apiServerBaseURI, err = url.Parse(testServer.URL)
diff --git a/bundle.go b/bundle.go index 83eaff9..651576c 100644 --- a/bundle.go +++ b/bundle.go
@@ -17,8 +17,11 @@ "time" ) +var numConcurrentDownloads = 15 var bundleRetryDelay time.Duration = time.Second var bundleDownloadTimeout time.Duration = 10 * time.Minute +var downloadQueue = make(chan *DownloadRequest, 200) +var workerQueue = make(chan chan *DownloadRequest, numConcurrentDownloads) // simple doubling back-off func createBackoff(retryIn, maxBackOff time.Duration) func() { @@ -32,7 +35,7 @@ } } -func downloadBundle(dep DataDeployment) { +func queueDownloadRequest(dep DataDeployment) { hashWriter, err := getHashWriter(dep.BundleChecksumType) if err != nil { @@ -49,13 +52,17 @@ return } - log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI) - retryIn := bundleRetryDelay maxBackOff := 5 * time.Minute - backOffFunc := createBackoff(retryIn, maxBackOff) + req := &DownloadRequest{ + dep: dep, + hashWriter: hashWriter, + bundleFile: getBundleFile(dep), + backoffFunc: createBackoff(retryIn, maxBackOff), + } + downloadQueue <- req - // timeout and mark deployment failed + // timeout and mark deployment failed (but retries will continue) timeout := time.NewTimer(bundleDownloadTimeout) go func() { <-timeout.C @@ -76,34 +83,51 @@ }) }() - // todo: we'll want to abort download if deployment is deleted - for { - var tempFile, bundleFile string - tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum) +} - if err == nil { - bundleFile = getBundleFile(dep) - err = os.Rename(tempFile, bundleFile) - if err != nil { - log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err) - } +type DownloadRequest struct { + dep DataDeployment + hashWriter hash.Hash + bundleFile string + backoffFunc func() +} + +func (r *DownloadRequest) downloadBundle() { + + dep := r.dep + log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI) + + deployments, err := getDeployments("WHERE id=$1", dep.ID) + if err == nil && len(deployments) == 0 { + log.Debugf("never mind, deployment %s was deleted", dep.ID) + return + } + + r.hashWriter.Reset() + tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum) + + if err == nil { + err = os.Rename(tempFile, r.bundleFile) + if err != nil { + log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err) } + } - if tempFile != "" { - go safeDelete(tempFile) - } + if tempFile != "" { + go safeDelete(tempFile) + } - if err == nil { - err = updateLocalBundleURI(dep.ID, bundleFile) - } + if err == nil { + err = updateLocalBundleURI(dep.ID, r.bundleFile) + } - // success! - if err == nil { - break - } - - backOffFunc() - hashWriter.Reset() + if err != nil { + // add myself back into the queue after back off + go func() { + r.backoffFunc() + downloadQueue <- r + }() + return } log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI) @@ -218,31 +242,62 @@ return []byte("") } -//func checksumFile(hashType, checksum string, fileName string) error { -// -// hashWriter, err := getHashWriter(hashType) -// if err != nil { -// return err -// } -// -// file, err := os.Open(fileName) -// if err != nil { -// return err -// } -// defer file.Close() -// -// if _, err := io.Copy(hashWriter, file); err != nil { -// return err -// } -// -// hashBytes := hashWriter.Sum(nil) -// //hashBytes := hashWriter.Sum(nil)[:hasher.Size()] -// //hashBytes := hashWriter.Sum(nil)[:] -// -// //hex.EncodeToString(hashBytes) -// if checksum != hex.EncodeToString(hashBytes) { -// return errors.New(fmt.Sprintf("bad checksum for %s", fileName)) -// } -// -// return nil -//} +func initializeBundleDownloading() { + + // create workers + for i := 0; i < numConcurrentDownloads; i++ { + worker := BundleDownloader{ + id: i + 1, + workChan: make(chan *DownloadRequest), + quitChan: make(chan bool), + } + worker.Start() + } + + // run dispatcher + go func() { + for { + select { + case req := <-downloadQueue: + log.Debugf("dispatching downloader for: %s", req.bundleFile) + go func() { + worker := <-workerQueue + log.Debugf("got a worker for: %s", req.bundleFile) + worker <- req + }() + } + } + }() +} + +type BundleDownloader struct { + id int + workChan chan *DownloadRequest + quitChan chan bool +} + +func (w *BundleDownloader) Start() { + go func() { + log.Debugf("started bundle downloader %d", w.id) + for { + // wait for work + workerQueue <- w.workChan + + select { + case req := <-w.workChan: + log.Debugf("starting download %s", req.bundleFile) + req.downloadBundle() + + case <-w.quitChan: + log.Debugf("bundle downloader %d stopped", w.id) + return + } + } + }() +} + +func (w *BundleDownloader) Stop() { + go func() { + w.quitChan <- true + }() +}
diff --git a/bundle_test.go b/bundle_test.go index 4f2bad2..009eba6 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -2,10 +2,11 @@ import ( "encoding/json" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" "net/url" "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" ) var _ = Describe("bundle", func() { @@ -60,7 +61,7 @@ err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) - go downloadBundle(dep) + queueDownloadRequest(dep) // give download time to timeout time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond)) @@ -97,6 +98,73 @@ Expect(d.LocalBundleURI).To(BeAnExistingFile()) }) + It("should not continue attempts if deployment has been deleted", func() { + + deploymentID := "bundle_download_deployment_deleted" + + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/failonce" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc-32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + bundleJson, err := json.Marshal(bundle) + Expect(err).ShouldNot(HaveOccurred()) + + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + + dep := DataDeployment{ + ID: deploymentID, + BundleConfigID: deploymentID, + ApidClusterID: deploymentID, + DataScopeID: deploymentID, + BundleConfigJSON: string(bundleJson), + ConfigJSON: string(bundleJson), + Created: "", + CreatedBy: "", + Updated: "", + UpdatedBy: "", + BundleName: deploymentID, + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, + BundleChecksumType: bundle.ChecksumType, + LocalBundleURI: "", + DeployStatus: "", + DeployErrorCode: 0, + DeployErrorMessage: "", + } + + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + queueDownloadRequest(dep) + + // skip first try + time.Sleep(bundleRetryDelay) + + // delete deployment + tx, err = getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + deleteDeployment(tx, dep.ID) + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + // wait for final + time.Sleep(bundleRetryDelay) + + // No way to test this programmatically currently + // search logs for "never mind, deployment bundle_download_deployment_deleted was deleted" + }) + // todo: temporary - this tests that checksum is disabled until server implements (XAPID-544) It("should TEMPORARILY download even if empty Checksum and ChecksumType", func() { @@ -111,7 +179,7 @@ Name: uri.Path, URI: bundleUri, ChecksumType: "", - Checksum: "", + Checksum: "", } bundleJson, err := json.Marshal(bundle) Expect(err).ShouldNot(HaveOccurred()) @@ -146,7 +214,7 @@ err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) - go downloadBundle(dep) + queueDownloadRequest(dep) // give download time to finish time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
diff --git a/init.go b/init.go index afc1366..68d75e6 100644 --- a/init.go +++ b/init.go
@@ -2,11 +2,12 @@ import ( "fmt" - "github.com/30x/apid-core" "net/url" "os" "path" "time" + + "github.com/30x/apid-core" ) const ( @@ -91,6 +92,8 @@ } log.Infof("Bundle directory path is %s", bundlePath) + initializeBundleDownloading() + go distributeEvents() initListener(services)
diff --git a/listener.go b/listener.go index f631594..c3b591f 100644 --- a/listener.go +++ b/listener.go
@@ -3,10 +3,11 @@ import ( "database/sql" "encoding/json" - "github.com/30x/apid-core" - "github.com/apigee-labs/transicator/common" "os" "time" + + "github.com/30x/apid-core" + "github.com/apigee-labs/transicator/common" ) const ( @@ -96,7 +97,7 @@ log.Panicf("unable to query database for unready deployments: %v", err) } for _, dep := range deployments { - go downloadBundle(dep) + queueDownloadRequest(dep) } }() } @@ -200,8 +201,7 @@ return } - // todo: limit # concurrent downloads? - go downloadBundle(d) + queueDownloadRequest(d) return }