Limit number of concurrent bundle downloads to 15
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 7ae2a6c..7411e6f 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -4,16 +4,17 @@
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
-	"github.com/30x/apid-core"
-	"github.com/30x/apid-core/factory"
+	"encoding/hex"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
+	"os"
 	"testing"
 	"time"
-	"net/url"
-	"encoding/hex"
-	"os"
+
+	"github.com/30x/apid-core"
+	"github.com/30x/apid-core/factory"
 )
 
 var (
@@ -53,10 +54,21 @@
 	router := apid.API().Router()
 	// fake an unreliable bundle repo
 	count := 1
+	failedOnce := false
+	router.HandleFunc("/bundles/failonce", func(w http.ResponseWriter, req *http.Request) {
+		if failedOnce {
+			vars := apid.API().Vars(req)
+			w.Write([]byte("/bundles/" + vars["id"]))
+		} else {
+			failedOnce = true
+			w.WriteHeader(500)
+		}
+	}).Methods("GET")
+
 	router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
 		count++
 		vars := apid.API().Vars(req)
-		if count % 2 == 0 {
+		if count%2 == 0 {
 			w.WriteHeader(500)
 			return
 		}
@@ -71,7 +83,7 @@
 	router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
 		func(w http.ResponseWriter, req *http.Request) {
 			count++
-			if count % 2 == 0 {
+			if count%2 == 0 {
 				w.WriteHeader(500)
 				return
 			}
@@ -82,7 +94,7 @@
 
 			w.Write([]byte("OK"))
 
-	}).Methods("PUT")
+		}).Methods("PUT")
 	testServer = httptest.NewServer(router)
 
 	apiServerBaseURI, err = url.Parse(testServer.URL)
diff --git a/bundle.go b/bundle.go
index 83eaff9..651576c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -17,8 +17,11 @@
 	"time"
 )
 
+var numConcurrentDownloads = 15
 var bundleRetryDelay time.Duration = time.Second
 var bundleDownloadTimeout time.Duration = 10 * time.Minute
+var downloadQueue = make(chan *DownloadRequest, 200)
+var workerQueue = make(chan chan *DownloadRequest, numConcurrentDownloads)
 
 // simple doubling back-off
 func createBackoff(retryIn, maxBackOff time.Duration) func() {
@@ -32,7 +35,7 @@
 	}
 }
 
-func downloadBundle(dep DataDeployment) {
+func queueDownloadRequest(dep DataDeployment) {
 
 	hashWriter, err := getHashWriter(dep.BundleChecksumType)
 	if err != nil {
@@ -49,13 +52,17 @@
 		return
 	}
 
-	log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
-
 	retryIn := bundleRetryDelay
 	maxBackOff := 5 * time.Minute
-	backOffFunc := createBackoff(retryIn, maxBackOff)
+	req := &DownloadRequest{
+		dep:         dep,
+		hashWriter:  hashWriter,
+		bundleFile:  getBundleFile(dep),
+		backoffFunc: createBackoff(retryIn, maxBackOff),
+	}
+	downloadQueue <- req
 
-	// timeout and mark deployment failed
+	// timeout and mark deployment failed (but retries will continue)
 	timeout := time.NewTimer(bundleDownloadTimeout)
 	go func() {
 		<-timeout.C
@@ -76,34 +83,51 @@
 		})
 	}()
 
-	// todo: we'll want to abort download if deployment is deleted
-	for {
-		var tempFile, bundleFile string
-		tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+}
 
-		if err == nil {
-			bundleFile = getBundleFile(dep)
-			err = os.Rename(tempFile, bundleFile)
-			if err != nil {
-				log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
-			}
+type DownloadRequest struct {
+	dep         DataDeployment
+	hashWriter  hash.Hash
+	bundleFile  string
+	backoffFunc func()
+}
+
+func (r *DownloadRequest) downloadBundle() {
+
+	dep := r.dep
+	log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BundleURI)
+
+	deployments, err := getDeployments("WHERE id=$1", dep.ID)
+	if err == nil && len(deployments) == 0 {
+		log.Debugf("never mind, deployment %s was deleted", dep.ID)
+		return
+	}
+
+	r.hashWriter.Reset()
+	tempFile, err := downloadFromURI(dep.BundleURI, r.hashWriter, dep.BundleChecksum)
+
+	if err == nil {
+		err = os.Rename(tempFile, r.bundleFile)
+		if err != nil {
+			log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err)
 		}
+	}
 
-		if tempFile != "" {
-			go safeDelete(tempFile)
-		}
+	if tempFile != "" {
+		go safeDelete(tempFile)
+	}
 
-		if err == nil {
-			err = updateLocalBundleURI(dep.ID, bundleFile)
-		}
+	if err == nil {
+		err = updateLocalBundleURI(dep.ID, r.bundleFile)
+	}
 
-		// success!
-		if err == nil {
-			break
-		}
-
-		backOffFunc()
-		hashWriter.Reset()
+	if err != nil {
+		// add myself back into the queue after back off
+		go func() {
+			r.backoffFunc()
+			downloadQueue <- r
+		}()
+		return
 	}
 
 	log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
@@ -218,31 +242,62 @@
 	return []byte("")
 }
 
-//func checksumFile(hashType, checksum string, fileName string) error {
-//
-//	hashWriter, err := getHashWriter(hashType)
-//	if err != nil {
-//		return err
-//	}
-//
-//	file, err := os.Open(fileName)
-//	if err != nil {
-//		return err
-//	}
-//	defer file.Close()
-//
-//	if _, err := io.Copy(hashWriter, file); err != nil {
-//		return err
-//	}
-//
-//	hashBytes := hashWriter.Sum(nil)
-//	//hashBytes := hashWriter.Sum(nil)[:hasher.Size()]
-//	//hashBytes := hashWriter.Sum(nil)[:]
-//
-//	//hex.EncodeToString(hashBytes)
-//	if checksum != hex.EncodeToString(hashBytes) {
-//		return errors.New(fmt.Sprintf("bad checksum for %s", fileName))
-//	}
-//
-//	return nil
-//}
+func initializeBundleDownloading() {
+
+	// create workers
+	for i := 0; i < numConcurrentDownloads; i++ {
+		worker := BundleDownloader{
+			id:       i + 1,
+			workChan: make(chan *DownloadRequest),
+			quitChan: make(chan bool),
+		}
+		worker.Start()
+	}
+
+	// run dispatcher
+	go func() {
+		for {
+			select {
+			case req := <-downloadQueue:
+				log.Debugf("dispatching downloader for: %s", req.bundleFile)
+				go func() {
+					worker := <-workerQueue
+					log.Debugf("got a worker for: %s", req.bundleFile)
+					worker <- req
+				}()
+			}
+		}
+	}()
+}
+
+type BundleDownloader struct {
+	id       int
+	workChan chan *DownloadRequest
+	quitChan chan bool
+}
+
+func (w *BundleDownloader) Start() {
+	go func() {
+		log.Debugf("started bundle downloader %d", w.id)
+		for {
+			// wait for work
+			workerQueue <- w.workChan
+
+			select {
+			case req := <-w.workChan:
+				log.Debugf("starting download %s", req.bundleFile)
+				req.downloadBundle()
+
+			case <-w.quitChan:
+				log.Debugf("bundle downloader %d stopped", w.id)
+				return
+			}
+		}
+	}()
+}
+
+func (w *BundleDownloader) Stop() {
+	go func() {
+		w.quitChan <- true
+	}()
+}
diff --git a/bundle_test.go b/bundle_test.go
index 4f2bad2..009eba6 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -2,10 +2,11 @@
 
 import (
 	"encoding/json"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
 	"net/url"
 	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 )
 
 var _ = Describe("bundle", func() {
@@ -60,7 +61,7 @@
 			err = tx.Commit()
 			Expect(err).ShouldNot(HaveOccurred())
 
-			go downloadBundle(dep)
+			queueDownloadRequest(dep)
 
 			// give download time to timeout
 			time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
@@ -97,6 +98,73 @@
 			Expect(d.LocalBundleURI).To(BeAnExistingFile())
 		})
 
+		It("should not continue attempts if deployment has been deleted", func() {
+
+			deploymentID := "bundle_download_deployment_deleted"
+
+			uri, err := url.Parse(testServer.URL)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			uri.Path = "/bundles/failonce"
+			bundleUri := uri.String()
+			bundle := bundleConfigJson{
+				Name:         uri.Path,
+				URI:          bundleUri,
+				ChecksumType: "crc-32",
+			}
+			bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+			bundleJson, err := json.Marshal(bundle)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			tx, err := getDB().Begin()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			dep := DataDeployment{
+				ID:                 deploymentID,
+				BundleConfigID:     deploymentID,
+				ApidClusterID:      deploymentID,
+				DataScopeID:        deploymentID,
+				BundleConfigJSON:   string(bundleJson),
+				ConfigJSON:         string(bundleJson),
+				Created:            "",
+				CreatedBy:          "",
+				Updated:            "",
+				UpdatedBy:          "",
+				BundleName:         deploymentID,
+				BundleURI:          bundle.URI,
+				BundleChecksum:     bundle.Checksum,
+				BundleChecksumType: bundle.ChecksumType,
+				LocalBundleURI:     "",
+				DeployStatus:       "",
+				DeployErrorCode:    0,
+				DeployErrorMessage: "",
+			}
+
+			err = InsertDeployment(tx, dep)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			queueDownloadRequest(dep)
+
+			// skip first try
+			time.Sleep(bundleRetryDelay)
+
+			// delete deployment
+			tx, err = getDB().Begin()
+			Expect(err).ShouldNot(HaveOccurred())
+			deleteDeployment(tx, dep.ID)
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			// wait for final
+			time.Sleep(bundleRetryDelay)
+
+			// No way to test this programmatically currently
+			// search logs for "never mind, deployment bundle_download_deployment_deleted was deleted"
+		})
+
 		// todo: temporary - this tests that checksum is disabled until server implements (XAPID-544)
 		It("should TEMPORARILY download even if empty Checksum and ChecksumType", func() {
 
@@ -111,7 +179,7 @@
 				Name:         uri.Path,
 				URI:          bundleUri,
 				ChecksumType: "",
-				Checksum: "",
+				Checksum:     "",
 			}
 			bundleJson, err := json.Marshal(bundle)
 			Expect(err).ShouldNot(HaveOccurred())
@@ -146,7 +214,7 @@
 			err = tx.Commit()
 			Expect(err).ShouldNot(HaveOccurred())
 
-			go downloadBundle(dep)
+			queueDownloadRequest(dep)
 
 			// give download time to finish
 			time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
diff --git a/init.go b/init.go
index afc1366..68d75e6 100644
--- a/init.go
+++ b/init.go
@@ -2,11 +2,12 @@
 
 import (
 	"fmt"
-	"github.com/30x/apid-core"
 	"net/url"
 	"os"
 	"path"
 	"time"
+
+	"github.com/30x/apid-core"
 )
 
 const (
@@ -91,6 +92,8 @@
 	}
 	log.Infof("Bundle directory path is %s", bundlePath)
 
+	initializeBundleDownloading()
+
 	go distributeEvents()
 
 	initListener(services)
diff --git a/listener.go b/listener.go
index f631594..c3b591f 100644
--- a/listener.go
+++ b/listener.go
@@ -3,10 +3,11 @@
 import (
 	"database/sql"
 	"encoding/json"
-	"github.com/30x/apid-core"
-	"github.com/apigee-labs/transicator/common"
 	"os"
 	"time"
+
+	"github.com/30x/apid-core"
+	"github.com/apigee-labs/transicator/common"
 )
 
 const (
@@ -96,7 +97,7 @@
 				log.Panicf("unable to query database for unready deployments: %v", err)
 			}
 			for _, dep := range deployments {
-				go downloadBundle(dep)
+				queueDownloadRequest(dep)
 			}
 		}()
 	}
@@ -200,8 +201,7 @@
 		return
 	}
 
-	// todo: limit # concurrent downloads?
-	go downloadBundle(d)
+	queueDownloadRequest(d)
 	return
 }