Merge pull request #10 from 30x/XAPID-706

Xapid 706
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 7ae2a6c..4604588 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -4,16 +4,17 @@
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
-	"github.com/30x/apid-core"
-	"github.com/30x/apid-core/factory"
+	"encoding/hex"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
+	"os"
 	"testing"
 	"time"
-	"net/url"
-	"encoding/hex"
-	"os"
+
+	"github.com/30x/apid-core"
+	"github.com/30x/apid-core/factory"
 )
 
 var (
@@ -49,14 +50,27 @@
 	bundleCleanupDelay = time.Millisecond
 	bundleRetryDelay = 10 * time.Millisecond
 	bundleDownloadTimeout = 50 * time.Millisecond
+	concurrentDownloads = 1
+	downloadQueueSize = 1
 
 	router := apid.API().Router()
 	// fake an unreliable bundle repo
 	count := 1
+	failedOnce := false
+	router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) {
+		if failedOnce {
+			vars := apid.API().Vars(req)
+			w.Write([]byte("/bundles/" + vars["id"]))
+		} else {
+			failedOnce = true
+			w.WriteHeader(500)
+		}
+	}).Methods("GET")
+
 	router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
 		count++
 		vars := apid.API().Vars(req)
-		if count % 2 == 0 {
+		if count%2 == 0 {
 			w.WriteHeader(500)
 			return
 		}
@@ -71,7 +85,7 @@
 	router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
 		func(w http.ResponseWriter, req *http.Request) {
 			count++
-			if count % 2 == 0 {
+			if count%2 == 0 {
 				w.WriteHeader(500)
 				return
 			}
@@ -82,7 +96,7 @@
 
 			w.Write([]byte("OK"))
 
-	}).Methods("PUT")
+		}).Methods("PUT")
 	testServer = httptest.NewServer(router)
 
 	apiServerBaseURI, err = url.Parse(testServer.URL)
diff --git a/bundle.go b/bundle.go
index 83eaff9..95fb3f6 100644
--- a/bundle.go
+++ b/bundle.go
@@ -17,8 +17,12 @@
 	"time"
 )
 
-var bundleRetryDelay time.Duration = time.Second
-var bundleDownloadTimeout time.Duration = 10 * time.Minute
+var (
+	bundleRetryDelay      = time.Second
+	bundleDownloadTimeout = 10 * time.Minute
+	downloadQueue         = make(chan *DownloadRequest, downloadQueueSize)
+	workerQueue           = make(chan chan *DownloadRequest, concurrentDownloads)
+)
 
 // simple doubling back-off
 func createBackoff(retryIn, maxBackOff time.Duration) func() {
@@ -32,7 +36,7 @@
 	}
 }
 
-func downloadBundle(dep DataDeployment) {
+func queueDownloadRequest(dep DataDeployment) {
 
 	hashWriter, err := getHashWriter(dep.BundleChecksumType)
 	if err != nil {
@@ -49,61 +53,65 @@
 		return
 	}
 
-	log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
-
 	retryIn := bundleRetryDelay
 	maxBackOff := 5 * time.Minute
-	backOffFunc := createBackoff(retryIn, maxBackOff)
+	timeoutAfter := time.Now().Add(bundleDownloadTimeout)
+	req := &DownloadRequest{
+		dep:          dep,
+		hashWriter:   hashWriter,
+		bundleFile:   getBundleFile(dep),
+		backoffFunc:  createBackoff(retryIn, maxBackOff),
+		timeoutAfter: timeoutAfter,
+	}
+	downloadQueue <- req
+}
 
-	// timeout and mark deployment failed
-	timeout := time.NewTimer(bundleDownloadTimeout)
-	go func() {
-		<-timeout.C
-		log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI)
-		var errMessage string
+type DownloadRequest struct {
+	dep          DataDeployment // deployment whose bundle is being fetched
+	hashWriter   hash.Hash      // checksum accumulator, reset before every attempt
+	bundleFile   string         // final path the downloaded temp file is renamed to
+	backoffFunc  func()         // invoked before the request is re-queued after a failure
+	timeoutAfter time.Time      // deadline checked by checkTimeout; zeroed once failure is reported
+}
+
+func (r *DownloadRequest) downloadBundle() {
+
+	dep := r.dep
+	log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI)
+
+	deployments, err := getDeployments("WHERE id=$1", dep.ID)
+	if err == nil && len(deployments) == 0 {
+		log.Debugf("never mind, deployment %s was deleted", dep.ID)
+		return
+	}
+
+	r.checkTimeout()
+
+	r.hashWriter.Reset()
+	tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum)
+
+	if err == nil {
+		err = os.Rename(tempFile, r.bundleFile)
 		if err != nil {
-			errMessage = fmt.Sprintf("bundle download failed: %s", err)
-		} else {
-			errMessage = "bundle download failed"
+			log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err)
 		}
-		setDeploymentResults(apiDeploymentResults{
-			{
-				ID:        dep.ID,
-				Status:    RESPONSE_STATUS_FAIL,
-				ErrorCode: ERROR_CODE_TODO,
-				Message:   errMessage,
-			},
-		})
-	}()
+	}
 
-	// todo: we'll want to abort download if deployment is deleted
-	for {
-		var tempFile, bundleFile string
-		tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+	if tempFile != "" {
+		go safeDelete(tempFile)
+	}
 
-		if err == nil {
-			bundleFile = getBundleFile(dep)
-			err = os.Rename(tempFile, bundleFile)
-			if err != nil {
-				log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
-			}
-		}
+	if err == nil {
+		err = updateLocalBundleURI(dep.ID, r.bundleFile)
+	}
 
-		if tempFile != "" {
-			go safeDelete(tempFile)
-		}
-
-		if err == nil {
-			err = updateLocalBundleURI(dep.ID, bundleFile)
-		}
-
-		// success!
-		if err == nil {
-			break
-		}
-
-		backOffFunc()
-		hashWriter.Reset()
+	if err != nil {
+		// add myself back into the queue after back off
+		go func() {
+			r.backoffFunc()
+			downloadQueue <- r
+		}()
+		return
 	}
 
 	log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
@@ -112,6 +120,25 @@
 	deploymentsChanged <- dep.ID
 }
 
+// checkTimeout records a failed status for the deployment once the download
+// deadline has passed; the deadline is then cleared so this happens only once.
+func (r *DownloadRequest) checkTimeout() {
+	if r.timeoutAfter.IsZero() || !time.Now().After(r.timeoutAfter) {
+		return
+	}
+	r.timeoutAfter = time.Time{}
+	log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s",
+		r.dep.ID, r.dep.BundleURI)
+	setDeploymentResults(apiDeploymentResults{
+		{
+			ID:        r.dep.ID,
+			Status:    RESPONSE_STATUS_FAIL,
+			ErrorCode: ERROR_CODE_TODO,
+			Message:   "bundle download failed",
+		},
+	})
+}
+
 func getBundleFile(dep DataDeployment) string {
 
 	// the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI
@@ -218,31 +245,62 @@
 	return []byte("")
 }
 
-//func checksumFile(hashType, checksum string, fileName string) error {
-//
-//	hashWriter, err := getHashWriter(hashType)
-//	if err != nil {
-//		return err
-//	}
-//
-//	file, err := os.Open(fileName)
-//	if err != nil {
-//		return err
-//	}
-//	defer file.Close()
-//
-//	if _, err := io.Copy(hashWriter, file); err != nil {
-//		return err
-//	}
-//
-//	hashBytes := hashWriter.Sum(nil)
-//	//hashBytes := hashWriter.Sum(nil)[:hasher.Size()]
-//	//hashBytes := hashWriter.Sum(nil)[:]
-//
-//	//hex.EncodeToString(hashBytes)
-//	if checksum != hex.EncodeToString(hashBytes) {
-//		return errors.New(fmt.Sprintf("bad checksum for %s", fileName))
-//	}
-//
-//	return nil
-//}
+// initializeBundleDownloading sizes the queues, starts concurrentDownloads
+// workers and runs the dispatcher that hands each request to an idle worker.
+func initializeBundleDownloading() {
+	// Re-make the channels: their package-level makes ran while both sizes were still 0.
+	downloadQueue = make(chan *DownloadRequest, downloadQueueSize)
+	workerQueue = make(chan chan *DownloadRequest, concurrentDownloads)
+	for i := 0; i < concurrentDownloads; i++ {
+		worker := BundleDownloader{
+			id:       i + 1,
+			workChan: make(chan *DownloadRequest),
+			quitChan: make(chan bool),
+		}
+		worker.Start()
+	}
+
+	go func() {
+		for {
+			req := <-downloadQueue
+			log.Debugf("dispatching downloader for: %s", req.bundleFile)
+			go func() {
+				worker := <-workerQueue
+				log.Debugf("got a worker for: %s", req.bundleFile)
+				worker <- req
+			}()
+		}
+	}()
+}
+
+type BundleDownloader struct {
+	id       int                   // worker number, used only in log messages
+	workChan chan *DownloadRequest // receives the next request assigned to this worker
+	quitChan chan bool             // signalled by Stop to end the worker loop
+}
+
+// Start runs the worker loop in a new goroutine until a quit signal arrives.
+func (w *BundleDownloader) Start() {
+	go func() {
+		log.Debugf("started bundle downloader %d", w.id)
+		for {
+			// announce availability: offer our work channel on workerQueue
+			workerQueue <- w.workChan
+			// quit is only observed here, not while blocked on the send above
+			select {
+			case req := <-w.workChan:
+				log.Debugf("starting download %s", req.bundleFile)
+				req.downloadBundle()
+			case <-w.quitChan:
+				log.Debugf("bundle downloader %d stopped", w.id)
+				return
+			}
+		}
+	}()
+}
+
+// Stop signals the worker to exit. The send happens in its own goroutine so
+// the caller never blocks waiting for the worker to receive it.
+func (w *BundleDownloader) Stop() {
+	go func() { w.quitChan <- true }()
+}
diff --git a/bundle_test.go b/bundle_test.go
index 4f2bad2..796be9a 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -2,10 +2,14 @@
 
 import (
 	"encoding/json"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
 	"net/url"
 	"time"
+
+	"net/http"
+	"net/http/httptest"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 )
 
 var _ = Describe("bundle", func() {
@@ -14,9 +18,24 @@
 
 		It("should timeout, mark status as failed, then finish", func() {
 
+			proceed := make(chan bool)
+			failedOnce := false
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				if failedOnce {
+					proceed <- true
+					time.Sleep(bundleDownloadTimeout)
+					w.Write([]byte("/bundles/longfail"))
+				} else {
+					failedOnce = true
+					time.Sleep(bundleDownloadTimeout)
+					w.WriteHeader(500)
+				}
+			}))
+			defer ts.Close()
+
 			deploymentID := "bundle_download_fail"
 
-			uri, err := url.Parse(testServer.URL)
+			uri, err := url.Parse(ts.URL)
 			Expect(err).ShouldNot(HaveOccurred())
 
 			uri.Path = "/bundles/longfail"
@@ -60,10 +79,9 @@
 			err = tx.Commit()
 			Expect(err).ShouldNot(HaveOccurred())
 
-			go downloadBundle(dep)
+			queueDownloadRequest(dep)
 
-			// give download time to timeout
-			time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
+			<-proceed
 
 			// get error state deployment
 			deployments, err := getDeployments("WHERE id=$1", deploymentID)
@@ -97,6 +115,73 @@
 			Expect(d.LocalBundleURI).To(BeAnExistingFile())
 		})
 
+		It("should not continue attempts if deployment has been deleted", func() {
+
+			deploymentID := "bundle_download_deployment_deleted"
+
+			uri, err := url.Parse(testServer.URL)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			uri.Path = "/bundles/failonce"
+			bundleUri := uri.String()
+			bundle := bundleConfigJson{
+				Name:         uri.Path,
+				URI:          bundleUri,
+				ChecksumType: "crc-32",
+			}
+			bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+			bundleJson, err := json.Marshal(bundle)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			tx, err := getDB().Begin()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			dep := DataDeployment{
+				ID:                 deploymentID,
+				BundleConfigID:     deploymentID,
+				ApidClusterID:      deploymentID,
+				DataScopeID:        deploymentID,
+				BundleConfigJSON:   string(bundleJson),
+				ConfigJSON:         string(bundleJson),
+				Created:            "",
+				CreatedBy:          "",
+				Updated:            "",
+				UpdatedBy:          "",
+				BundleName:         deploymentID,
+				BundleURI:          bundle.URI,
+				BundleChecksum:     bundle.Checksum,
+				BundleChecksumType: bundle.ChecksumType,
+				LocalBundleURI:     "",
+				DeployStatus:       "",
+				DeployErrorCode:    0,
+				DeployErrorMessage: "",
+			}
+
+			err = InsertDeployment(tx, dep)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			queueDownloadRequest(dep)
+
+			// skip first try
+			time.Sleep(bundleRetryDelay)
+
+			// delete deployment
+			tx, err = getDB().Begin()
+			Expect(err).ShouldNot(HaveOccurred())
+			deleteDeployment(tx, dep.ID)
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			// wait for final
+			time.Sleep(bundleRetryDelay)
+
+			// No way to test this programmatically currently
+			// search logs for "never mind, deployment bundle_download_deployment_deleted was deleted"
+		})
+
 		// todo: temporary - this tests that checksum is disabled until server implements (XAPID-544)
 		It("should TEMPORARILY download even if empty Checksum and ChecksumType", func() {
 
@@ -111,7 +196,7 @@
 				Name:         uri.Path,
 				URI:          bundleUri,
 				ChecksumType: "",
-				Checksum: "",
+				Checksum:     "",
 			}
 			bundleJson, err := json.Marshal(bundle)
 			Expect(err).ShouldNot(HaveOccurred())
@@ -146,7 +231,7 @@
 			err = tx.Commit()
 			Expect(err).ShouldNot(HaveOccurred())
 
-			go downloadBundle(dep)
+			queueDownloadRequest(dep)
 
 			// give download time to finish
 			time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
diff --git a/init.go b/init.go
index afc1366..f507357 100644
--- a/init.go
+++ b/init.go
@@ -2,11 +2,12 @@
 
 import (
 	"fmt"
-	"github.com/30x/apid-core"
 	"net/url"
 	"os"
 	"path"
 	"time"
+
+	"github.com/30x/apid-core"
 )
 
 const (
@@ -17,18 +18,22 @@
 	configApiServerBaseURI      = "apigeesync_proxy_server_base"
 	configApidInstanceID        = "apigeesync_apid_instance_id"
 	configApidClusterID         = "apigeesync_cluster_id"
+	configConcurrentDownloads   = "apigeesync_concurrent_downloads"
+	configDownloadQueueSize     = "apigeesync_download_queue_size"
 )
 
 var (
-	services           apid.Services
-	log                apid.LogService
-	data               apid.DataService
-	bundlePath         string
-	debounceDuration   time.Duration
-	bundleCleanupDelay time.Duration
-	apiServerBaseURI   *url.URL
-	apidInstanceID     string
-	apidClusterID      string
+	services            apid.Services
+	log                 apid.LogService
+	data                apid.DataService
+	bundlePath          string
+	debounceDuration    time.Duration
+	bundleCleanupDelay  time.Duration
+	apiServerBaseURI    *url.URL
+	apidInstanceID      string
+	apidClusterID       string
+	downloadQueueSize   int
+	concurrentDownloads int
 )
 
 func init() {
@@ -65,6 +70,8 @@
 	config.SetDefault(configDebounceDuration, time.Second)
 	config.SetDefault(configBundleCleanupDelay, time.Minute)
 	config.SetDefault(configBundleDownloadTimeout, 5*time.Minute)
+	config.SetDefault(configConcurrentDownloads, 15)
+	config.SetDefault(configDownloadQueueSize, 2000)
 
 	debounceDuration = config.GetDuration(configDebounceDuration)
 	if debounceDuration < time.Millisecond {
@@ -83,6 +90,8 @@
 
 	data = services.Data()
 
+	concurrentDownloads = config.GetInt(configConcurrentDownloads)
+	downloadQueueSize = config.GetInt(configDownloadQueueSize)
 	relativeBundlePath := config.GetString(configBundleDirKey)
 	storagePath := config.GetString("local_storage_path")
 	bundlePath = path.Join(storagePath, relativeBundlePath)
@@ -91,6 +100,8 @@
 	}
 	log.Infof("Bundle directory path is %s", bundlePath)
 
+	initializeBundleDownloading()
+
 	go distributeEvents()
 
 	initListener(services)
diff --git a/listener.go b/listener.go
index f631594..c3b591f 100644
--- a/listener.go
+++ b/listener.go
@@ -3,10 +3,11 @@
 import (
 	"database/sql"
 	"encoding/json"
-	"github.com/30x/apid-core"
-	"github.com/apigee-labs/transicator/common"
 	"os"
 	"time"
+
+	"github.com/30x/apid-core"
+	"github.com/apigee-labs/transicator/common"
 )
 
 const (
@@ -96,7 +97,7 @@
 				log.Panicf("unable to query database for unready deployments: %v", err)
 			}
 			for _, dep := range deployments {
-				go downloadBundle(dep)
+				queueDownloadRequest(dep)
 			}
 		}()
 	}
@@ -200,8 +201,7 @@
 		return
 	}
 
-	// todo: limit # concurrent downloads?
-	go downloadBundle(d)
+	queueDownloadRequest(d)
 	return
 }