Limit number of concurrent bundle downloads to 15
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 7ae2a6c..7411e6f 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -4,16 +4,17 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/30x/apid-core"
- "github.com/30x/apid-core/factory"
+ "encoding/hex"
"io/ioutil"
"net/http"
"net/http/httptest"
+ "net/url"
+ "os"
"testing"
"time"
- "net/url"
- "encoding/hex"
- "os"
+
+ "github.com/30x/apid-core"
+ "github.com/30x/apid-core/factory"
)
var (
@@ -53,10 +54,21 @@
router := apid.API().Router()
// fake an unreliable bundle repo
count := 1
+ failedOnce := false
+ router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) {
+ if failedOnce {
+ vars := apid.API().Vars(req)
+ w.Write([]byte("/bundles/" + vars["id"]))
+ } else {
+ failedOnce = true
+ w.WriteHeader(500)
+ }
+ }).Methods("GET")
+
router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
count++
vars := apid.API().Vars(req)
- if count % 2 == 0 {
+ if count%2 == 0 {
w.WriteHeader(500)
return
}
@@ -71,7 +83,7 @@
router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
func(w http.ResponseWriter, req *http.Request) {
count++
- if count % 2 == 0 {
+ if count%2 == 0 {
w.WriteHeader(500)
return
}
@@ -82,7 +94,7 @@
w.Write([]byte("OK"))
- }).Methods("PUT")
+ }).Methods("PUT")
testServer = httptest.NewServer(router)
apiServerBaseURI, err = url.Parse(testServer.URL)
diff --git a/bundle.go b/bundle.go
index 83eaff9..651576c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -17,8 +17,11 @@
"time"
)
+var numConcurrentDownloads = 15
var bundleRetryDelay time.Duration = time.Second
var bundleDownloadTimeout time.Duration = 10 * time.Minute
+var downloadQueue = make(chan *DownloadRequest, 200)
+var workerQueue = make(chan chan *DownloadRequest, numConcurrentDownloads)
// simple doubling back-off
func createBackoff(retryIn, maxBackOff time.Duration) func() {
@@ -32,7 +35,7 @@
}
}
-func downloadBundle(dep DataDeployment) {
+func queueDownloadRequest(dep DataDeployment) {
hashWriter, err := getHashWriter(dep.BundleChecksumType)
if err != nil {
@@ -49,13 +52,17 @@
return
}
- log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
-
retryIn := bundleRetryDelay
maxBackOff := 5 * time.Minute
- backOffFunc := createBackoff(retryIn, maxBackOff)
+ req := &DownloadRequest{
+ dep: dep,
+ hashWriter: hashWriter,
+ bundleFile: getBundleFile(dep),
+ backoffFunc: createBackoff(retryIn, maxBackOff),
+ }
+ downloadQueue <- req
- // timeout and mark deployment failed
+ // timeout and mark deployment failed (but retries will continue)
timeout := time.NewTimer(bundleDownloadTimeout)
go func() {
<-timeout.C
@@ -76,34 +83,51 @@
})
}()
- // todo: we'll want to abort download if deployment is deleted
- for {
- var tempFile, bundleFile string
- tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+}
- if err == nil {
- bundleFile = getBundleFile(dep)
- err = os.Rename(tempFile, bundleFile)
- if err != nil {
- log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
- }
+type DownloadRequest struct {
+ dep DataDeployment
+ hashWriter hash.Hash
+ bundleFile string
+ backoffFunc func()
+}
+
+func (r *DownloadRequest) downloadBundle() {
+
+ dep := r.dep
+ log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI)
+
+ deployments, err := getDeployments("WHERE id=$1", dep.ID)
+ if err == nil && len(deployments) == 0 {
+ log.Debugf("never mind, deployment %s was deleted", dep.ID)
+ return
+ }
+
+ r.hashWriter.Reset()
+ tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum)
+
+ if err == nil {
+ err = os.Rename(tempFile, r.bundleFile)
+ if err != nil {
+ log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err)
}
+ }
- if tempFile != "" {
- go safeDelete(tempFile)
- }
+ if tempFile != "" {
+ go safeDelete(tempFile)
+ }
- if err == nil {
- err = updateLocalBundleURI(dep.ID, bundleFile)
- }
+ if err == nil {
+ err = updateLocalBundleURI(dep.ID, r.bundleFile)
+ }
- // success!
- if err == nil {
- break
- }
-
- backOffFunc()
- hashWriter.Reset()
+ if err != nil {
+ // add myself back into the queue after back off
+ go func() {
+ r.backoffFunc()
+ downloadQueue <- r
+ }()
+ return
}
log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
@@ -218,31 +242,62 @@
return []byte("")
}
-//func checksumFile(hashType, checksum string, fileName string) error {
-//
-// hashWriter, err := getHashWriter(hashType)
-// if err != nil {
-// return err
-// }
-//
-// file, err := os.Open(fileName)
-// if err != nil {
-// return err
-// }
-// defer file.Close()
-//
-// if _, err := io.Copy(hashWriter, file); err != nil {
-// return err
-// }
-//
-// hashBytes := hashWriter.Sum(nil)
-// //hashBytes := hashWriter.Sum(nil)[:hasher.Size()]
-// //hashBytes := hashWriter.Sum(nil)[:]
-//
-// //hex.EncodeToString(hashBytes)
-// if checksum != hex.EncodeToString(hashBytes) {
-// return errors.New(fmt.Sprintf("bad checksum for %s", fileName))
-// }
-//
-// return nil
-//}
+func initializeBundleDownloading() {
+
+ // create workers
+ for i := 0; i < numConcurrentDownloads; i++ {
+ worker := BundleDownloader{
+ id: i + 1,
+ workChan: make(chan *DownloadRequest),
+ quitChan: make(chan bool),
+ }
+ worker.Start()
+ }
+
+ // run dispatcher
+ go func() {
+ for {
+ select {
+ case req := <-downloadQueue:
+ log.Debugf("dispatching downloader for: %s", req.bundleFile)
+ go func() {
+ worker := <-workerQueue
+ log.Debugf("got a worker for: %s", req.bundleFile)
+ worker <- req
+ }()
+ }
+ }
+ }()
+}
+
+type BundleDownloader struct {
+ id int
+ workChan chan *DownloadRequest
+ quitChan chan bool
+}
+
+func (w *BundleDownloader) Start() {
+ go func() {
+ log.Debugf("started bundle downloader %d", w.id)
+ for {
+ // wait for work
+ workerQueue <- w.workChan
+
+ select {
+ case req := <-w.workChan:
+ log.Debugf("starting download %s", req.bundleFile)
+ req.downloadBundle()
+
+ case <-w.quitChan:
+ log.Debugf("bundle downloader %d stopped", w.id)
+ return
+ }
+ }
+ }()
+}
+
+func (w *BundleDownloader) Stop() {
+ go func() {
+ w.quitChan <- true
+ }()
+}
diff --git a/bundle_test.go b/bundle_test.go
index 4f2bad2..009eba6 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -2,10 +2,11 @@
import (
"encoding/json"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
"net/url"
"time"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
)
var _ = Describe("bundle", func() {
@@ -60,7 +61,7 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- go downloadBundle(dep)
+ queueDownloadRequest(dep)
// give download time to timeout
time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
@@ -97,6 +98,73 @@
Expect(d.LocalBundleURI).To(BeAnExistingFile())
})
+ It("should not continue attempts if deployment has been deleted", func() {
+
+ deploymentID := "bundle_download_deployment_deleted"
+
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/failonce"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundleJson, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ BundleConfigID: deploymentID,
+ ApidClusterID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleConfigJSON: string(bundleJson),
+ ConfigJSON: string(bundleJson),
+ Created: "",
+ CreatedBy: "",
+ Updated: "",
+ UpdatedBy: "",
+ BundleName: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ LocalBundleURI: "",
+ DeployStatus: "",
+ DeployErrorCode: 0,
+ DeployErrorMessage: "",
+ }
+
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ queueDownloadRequest(dep)
+
+ // skip first try
+ time.Sleep(bundleRetryDelay)
+
+ // delete deployment
+ tx, err = getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ deleteDeployment(tx, dep.ID)
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ // wait for final
+ time.Sleep(bundleRetryDelay)
+
+ // No way to test this programmatically currently
+ // search logs for "never mind, deployment bundle_download_deployment_deleted was deleted"
+ })
+
// todo: temporary - this tests that checksum is disabled until server implements (XAPID-544)
It("should TEMPORARILY download even if empty Checksum and ChecksumType", func() {
@@ -111,7 +179,7 @@
Name: uri.Path,
URI: bundleUri,
ChecksumType: "",
- Checksum: "",
+ Checksum: "",
}
bundleJson, err := json.Marshal(bundle)
Expect(err).ShouldNot(HaveOccurred())
@@ -146,7 +214,7 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- go downloadBundle(dep)
+ queueDownloadRequest(dep)
// give download time to finish
time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
diff --git a/init.go b/init.go
index afc1366..68d75e6 100644
--- a/init.go
+++ b/init.go
@@ -2,11 +2,12 @@
import (
"fmt"
- "github.com/30x/apid-core"
"net/url"
"os"
"path"
"time"
+
+ "github.com/30x/apid-core"
)
const (
@@ -91,6 +92,8 @@
}
log.Infof("Bundle directory path is %s", bundlePath)
+ initializeBundleDownloading()
+
go distributeEvents()
initListener(services)
diff --git a/listener.go b/listener.go
index f631594..c3b591f 100644
--- a/listener.go
+++ b/listener.go
@@ -3,10 +3,11 @@
import (
"database/sql"
"encoding/json"
- "github.com/30x/apid-core"
- "github.com/apigee-labs/transicator/common"
"os"
"time"
+
+ "github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
)
const (
@@ -96,7 +97,7 @@
log.Panicf("unable to query database for unready deployments: %v", err)
}
for _, dep := range deployments {
- go downloadBundle(dep)
+ queueDownloadRequest(dep)
}
}()
}
@@ -200,8 +201,7 @@
return
}
- // todo: limit # concurrent downloads?
- go downloadBundle(d)
+ queueDownloadRequest(d)
return
}