Merge pull request #10 from 30x/XAPID-706
Xapid 706
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 7ae2a6c..4604588 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -4,16 +4,17 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "github.com/30x/apid-core"
- "github.com/30x/apid-core/factory"
+ "encoding/hex"
"io/ioutil"
"net/http"
"net/http/httptest"
+ "net/url"
+ "os"
"testing"
"time"
- "net/url"
- "encoding/hex"
- "os"
+
+ "github.com/30x/apid-core"
+ "github.com/30x/apid-core/factory"
)
var (
@@ -49,14 +50,27 @@
bundleCleanupDelay = time.Millisecond
bundleRetryDelay = 10 * time.Millisecond
bundleDownloadTimeout = 50 * time.Millisecond
+ concurrentDownloads = 1
+ downloadQueueSize = 1
router := apid.API().Router()
// fake an unreliable bundle repo
count := 1
+ failedOnce := false
+	// Fake bundle route that answers 500 on its first request and afterwards
+	// serves the request path as the bundle content.
+	// NOTE(review): failedOnce is read/written without synchronization; this is
+	// only safe while requests to this route are not concurrent.
+	router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) {
+		if failedOnce {
+			// This route pattern has no {id} placeholder, so there is no "id"
+			// route variable and "/bundles/"+vars["id"] always produced
+			// "/bundles/". Write the actual request path instead.
+			w.Write([]byte(req.URL.Path))
+		} else {
+			failedOnce = true
+			w.WriteHeader(500)
+		}
+	}).Methods("GET")
+
router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
count++
vars := apid.API().Vars(req)
- if count % 2 == 0 {
+ if count%2 == 0 {
w.WriteHeader(500)
return
}
@@ -71,7 +85,7 @@
router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
func(w http.ResponseWriter, req *http.Request) {
count++
- if count % 2 == 0 {
+ if count%2 == 0 {
w.WriteHeader(500)
return
}
@@ -82,7 +96,7 @@
w.Write([]byte("OK"))
- }).Methods("PUT")
+ }).Methods("PUT")
testServer = httptest.NewServer(router)
apiServerBaseURI, err = url.Parse(testServer.URL)
diff --git a/bundle.go b/bundle.go
index 83eaff9..95fb3f6 100644
--- a/bundle.go
+++ b/bundle.go
@@ -17,8 +17,12 @@
"time"
)
-var bundleRetryDelay time.Duration = time.Second
-var bundleDownloadTimeout time.Duration = 10 * time.Minute
+// Retry/timeout settings for bundle downloads, plus the channels that connect
+// queueDownloadRequest, the dispatcher and the download workers.
+//
+// NOTE(review): these make() calls run during package-variable initialization,
+// before downloadQueueSize and concurrentDownloads (declared in init.go) are
+// assigned from config. Both values are still 0 here, so the channels created
+// by these initializers are unbuffered regardless of configuration — confirm
+// this is intended.
+var (
+	bundleRetryDelay      = time.Second
+	bundleDownloadTimeout = 10 * time.Minute
+	downloadQueue         = make(chan *DownloadRequest, downloadQueueSize)
+	workerQueue           = make(chan chan *DownloadRequest, concurrentDownloads)
+)
// simple doubling back-off
func createBackoff(retryIn, maxBackOff time.Duration) func() {
@@ -32,7 +36,7 @@
}
}
-func downloadBundle(dep DataDeployment) {
+func queueDownloadRequest(dep DataDeployment) {
hashWriter, err := getHashWriter(dep.BundleChecksumType)
if err != nil {
@@ -49,61 +53,65 @@
return
}
- log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
-
retryIn := bundleRetryDelay
maxBackOff := 5 * time.Minute
- backOffFunc := createBackoff(retryIn, maxBackOff)
+ timeoutAfter := time.Now().Add(bundleDownloadTimeout)
+ req := &DownloadRequest{
+ dep: dep,
+ hashWriter: hashWriter,
+ bundleFile: getBundleFile(dep),
+ backoffFunc: createBackoff(retryIn, maxBackOff),
+ timeoutAfter: timeoutAfter,
+ }
+ downloadQueue <- req
+}
- // timeout and mark deployment failed
- timeout := time.NewTimer(bundleDownloadTimeout)
- go func() {
- <-timeout.C
- log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI)
- var errMessage string
+// DownloadRequest tracks one deployment's bundle download. After a failed
+// attempt, the same request is put back on downloadQueue once its back-off
+// has elapsed.
+type DownloadRequest struct {
+	dep          DataDeployment // deployment whose bundle is being downloaded
+	hashWriter   hash.Hash      // checksum hasher; Reset before every attempt
+	bundleFile   string         // destination path the downloaded temp file is renamed to
+	backoffFunc  func()         // invoked before a failed request is re-queued
+	timeoutAfter time.Time      // when to report the deployment as failed; zeroed once reported
+}
+
+func (r *DownloadRequest) downloadBundle() {
+
+ dep := r.dep
+ log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI)
+
+ deployments, err := getDeployments("WHERE id=$1", dep.ID)
+ if err == nil && len(deployments) == 0 {
+ log.Debugf("never mind, deployment %s was deleted", dep.ID)
+ return
+ }
+
+ r.checkTimeout()
+
+ r.hashWriter.Reset()
+ tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum)
+
+ if err == nil {
+ err = os.Rename(tempFile, r.bundleFile)
if err != nil {
- errMessage = fmt.Sprintf("bundle download failed: %s", err)
- } else {
- errMessage = "bundle download failed"
+ log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err)
}
- setDeploymentResults(apiDeploymentResults{
- {
- ID: dep.ID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: ERROR_CODE_TODO,
- Message: errMessage,
- },
- })
- }()
+ }
- // todo: we'll want to abort download if deployment is deleted
- for {
- var tempFile, bundleFile string
- tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+ if tempFile != "" {
+ go safeDelete(tempFile)
+ }
- if err == nil {
- bundleFile = getBundleFile(dep)
- err = os.Rename(tempFile, bundleFile)
- if err != nil {
- log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
- }
- }
+ if err == nil {
+ err = updateLocalBundleURI(dep.ID, r.bundleFile)
+ }
- if tempFile != "" {
- go safeDelete(tempFile)
- }
-
- if err == nil {
- err = updateLocalBundleURI(dep.ID, bundleFile)
- }
-
- // success!
- if err == nil {
- break
- }
-
- backOffFunc()
- hashWriter.Reset()
+ if err != nil {
+ // add myself back into the queue after back off
+ go func() {
+ r.backoffFunc()
+ downloadQueue <- r
+ }()
+ return
}
log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
@@ -112,6 +120,25 @@
deploymentsChanged <- dep.ID
}
+// checkTimeout reports the deployment as failed the first time it is called
+// after the request's deadline has passed. It does not stop the download:
+// the deadline is cleared so the failure is reported only once, and later
+// attempts proceed normally.
+func (r *DownloadRequest) checkTimeout() {
+	if r.timeoutAfter.IsZero() || !time.Now().After(r.timeoutAfter) {
+		return
+	}
+
+	// clear the deadline so subsequent attempts do not report again
+	r.timeoutAfter = time.Time{}
+	log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s",
+		r.dep.ID, r.dep.BundleURI)
+
+	failure := apiDeploymentResults{
+		{
+			ID:        r.dep.ID,
+			Status:    RESPONSE_STATUS_FAIL,
+			ErrorCode: ERROR_CODE_TODO,
+			Message:   "bundle download failed",
+		},
+	}
+	setDeploymentResults(failure)
+}
+
func getBundleFile(dep DataDeployment) string {
// the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI
@@ -218,31 +245,62 @@
return []byte("")
}
-//func checksumFile(hashType, checksum string, fileName string) error {
-//
-// hashWriter, err := getHashWriter(hashType)
-// if err != nil {
-// return err
-// }
-//
-// file, err := os.Open(fileName)
-// if err != nil {
-// return err
-// }
-// defer file.Close()
-//
-// if _, err := io.Copy(hashWriter, file); err != nil {
-// return err
-// }
-//
-// hashBytes := hashWriter.Sum(nil)
-// //hashBytes := hashWriter.Sum(nil)[:hasher.Size()]
-// //hashBytes := hashWriter.Sum(nil)[:]
-//
-// //hex.EncodeToString(hashBytes)
-// if checksum != hex.EncodeToString(hashBytes) {
-// return errors.New(fmt.Sprintf("bad checksum for %s", fileName))
-// }
-//
-// return nil
-//}
+// initializeBundleDownloading sets up bundle downloading. It does four things:
+//   - creates downloadQueue and workerQueue using the configured sizes;
+//   - starts concurrentDownloads BundleDownloader workers (at least one);
+//   - starts a dispatcher goroutine;
+//   - the dispatcher hands each queued DownloadRequest to an idle worker.
+// Call it once, before anything is queued with queueDownloadRequest.
+// Requests already sitting in a previously created downloadQueue would be
+// abandoned.
+func initializeBundleDownloading() {
+
+	// The package-level initializers for downloadQueue and workerQueue run
+	// before downloadQueueSize and concurrentDownloads are read from config,
+	// while both are still 0, so those channels are always unbuffered.
+	// Recreate them here now that the configured sizes are known.
+	workers := concurrentDownloads
+	if workers < 1 {
+		// with no workers, queued downloads would never run
+		log.Errorf("invalid %s value %d, using 1 bundle downloader", configConcurrentDownloads, concurrentDownloads)
+		workers = 1
+	}
+	queueSize := downloadQueueSize
+	if queueSize < 0 {
+		// make() panics on a negative channel buffer size
+		log.Errorf("invalid %s value %d, using an unbuffered download queue", configDownloadQueueSize, downloadQueueSize)
+		queueSize = 0
+	}
+	downloadQueue = make(chan *DownloadRequest, queueSize)
+	workerQueue = make(chan chan *DownloadRequest, workers)
+
+	// create workers
+	for i := 0; i < workers; i++ {
+		worker := BundleDownloader{
+			id:       i + 1,
+			workChan: make(chan *DownloadRequest),
+			quitChan: make(chan bool),
+		}
+		worker.Start()
+	}
+
+	// run dispatcher
+	// NOTE(review): each hand-off waits for a free worker in its own goroutine,
+	// so downloadQueue is drained immediately. downloadQueueSize therefore does
+	// not limit how many requests are pending.
+	go func() {
+		for req := range downloadQueue {
+			log.Debugf("dispatching downloader for: %s", req.bundleFile)
+			// pass req explicitly so each hand-off owns its own request
+			go func(req *DownloadRequest) {
+				worker := <-workerQueue
+				log.Debugf("got a worker for: %s", req.bundleFile)
+				worker <- req
+			}(req)
+		}
+	}()
+}
+
+// BundleDownloader is a download worker. When idle, it puts its workChan on
+// workerQueue and then runs whichever DownloadRequest is sent to it.
+type BundleDownloader struct {
+	id       int                   // worker number (1-based when created by initializeBundleDownloading); used in logs
+	workChan chan *DownloadRequest // receives the next request for this worker to run
+	quitChan chan bool             // a value here makes the worker goroutine exit
+}
+
+// Start launches the worker's goroutine and returns immediately. Each time
+// the worker is idle, it offers its workChan on workerQueue and then blocks.
+// It either runs the DownloadRequest it is handed (synchronously) or exits
+// when signalled on quitChan.
+func (w *BundleDownloader) Start() {
+	work := func() {
+		log.Debugf("started bundle downloader %d", w.id)
+		for {
+			// advertise availability to the dispatcher
+			workerQueue <- w.workChan
+
+			select {
+			case <-w.quitChan:
+				log.Debugf("bundle downloader %d stopped", w.id)
+				return
+			case req := <-w.workChan:
+				log.Debugf("starting download %s", req.bundleFile)
+				req.downloadBundle()
+			}
+		}
+	}
+	go work()
+}
+
+// Stop asks the worker goroutine started by Start to exit. The signal is sent
+// from a separate goroutine, so Stop returns without waiting for the worker.
+// NOTE(review): the worker only receives the signal in its select, after it
+// has already put its workChan on workerQueue. Once it exits, any dispatcher
+// hand-off that picks up that workChan blocks forever and its request is
+// never downloaded. No callers are visible here; confirm before using.
+func (w *BundleDownloader) Stop() {
+	go func() {
+		w.quitChan <- true
+	}()
+}
diff --git a/bundle_test.go b/bundle_test.go
index 4f2bad2..796be9a 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -2,10 +2,14 @@
import (
"encoding/json"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
"net/url"
"time"
+
+ "net/http"
+ "net/http/httptest"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
)
var _ = Describe("bundle", func() {
@@ -14,9 +18,24 @@
It("should timeout, mark status as failed, then finish", func() {
+ proceed := make(chan bool)
+ failedOnce := false
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if failedOnce {
+ proceed <- true
+ time.Sleep(bundleDownloadTimeout)
+ w.Write([]byte("/bundles/longfail"))
+ } else {
+ failedOnce = true
+ time.Sleep(bundleDownloadTimeout)
+ w.WriteHeader(500)
+ }
+ }))
+ defer ts.Close()
+
deploymentID := "bundle_download_fail"
- uri, err := url.Parse(testServer.URL)
+ uri, err := url.Parse(ts.URL)
Expect(err).ShouldNot(HaveOccurred())
uri.Path = "/bundles/longfail"
@@ -60,10 +79,9 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- go downloadBundle(dep)
+ queueDownloadRequest(dep)
- // give download time to timeout
- time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
+ <-proceed
// get error state deployment
deployments, err := getDeployments("WHERE id=$1", deploymentID)
@@ -97,6 +115,73 @@
Expect(d.LocalBundleURI).To(BeAnExistingFile())
})
+	// Queues a download against /bundles/failonce, whose first request fails.
+	// The test then deletes the deployment while the request waits to retry,
+	// expecting the next attempt to notice the deletion and give up.
+	It("should not continue attempts if deployment has been deleted", func() {
+
+		deploymentID := "bundle_download_deployment_deleted"
+
+		uri, err := url.Parse(testServer.URL)
+		Expect(err).ShouldNot(HaveOccurred())
+
+		// the suite's /bundles/failonce route answers 500 on its first request
+		uri.Path = "/bundles/failonce"
+		bundleUri := uri.String()
+		bundle := bundleConfigJson{
+			Name:         uri.Path,
+			URI:          bundleUri,
+			ChecksumType: "crc-32",
+		}
+		bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+		bundleJson, err := json.Marshal(bundle)
+		Expect(err).ShouldNot(HaveOccurred())
+
+		tx, err := getDB().Begin()
+		Expect(err).ShouldNot(HaveOccurred())
+
+		dep := DataDeployment{
+			ID:                 deploymentID,
+			BundleConfigID:     deploymentID,
+			ApidClusterID:      deploymentID,
+			DataScopeID:        deploymentID,
+			BundleConfigJSON:   string(bundleJson),
+			ConfigJSON:         string(bundleJson),
+			Created:            "",
+			CreatedBy:          "",
+			Updated:            "",
+			UpdatedBy:          "",
+			BundleName:         deploymentID,
+			BundleURI:          bundle.URI,
+			BundleChecksum:     bundle.Checksum,
+			BundleChecksumType: bundle.ChecksumType,
+			LocalBundleURI:     "",
+			DeployStatus:       "",
+			DeployErrorCode:    0,
+			DeployErrorMessage: "",
+		}
+
+		err = InsertDeployment(tx, dep)
+		Expect(err).ShouldNot(HaveOccurred())
+
+		err = tx.Commit()
+		Expect(err).ShouldNot(HaveOccurred())
+
+		queueDownloadRequest(dep)
+
+		// skip first try
+		time.Sleep(bundleRetryDelay)
+
+		// delete deployment
+		// NOTE(review): the result of deleteDeployment is not checked — confirm
+		// whether it returns an error that should be asserted on.
+		tx, err = getDB().Begin()
+		Expect(err).ShouldNot(HaveOccurred())
+		deleteDeployment(tx, dep.ID)
+		err = tx.Commit()
+		Expect(err).ShouldNot(HaveOccurred())
+
+		// wait for final
+		time.Sleep(bundleRetryDelay)
+
+		// No way to test this programmatically currently
+		// search logs for "never mind, deployment bundle_download_deployment_deleted was deleted"
+		// NOTE(review): this test makes no assertions about the outcome.
+	})
+
// todo: temporary - this tests that checksum is disabled until server implements (XAPID-544)
It("should TEMPORARILY download even if empty Checksum and ChecksumType", func() {
@@ -111,7 +196,7 @@
Name: uri.Path,
URI: bundleUri,
ChecksumType: "",
- Checksum: "",
+ Checksum: "",
}
bundleJson, err := json.Marshal(bundle)
Expect(err).ShouldNot(HaveOccurred())
@@ -146,7 +231,7 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- go downloadBundle(dep)
+ queueDownloadRequest(dep)
// give download time to finish
time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
diff --git a/init.go b/init.go
index afc1366..f507357 100644
--- a/init.go
+++ b/init.go
@@ -2,11 +2,12 @@
import (
"fmt"
- "github.com/30x/apid-core"
"net/url"
"os"
"path"
"time"
+
+ "github.com/30x/apid-core"
)
const (
@@ -17,18 +18,22 @@
configApiServerBaseURI = "apigeesync_proxy_server_base"
configApidInstanceID = "apigeesync_apid_instance_id"
configApidClusterID = "apigeesync_cluster_id"
+ configConcurrentDownloads = "apigeesync_concurrent_downloads"
+ configDownloadQueueSize = "apigeesync_download_queue_size"
)
var (
- services apid.Services
- log apid.LogService
- data apid.DataService
- bundlePath string
- debounceDuration time.Duration
- bundleCleanupDelay time.Duration
- apiServerBaseURI *url.URL
- apidInstanceID string
- apidClusterID string
+ services apid.Services
+ log apid.LogService
+ data apid.DataService
+ bundlePath string
+ debounceDuration time.Duration
+ bundleCleanupDelay time.Duration
+ apiServerBaseURI *url.URL
+ apidInstanceID string
+ apidClusterID string
+ // configured bundle download queue size, read from config key configDownloadQueueSize
+ downloadQueueSize int
+ // number of bundle download workers, read from config key configConcurrentDownloads
+ concurrentDownloads int
)
func init() {
@@ -65,6 +70,8 @@
config.SetDefault(configDebounceDuration, time.Second)
config.SetDefault(configBundleCleanupDelay, time.Minute)
config.SetDefault(configBundleDownloadTimeout, 5*time.Minute)
+ config.SetDefault(configConcurrentDownloads, 15)
+ config.SetDefault(configDownloadQueueSize, 2000)
debounceDuration = config.GetDuration(configDebounceDuration)
if debounceDuration < time.Millisecond {
@@ -83,6 +90,8 @@
data = services.Data()
+ concurrentDownloads = config.GetInt(configConcurrentDownloads)
+ downloadQueueSize = config.GetInt(configDownloadQueueSize)
relativeBundlePath := config.GetString(configBundleDirKey)
storagePath := config.GetString("local_storage_path")
bundlePath = path.Join(storagePath, relativeBundlePath)
@@ -91,6 +100,8 @@
}
log.Infof("Bundle directory path is %s", bundlePath)
+ initializeBundleDownloading()
+
go distributeEvents()
initListener(services)
diff --git a/listener.go b/listener.go
index f631594..c3b591f 100644
--- a/listener.go
+++ b/listener.go
@@ -3,10 +3,11 @@
import (
"database/sql"
"encoding/json"
- "github.com/30x/apid-core"
- "github.com/apigee-labs/transicator/common"
"os"
"time"
+
+ "github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
)
const (
@@ -96,7 +97,7 @@
log.Panicf("unable to query database for unready deployments: %v", err)
}
for _, dep := range deployments {
- go downloadBundle(dep)
+ queueDownloadRequest(dep)
}
}()
}
@@ -200,8 +201,7 @@
return
}
- // todo: limit # concurrent downloads?
- go downloadBundle(d)
+ queueDownloadRequest(d)
return
}