reformat bundle.go to make it unit-testable
diff --git a/api.go b/api.go
index 5054d98..5aea7f2 100644
--- a/api.go
+++ b/api.go
@@ -293,7 +293,7 @@
 	for _, d := range dataDeps {
 		apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
 			Self:           apiDeps.Self + "/" + d.ID,
-			Name:		d.Name,
+			Name:           d.Name,
 			Type:           d.Type,
 			Org:            d.OrgID,
 			Env:            d.EnvID,
@@ -332,7 +332,7 @@
 }
 
 // TODO
-func getDeploymentScope() string{
+func getDeploymentScope() string {
 	return ""
 }
 
@@ -350,4 +350,3 @@
 	log.Error("convertTime: Unsupported time format: " + t)
 	return t
 }
-
diff --git a/api_test.go b/api_test.go
index fe8bef6..150601d 100644
--- a/api_test.go
+++ b/api_test.go
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 package apiGatewayConfDeploy
+
 import (
 	"encoding/json"
 	"io/ioutil"
@@ -22,14 +23,12 @@
 	. "github.com/onsi/gomega"
 )
 
-
-
-
-
 var _ = Describe("api", func() {
 	Context("GET /deployments", func() {
 
 		It("should get empty set if no deployments", func() {
+			//only called once
+			InitAPI()
 
 			var uri url.URL
 			uri = *apiServerBaseURI
@@ -50,188 +49,32 @@
 			//Expect(len(depRes)).To(Equal(0))
 			//Expect(string(body)).Should(Equal("[]"))
 		})
-		/*
-		It("should debounce requests", func(done Done) {
-			var in = make(chan interface{})
-			var out = make(chan []interface{})
-
-			go debounce(in, out, 3*time.Millisecond)
-
-			go func() {
-				defer GinkgoRecover()
-
-				received, ok := <-out
-				Expect(ok).To(BeTrue())
-				Expect(len(received)).To(Equal(2))
-
-				close(in)
-				received, ok = <-out
-				Expect(ok).To(BeFalse())
-
-				close(done)
-			}()
-
-			in <- "x"
-			in <- "y"
-		})
-
-		It("should get current deployments", func() {
-
-			deploymentID := "api_get_current"
-			insertTestDeployment(testServer, deploymentID)
-
-			uri, err := url.Parse(testServer.URL)
-			uri.Path = deploymentsEndpoint
-
-			res, err := http.Get(uri.String())
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-
-			Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
-			var depRes ApiDeploymentResponse
-			body, err := ioutil.ReadAll(res.Body)
-			Expect(err).ShouldNot(HaveOccurred())
-			json.Unmarshal(body, &depRes)
-
-			Expect(len(depRes)).To(Equal(1))
-
-			dep := depRes[0]
-
-			Expect(dep.ID).To(Equal(deploymentID))
-			Expect(dep.ScopeId).To(Equal(deploymentID))
-			Expect(dep.DisplayName).To(Equal(deploymentID))
-
-			var config bundleConfigJson
-
-			err = json.Unmarshal(dep.ConfigJson, &config)
-			Expect(err).ShouldNot(HaveOccurred())
-			Expect(config.Name).To(Equal("/bundles/1"))
-
-			err = json.Unmarshal(dep.BundleConfigJson, &config)
-			Expect(err).ShouldNot(HaveOccurred())
-			Expect(config.Name).To(Equal("/bundles/1"))
-		})
-
-		It("should get 304 for no change", func() {
-
-			deploymentID := "api_no_change"
-			insertTestDeployment(testServer, deploymentID)
-
-			uri, err := url.Parse(testServer.URL)
-			uri.Path = deploymentsEndpoint
-			res, err := http.Get(uri.String())
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-			Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-
-			req, err := http.NewRequest("GET", uri.String(), nil)
-			req.Header.Add("Content-Type", "application/json")
-			req.Header.Add("If-None-Match", res.Header.Get("etag"))
-
-			res, err = http.DefaultClient.Do(req)
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-			Expect(res.StatusCode).To(Equal(http.StatusNotModified))
-		})
-
-		It("should get empty set after blocking if no deployments", func() {
-
-			uri, err := url.Parse(testServer.URL)
-			uri.Path = deploymentsEndpoint
-
-			query := uri.Query()
-			query.Add("block", "1")
-			uri.RawQuery = query.Encode()
-
-			res, err := http.Get(uri.String())
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-
-			var depRes ApiDeploymentResponse
-			body, err := ioutil.ReadAll(res.Body)
-			Expect(err).ShouldNot(HaveOccurred())
-			json.Unmarshal(body, &depRes)
-
-			Expect(res.StatusCode).Should(Equal(http.StatusOK))
-			Expect(string(body)).Should(Equal("[]"))
-		})
-
-		It("should get new deployment set after blocking", func(done Done) {
-
-			deploymentID := "api_get_current_blocking"
-			insertTestDeployment(testServer, deploymentID)
-			uri, err := url.Parse(testServer.URL)
-			uri.Path = deploymentsEndpoint
-			res, err := http.Get(uri.String())
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-			eTag := res.Header.Get("etag")
-			Expect(eTag).ShouldNot(BeEmpty())
-
-			deploymentID = "api_get_current_blocking2"
-			go func() {
-				defer GinkgoRecover()
-
-				query := uri.Query()
-				query.Add("block", "1")
-				uri.RawQuery = query.Encode()
-				req, err := http.NewRequest("GET", uri.String(), nil)
-				req.Header.Add("Content-Type", "application/json")
-				req.Header.Add("If-None-Match", eTag)
-
-				res, err := http.DefaultClient.Do(req)
-				Expect(err).ShouldNot(HaveOccurred())
-				defer res.Body.Close()
-				Expect(res.StatusCode).To(Equal(http.StatusOK))
-
-				Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-				Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
-
-				var depRes ApiDeploymentResponse
-				body, err := ioutil.ReadAll(res.Body)
-				Expect(err).ShouldNot(HaveOccurred())
-				json.Unmarshal(body, &depRes)
-
-				Expect(len(depRes)).To(Equal(2))
-
-				dep := depRes[1]
-
-				Expect(dep.ID).To(Equal(deploymentID))
-				Expect(dep.ScopeId).To(Equal(deploymentID))
-				Expect(dep.DisplayName).To(Equal(deploymentID))
-
-				close(done)
-			}()
-
-			time.Sleep(250 * time.Millisecond) // give api call above time to block
-			insertTestDeployment(testServer, deploymentID)
-			deploymentsChanged <- deploymentID
-		})
-
-		It("should get 304 after blocking if no new deployment", func() {
-
-			deploymentID := "api_no_change_blocking"
-			insertTestDeployment(testServer, deploymentID)
-			uri, err := url.Parse(testServer.URL)
-			uri.Path = deploymentsEndpoint
-			res, err := http.Get(uri.String())
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-			Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-
-			query := uri.Query()
-			query.Add("block", "1")
-			uri.RawQuery = query.Encode()
-			req, err := http.NewRequest("GET", uri.String(), nil)
-			req.Header.Add("Content-Type", "application/json")
-			req.Header.Add("If-None-Match", res.Header.Get("etag"))
-
-			res, err = http.DefaultClient.Do(req)
-			Expect(err).ShouldNot(HaveOccurred())
-			defer res.Body.Close()
-			Expect(res.StatusCode).To(Equal(http.StatusNotModified))
-		})
-		*/
 	})
-})
\ No newline at end of file
+})
+
+type dummyDbMan struct {
+}
+
+func (d *dummyDbMan) setDbVersion(version string) {
+
+}
+
+func (d *dummyDbMan) initDb() error {
+	return nil
+}
+
+func (d *dummyDbMan) getUnreadyDeployments() ([]DataDeployment, error) {
+	return nil, nil
+}
+
+func (d *dummyDbMan) getReadyDeployments() ([]DataDeployment, error) {
+	return nil, nil
+}
+
+func (d *dummyDbMan) updateLocalFsLocation(string, string, string) error {
+	return nil
+}
+
+func (d *dummyDbMan) getLocalFSLocation(string) (string, error) {
+	return "", nil
+}
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go
index acd4eb9..165c3a5 100644
--- a/apidGatewayConfDeploy_suite_test.go
+++ b/apidGatewayConfDeploy_suite_test.go
@@ -1,18 +1,18 @@
 package apiGatewayConfDeploy
 
 import (
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
 	"github.com/30x/apid-core"
 	"github.com/30x/apid-core/factory"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 	"io/ioutil"
-	"time"
 	"os"
+	"testing"
+	"time"
 )
 
-
 var (
-	tmpDir              string
+	tmpDir string
 )
 
 var _ = BeforeSuite(func() {
@@ -27,18 +27,16 @@
 	config.Set(configApidClusterID, "CLUSTER_ID")
 	config.Set(configApiServerBaseURI, "http://localhost")
 	config.Set(configDebounceDuration, "1ms")
-	apid.InitializePlugins("")
-
-
-	bundleCleanupDelay = time.Millisecond
-	bundleRetryDelay = 10 * time.Millisecond
-	markDeploymentFailedAfter = 50 * time.Millisecond
-	concurrentDownloads = 1
-	downloadQueueSize = 1
-
+	config.Set(configDownloadQueueSize, 1)
+	config.Set(configBundleCleanupDelay, time.Millisecond)
 })
 
 var _ = AfterSuite(func() {
 	apid.Events().Close()
 	os.RemoveAll(tmpDir)
-})
\ No newline at end of file
+})
+
+func TestApidGatewayDeploy(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "ApidGatewayConfDeploy Suite")
+}
diff --git a/bundle.go b/bundle.go
index ee36cbb..86fa7c8 100644
--- a/bundle.go
+++ b/bundle.go
@@ -32,88 +32,138 @@
 )
 
 var (
-	markDeploymentFailedAfter time.Duration
-	bundleDownloadConnTimeout time.Duration
-	bundleRetryDelay          = time.Second
-	downloadQueue             = make(chan *DownloadRequest, downloadQueueSize)
-	workerQueue               = make(chan chan *DownloadRequest, concurrentDownloads)
+	bundleMan bundleManagerInterface
 )
 
-// simple doubling back-off
-func createBackoff(retryIn, maxBackOff time.Duration) func() {
-	return func() {
-		log.Debugf("backoff called. will retry in %s.", retryIn)
-		time.Sleep(retryIn)
-		retryIn = retryIn * time.Duration(2)
-		if retryIn > maxBackOff {
-			retryIn = maxBackOff
+type bundleManagerInterface interface {
+	initializeBundleDownloading()
+	queueDownloadRequest(*DataDeployment)
+	enqueueRequest(*DownloadRequest)
+	deleteBundles([]DataDeployment)
+	Close()
+}
+
+type bundleManager struct {
+	concurrentDownloads       int
+	markDeploymentFailedAfter time.Duration
+	bundleDownloadConnTimeout time.Duration
+	bundleRetryDelay          time.Duration
+	bundleCleanupDelay        time.Duration
+	downloadQueue             chan *DownloadRequest
+	isClosed                  *int32
+	workers                   []*BundleDownloader
+}
+
+func (bm *bundleManager) initializeBundleDownloading() {
+	atomic.StoreInt32(bm.isClosed, 0)
+	bm.workers = make([]*BundleDownloader, bm.concurrentDownloads)
+
+	// create workers
+	for i := 0; i < bm.concurrentDownloads; i++ {
+		worker := BundleDownloader{
+			id:       i + 1,
+			workChan: make(chan *DownloadRequest),
 		}
+		bm.workers[i] = &worker
+		worker.Start()
 	}
 }
 
-func queueDownloadRequest(dep DataDeployment) {
+func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) {
 
-	retryIn := bundleRetryDelay
+	retryIn := bm.bundleRetryDelay
 	maxBackOff := 5 * time.Minute
-	markFailedAt := time.Now().Add(markDeploymentFailedAfter)
+	markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter)
 	req := &DownloadRequest{
 		dep:          dep,
 		bundleFile:   getBundleFile(dep),
 		backoffFunc:  createBackoff(retryIn, maxBackOff),
 		markFailedAt: markFailedAt,
+		connTimeout:  bm.bundleDownloadConnTimeout,
 	}
-	downloadQueue <- req
+	go bm.enqueueRequest(req)
+}
+
+// a blocking method to enqueue download requests
+func (bm *bundleManager) enqueueRequest(r *DownloadRequest) {
+	if atomic.LoadInt32(bm.isClosed) == 1 {
+		return
+	}
+	defer func() {
+		if r := recover(); r != nil {
+			log.Warn("trying to enque requests to closed bundleManager")
+		}
+	}()
+	bm.downloadQueue <- r
+}
+
+func (bm *bundleManager) Close() {
+	atomic.StoreInt32(bm.isClosed, 1)
+	close(bm.downloadQueue)
+}
+
+func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) {
+	log.Debugf("will delete %d old bundles", len(deletedDeployments))
+	go func() {
+		// give clients a minute to avoid conflicts
+		time.Sleep(bm.bundleCleanupDelay)
+		for _, dep := range deletedDeployments {
+			bundleFile := getBundleFile(&dep)
+			log.Debugf("removing old bundle: %v", bundleFile)
+			// TODO Remove from the Database table edgex_blob_available
+			safeDelete(bundleFile)
+		}
+	}()
 }
 
 type DownloadRequest struct {
-	dep          DataDeployment
+	dep          *DataDeployment
 	bundleFile   string
 	backoffFunc  func()
 	markFailedAt time.Time
+	connTimeout  time.Duration
 }
 
-func (r *DownloadRequest) downloadBundle() {
+func (r *DownloadRequest) downloadBundle() error {
 
 	dep := r.dep
-	log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BlobID)
+	log.Debugf("starting bundle download attempt for depId=%s: blobId=%s", dep.ID, dep.BlobID)
 
 	r.checkTimeout()
 
-	tempFile, err := downloadFromURI(dep.BlobID)
-
-	if err == nil {
-		err = os.Rename(tempFile, r.bundleFile)
-		if err != nil {
-			log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err)
-		}
-	}
-
-	if tempFile != "" {
-		go safeDelete(tempFile)
-	}
-
-	if err == nil {
-		blobId := atomic.AddInt64(&gwBlobId, 1)
-		blobIds := strconv.FormatInt(blobId, 10)
-		err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
-		if err != nil {
-			dep.GWBlobID = blobIds
-		}
-	}
+	tempFile, err := downloadFromURI(dep.BlobID, r.connTimeout)
 
 	if err != nil {
-		// add myself back into the queue after back off
-		go func() {
-			r.backoffFunc()
-			downloadQueue <- r
-		}()
-		return
+		log.Errorf("Unable to download blob file blobId=%s: %s", dep.BlobID, err)
+		return err
 	}
 
-	log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BlobID)
+	defer func() {
+		if tempFile != "" {
+			go safeDelete(tempFile)
+		}
+	}()
+
+	err = os.Rename(tempFile, r.bundleFile)
+	if err != nil {
+		log.Errorf("Unable to rename temp blob file %s to %s: %s", tempFile, r.bundleFile, err)
+		return err
+	}
+
+	blobId := atomic.AddInt64(&gwBlobId, 1)
+	blobIds := strconv.FormatInt(blobId, 10)
+	err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
+	if err != nil {
+		return err
+	}
+	dep.GWBlobID = blobIds
+
+	log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID)
 
 	// send deployments to client
 	deploymentsChanged <- dep.ID
+
+	return nil
 }
 
 func (r *DownloadRequest) checkTimeout() {
@@ -128,15 +178,15 @@
 
 }
 
-func getBundleFile(dep DataDeployment) string {
+func getBundleFile(dep *DataDeployment) string {
 
 	return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(dep.ID)))
 
 }
 
-func getSignedURL(blobId string) (string, error) {
+func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
 
-	blobUri, err := url.Parse(config.GetString(configBlobServerBaseURI))
+	blobUri, err := url.Parse(blobServerURL)
 	if err != nil {
 		log.Panicf("bad url value for config %s: %s", blobUri, err)
 	}
@@ -149,7 +199,7 @@
 
 	uri := blobUri.String()
 
-	surl, err := getURIReader(uri)
+	surl, err := getURIReader(uri, bundleDownloadConnTimeout)
 	if err != nil {
 		log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err)
 		return "", err
@@ -165,12 +215,12 @@
 
 // downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
 // after downloading the resource from GCS (via the signed URL)
-func downloadFromURI(blobId string) (tempFileName string, err error) {
+func downloadFromURI(blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) {
 
 	var tempFile *os.File
 	log.Debugf("Downloading bundle: %s", blobId)
 
-	uri, err := getSignedURL(blobId)
+	uri, err := getSignedURL(blobId, bundleDownloadConnTimeout)
 	if err != nil {
 		log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
 		return
@@ -185,7 +235,7 @@
 	tempFileName = tempFile.Name()
 
 	var confReader io.ReadCloser
-	confReader, err = getURIReader(uri)
+	confReader, err = getURIReader(uri, bundleDownloadConnTimeout)
 	if err != nil {
 		log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
 		return
@@ -203,7 +253,7 @@
 }
 
 // retrieveBundle retrieves bundle data from a URI
-func getURIReader(uriString string) (io.ReadCloser, error) {
+func getURIReader(uriString string, bundleDownloadConnTimeout time.Duration) (io.ReadCloser, error) {
 
 	client := http.Client{
 		Timeout: bundleDownloadConnTimeout,
@@ -218,62 +268,38 @@
 	return res.Body, nil
 }
 
-func initializeBundleDownloading() {
-
-	// create workers
-	for i := 0; i < concurrentDownloads; i++ {
-		worker := BundleDownloader{
-			id:       i + 1,
-			workChan: make(chan *DownloadRequest),
-			quitChan: make(chan bool),
-		}
-		worker.Start()
-	}
-
-	// run dispatcher
-	go func() {
-		for {
-			select {
-			case req := <-downloadQueue:
-				log.Debugf("dispatching downloader for: %s", req.bundleFile)
-				go func() {
-					worker := <-workerQueue
-					log.Debugf("got a worker for: %s", req.bundleFile)
-					worker <- req
-				}()
-			}
-		}
-	}()
-}
-
 type BundleDownloader struct {
 	id       int
 	workChan chan *DownloadRequest
-	quitChan chan bool
+	bm       *bundleManager
 }
 
 func (w *BundleDownloader) Start() {
 	go func() {
 		log.Debugf("started bundle downloader %d", w.id)
-		for {
-			// wait for work
-			workerQueue <- w.workChan
 
-			select {
-			case req := <-w.workChan:
-				log.Debugf("starting download %s", req.bundleFile)
-				req.downloadBundle()
-
-			case <-w.quitChan:
-				log.Debugf("bundle downloader %d stopped", w.id)
-				return
+		for req := range w.bm.downloadQueue {
+			log.Debugf("starting download %s", req.bundleFile)
+			err := req.downloadBundle()
+			if err != nil {
+				go func() {
+					req.backoffFunc()
+					w.bm.enqueueRequest(req)
+				}()
 			}
 		}
+		log.Debugf("bundle downloader %d stopped", w.id)
 	}()
 }
 
-func (w *BundleDownloader) Stop() {
-	go func() {
-		w.quitChan <- true
-	}()
+// simple doubling back-off
+func createBackoff(retryIn, maxBackOff time.Duration) func() {
+	return func() {
+		log.Debugf("backoff called. will retry in %s.", retryIn)
+		time.Sleep(retryIn)
+		retryIn = retryIn * time.Duration(2)
+		if retryIn > maxBackOff {
+			retryIn = maxBackOff
+		}
+	}
 }
diff --git a/data.go b/data.go
index 3ec5eca..0be62ca 100644
--- a/data.go
+++ b/data.go
@@ -21,7 +21,7 @@
 )
 
 var (
-	dbMan dbManagerInterface
+	dbMan    dbManagerInterface
 	gwBlobId int64
 )
 
@@ -47,7 +47,6 @@
 	Exec(query string, args ...interface{}) (sql.Result, error)
 }
 
-
 type dbManagerInterface interface {
 	setDbVersion(string)
 	initDb() error
@@ -57,11 +56,10 @@
 	getLocalFSLocation(string) (string, error)
 }
 
-
 type dbManager struct {
-	data apid.DataService
-	db apid.DB
-	dbMux    sync.RWMutex
+	data  apid.DataService
+	db    apid.DB
+	dbMux sync.RWMutex
 }
 
 func (dbc *dbManager) setDbVersion(version string) {
@@ -80,7 +78,7 @@
 	return dbc.db
 }
 
-func (dbc *dbManager) initDb() error{
+func (dbc *dbManager) initDb() error {
 	_, err := dbc.getDb().Exec(`
 	CREATE TABLE IF NOT EXISTS edgex_blob_available (
    		gwblobid integer primary key,
@@ -100,7 +98,6 @@
 // getUnreadyDeployments() returns array of resources that are not yet to be processed
 func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) {
 
-
 	rows, err := dbc.getDb().Query(`
 	SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id
 		FROM project_runtime_blob_metadata
@@ -133,7 +130,6 @@
 // getDeployments()
 func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) {
 
-
 	rows, err := dbc.getDb().Query(`
 	SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
 		a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
diff --git a/init.go b/init.go
index 7c9b530..552ad3a 100644
--- a/init.go
+++ b/init.go
@@ -40,18 +40,15 @@
 )
 
 var (
-	services            apid.Services
-	log                 apid.LogService
-	config              apid.ConfigService
-	bundlePath          string
-	debounceDuration    time.Duration
-	bundleCleanupDelay  time.Duration
-	apiServerBaseURI    *url.URL
-	blobServerURL       string
-	apidInstanceID      string
-	apidClusterID       string
-	downloadQueueSize   int
-	concurrentDownloads int
+	services         apid.Services
+	log              apid.LogService
+	config           apid.ConfigService
+	bundlePath       string
+	debounceDuration time.Duration
+	apiServerBaseURI *url.URL
+	blobServerURL    string
+	apidInstanceID   string
+	apidClusterID    string
 )
 
 func init() {
@@ -102,29 +99,39 @@
 		return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration)
 	}
 
-	bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay)
+	bundleCleanupDelay := config.GetDuration(configBundleCleanupDelay)
 	if bundleCleanupDelay < time.Millisecond {
 		return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
 	}
 
-	markDeploymentFailedAfter = config.GetDuration(configMarkDeployFailedAfter)
+	markDeploymentFailedAfter := config.GetDuration(configMarkDeployFailedAfter)
 	if markDeploymentFailedAfter < time.Millisecond {
 		return pluginData, fmt.Errorf("%s must be a positive duration", configMarkDeployFailedAfter)
 	}
 
-	bundleDownloadConnTimeout = config.GetDuration(configDownloadConnTimeout)
+	bundleDownloadConnTimeout := config.GetDuration(configDownloadConnTimeout)
 	if bundleDownloadConnTimeout < time.Millisecond {
 		return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout)
 	}
 
+	concurrentDownloads := config.GetInt(configConcurrentDownloads)
+	downloadQueueSize := config.GetInt(configDownloadQueueSize)
+	bundleMan = &bundleManager{
+		concurrentDownloads:       concurrentDownloads,
+		markDeploymentFailedAfter: markDeploymentFailedAfter,
+		bundleDownloadConnTimeout: bundleDownloadConnTimeout,
+		bundleRetryDelay:          time.Second,
+		bundleCleanupDelay:        bundleCleanupDelay,
+		downloadQueue:             make(chan *DownloadRequest, downloadQueueSize),
+		isClosed:                  new(int32),
+	}
+
 	dbMan = &dbManager{
-		data: services.Data(),
+		data:  services.Data(),
 		dbMux: sync.RWMutex{},
 	}
 
 	blobServerURL = config.GetString(configBlobServerBaseURI)
-	concurrentDownloads = config.GetInt(configConcurrentDownloads)
-	downloadQueueSize = config.GetInt(configDownloadQueueSize)
 	relativeBundlePath := config.GetString(configBundleDirKey)
 	storagePath := config.GetString("local_storage_path")
 	bundlePath = path.Join(storagePath, relativeBundlePath)
@@ -133,7 +140,7 @@
 	}
 	log.Infof("Bundle directory path is %s", bundlePath)
 
-	initializeBundleDownloading()
+	bundleMan.initializeBundleDownloading()
 
 	go distributeEvents()
 
diff --git a/listener.go b/listener.go
index d30aabb..77c30a5 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,6 @@
 
 import (
 	"os"
-	"time"
 
 	"database/sql"
 	"github.com/30x/apid-core"
@@ -81,7 +80,7 @@
 		}
 		log.Debugf("Queuing %d deployments for bundle download", len(deployments))
 		for _, dep := range deployments {
-			queueDownloadRequest(dep)
+			go bundleMan.queueDownloadRequest(&dep)
 		}
 	}()
 }
@@ -117,22 +116,13 @@
 	}
 
 	for _, dep := range insertedDeployments {
-		queueDownloadRequest(dep)
+		go bundleMan.queueDownloadRequest(&dep)
 	}
 
 	// clean up old bundles
 	if len(deletedDeployments) > 0 {
 		log.Debugf("will delete %d old bundles", len(deletedDeployments))
-		go func() {
-			// give clients a minute to avoid conflicts
-			time.Sleep(bundleCleanupDelay)
-			for _, dep := range deletedDeployments {
-				bundleFile := getBundleFile(dep)
-				log.Debugf("removing old bundle: %v", bundleFile)
-				// TODO Remove from the Database table edgex_blob_available
-				safeDelete(bundleFile)
-			}
-		}()
+		bundleMan.deleteBundles(deletedDeployments)
 	}
 }