reformat bundle.go to make it unit-testable
diff --git a/api.go b/api.go index 5054d98..5aea7f2 100644 --- a/api.go +++ b/api.go
@@ -293,7 +293,7 @@ for _, d := range dataDeps { apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{ Self: apiDeps.Self + "/" + d.ID, - Name: d.Name, + Name: d.Name, Type: d.Type, Org: d.OrgID, Env: d.EnvID, @@ -332,7 +332,7 @@ } // TODO -func getDeploymentScope() string{ +func getDeploymentScope() string { return "" } @@ -350,4 +350,3 @@ log.Error("convertTime: Unsupported time format: " + t) return t } -
diff --git a/api_test.go b/api_test.go index fe8bef6..150601d 100644 --- a/api_test.go +++ b/api_test.go
@@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. package apiGatewayConfDeploy + import ( "encoding/json" "io/ioutil" @@ -22,14 +23,12 @@ . "github.com/onsi/gomega" ) - - - - var _ = Describe("api", func() { Context("GET /deployments", func() { It("should get empty set if no deployments", func() { + //only called once + InitAPI() var uri url.URL uri = *apiServerBaseURI @@ -50,188 +49,32 @@ //Expect(len(depRes)).To(Equal(0)) //Expect(string(body)).Should(Equal("[]")) }) - /* - It("should debounce requests", func(done Done) { - var in = make(chan interface{}) - var out = make(chan []interface{}) - - go debounce(in, out, 3*time.Millisecond) - - go func() { - defer GinkgoRecover() - - received, ok := <-out - Expect(ok).To(BeTrue()) - Expect(len(received)).To(Equal(2)) - - close(in) - received, ok = <-out - Expect(ok).To(BeFalse()) - - close(done) - }() - - in <- "x" - in <- "y" - }) - - It("should get current deployments", func() { - - deploymentID := "api_get_current" - insertTestDeployment(testServer, deploymentID) - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - - Expect(res.StatusCode).Should(Equal(http.StatusOK)) - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(len(depRes)).To(Equal(1)) - - dep := depRes[0] - - Expect(dep.ID).To(Equal(deploymentID)) - Expect(dep.ScopeId).To(Equal(deploymentID)) - Expect(dep.DisplayName).To(Equal(deploymentID)) - - var config bundleConfigJson - - err = json.Unmarshal(dep.ConfigJson, &config) - Expect(err).ShouldNot(HaveOccurred()) - Expect(config.Name).To(Equal("/bundles/1")) - - err = json.Unmarshal(dep.BundleConfigJson, &config) - Expect(err).ShouldNot(HaveOccurred()) - Expect(config.Name).To(Equal("/bundles/1")) - }) - - It("should get 304 for no change", func() { - - deploymentID := "api_no_change" - insertTestDeployment(testServer, deploymentID) - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) - - req, err := http.NewRequest("GET", uri.String(), nil) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", res.Header.Get("etag")) - - res, err = http.DefaultClient.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.StatusCode).To(Equal(http.StatusNotModified)) - }) - - It("should get empty set after blocking if no deployments", func() { - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - query := uri.Query() - query.Add("block", "1") - uri.RawQuery = query.Encode() - - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(res.StatusCode).Should(Equal(http.StatusOK)) - Expect(string(body)).Should(Equal("[]")) - }) - - It("should get new deployment set after blocking", func(done Done) { - - deploymentID := "api_get_current_blocking" - insertTestDeployment(testServer, deploymentID) - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - eTag := res.Header.Get("etag") - Expect(eTag).ShouldNot(BeEmpty()) - - deploymentID = "api_get_current_blocking2" - go func() { - defer GinkgoRecover() - - query := uri.Query() - query.Add("block", "1") - uri.RawQuery = query.Encode() - req, err := http.NewRequest("GET", uri.String(), nil) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", eTag) - - res, err := http.DefaultClient.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.StatusCode).To(Equal(http.StatusOK)) - - Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) - Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag)) - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(len(depRes)).To(Equal(2)) - - dep := depRes[1] - - Expect(dep.ID).To(Equal(deploymentID)) - Expect(dep.ScopeId).To(Equal(deploymentID)) - Expect(dep.DisplayName).To(Equal(deploymentID)) - - close(done) - }() - - time.Sleep(250 * time.Millisecond) // give api call above time to block - insertTestDeployment(testServer, deploymentID) - deploymentsChanged <- deploymentID - }) - - It("should get 304 after blocking if no new deployment", func() { - - deploymentID := "api_no_change_blocking" - insertTestDeployment(testServer, deploymentID) - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) - - query := uri.Query() - query.Add("block", "1") - uri.RawQuery = query.Encode() - req, err := http.NewRequest("GET", uri.String(), nil) - req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", res.Header.Get("etag")) - - res, err = http.DefaultClient.Do(req) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - Expect(res.StatusCode).To(Equal(http.StatusNotModified)) - }) - */ }) -}) \ No newline at end of file +}) + +type dummyDbMan struct { +} + +func (d *dummyDbMan) setDbVersion(version string) { + +} + +func (d *dummyDbMan) initDb() error { + return nil +} + +func (d *dummyDbMan) getUnreadyDeployments() ([]DataDeployment, error) { + return nil, nil +} + +func (d *dummyDbMan) getReadyDeployments() ([]DataDeployment, error) { + return nil, nil +} + +func (d *dummyDbMan) updateLocalFsLocation(string, string, string) error { + return nil +} + +func (d *dummyDbMan) getLocalFSLocation(string) (string, error) { + return "", nil +}
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go index acd4eb9..165c3a5 100644 --- a/apidGatewayConfDeploy_suite_test.go +++ b/apidGatewayConfDeploy_suite_test.go
@@ -1,18 +1,18 @@ package apiGatewayConfDeploy import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" "github.com/30x/apid-core" "github.com/30x/apid-core/factory" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" "io/ioutil" - "time" "os" + "testing" + "time" ) - var ( - tmpDir string + tmpDir string ) var _ = BeforeSuite(func() { @@ -27,18 +27,16 @@ config.Set(configApidClusterID, "CLUSTER_ID") config.Set(configApiServerBaseURI, "http://localhost") config.Set(configDebounceDuration, "1ms") - apid.InitializePlugins("") - - - bundleCleanupDelay = time.Millisecond - bundleRetryDelay = 10 * time.Millisecond - markDeploymentFailedAfter = 50 * time.Millisecond - concurrentDownloads = 1 - downloadQueueSize = 1 - + config.Set(configDownloadQueueSize, 1) + config.Set(configBundleCleanupDelay, time.Millisecond) }) var _ = AfterSuite(func() { apid.Events().Close() os.RemoveAll(tmpDir) -}) \ No newline at end of file +}) + +func TestApidGatewayDeploy(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ApidGatewayConfDeploy Suite") +}
diff --git a/bundle.go b/bundle.go index ee36cbb..86fa7c8 100644 --- a/bundle.go +++ b/bundle.go
@@ -32,88 +32,138 @@ ) var ( - markDeploymentFailedAfter time.Duration - bundleDownloadConnTimeout time.Duration - bundleRetryDelay = time.Second - downloadQueue = make(chan *DownloadRequest, downloadQueueSize) - workerQueue = make(chan chan *DownloadRequest, concurrentDownloads) + bundleMan bundleManagerInterface ) -// simple doubling back-off -func createBackoff(retryIn, maxBackOff time.Duration) func() { - return func() { - log.Debugf("backoff called. will retry in %s.", retryIn) - time.Sleep(retryIn) - retryIn = retryIn * time.Duration(2) - if retryIn > maxBackOff { - retryIn = maxBackOff +type bundleManagerInterface interface { + initializeBundleDownloading() + queueDownloadRequest(*DataDeployment) + enqueueRequest(*DownloadRequest) + deleteBundles([]DataDeployment) + Close() +} + +type bundleManager struct { + concurrentDownloads int + markDeploymentFailedAfter time.Duration + bundleDownloadConnTimeout time.Duration + bundleRetryDelay time.Duration + bundleCleanupDelay time.Duration + downloadQueue chan *DownloadRequest + isClosed *int32 + workers []*BundleDownloader +} + +func (bm *bundleManager) initializeBundleDownloading() { + atomic.StoreInt32(bm.isClosed, 0) + bm.workers = make([]*BundleDownloader, bm.concurrentDownloads) + + // create workers + for i := 0; i < bm.concurrentDownloads; i++ { + worker := BundleDownloader{ + id: i + 1, + workChan: make(chan *DownloadRequest), } + bm.workers[i] = &worker + worker.Start() } } -func queueDownloadRequest(dep DataDeployment) { +func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) { - retryIn := bundleRetryDelay + retryIn := bm.bundleRetryDelay maxBackOff := 5 * time.Minute - markFailedAt := time.Now().Add(markDeploymentFailedAfter) + markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) req := &DownloadRequest{ dep: dep, bundleFile: getBundleFile(dep), backoffFunc: createBackoff(retryIn, maxBackOff), markFailedAt: markFailedAt, + connTimeout: bm.bundleDownloadConnTimeout, } - downloadQueue <- req + go bm.enqueueRequest(req) +} + +// a blocking method to enqueue download requests +func (bm *bundleManager) enqueueRequest(r *DownloadRequest) { + if atomic.LoadInt32(bm.isClosed) == 1 { + return + } + defer func() { + if r := recover(); r != nil { + log.Warn("trying to enque requests to closed bundleManager") + } + }() + bm.downloadQueue <- r +} + +func (bm *bundleManager) Close() { + atomic.StoreInt32(bm.isClosed, 1) + close(bm.downloadQueue) +} + +func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) { + log.Debugf("will delete %d old bundles", len(deletedDeployments)) + go func() { + // give clients a minute to avoid conflicts + time.Sleep(bm.bundleCleanupDelay) + for _, dep := range deletedDeployments { + bundleFile := getBundleFile(&dep) + log.Debugf("removing old bundle: %v", bundleFile) + // TODO Remove from the Database table edgex_blob_available + safeDelete(bundleFile) + } + }() } type DownloadRequest struct { - dep DataDeployment + dep *DataDeployment bundleFile string backoffFunc func() markFailedAt time.Time + connTimeout time.Duration } -func (r *DownloadRequest) downloadBundle() { +func (r *DownloadRequest) downloadBundle() error { dep := r.dep - log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BlobID) + log.Debugf("starting bundle download attempt for depId=%s: blobId=%s", dep.ID, dep.BlobID) r.checkTimeout() - tempFile, err := downloadFromURI(dep.BlobID) - - if err == nil { - err = os.Rename(tempFile, r.bundleFile) - if err != nil { - log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err) - } - } - - if tempFile != "" { - go safeDelete(tempFile) - } - - if err == nil { - blobId := atomic.AddInt64(&gwBlobId, 1) - blobIds := strconv.FormatInt(blobId, 10) - err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile) - if err != nil { - dep.GWBlobID = blobIds - } - } + tempFile, err := downloadFromURI(dep.BlobID, r.connTimeout) if err != nil { - // add myself back into the queue after back off - go func() { - r.backoffFunc() - downloadQueue <- r - }() - return + log.Errorf("Unable to download blob file blobId=%s: %s", dep.BlobID, err) + return err } - log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BlobID) + defer func() { + if tempFile != "" { + go safeDelete(tempFile) + } + }() + + err = os.Rename(tempFile, r.bundleFile) + if err != nil { + log.Errorf("Unable to rename temp blob file %s to %s: %s", tempFile, r.bundleFile, err) + return err + } + + blobId := atomic.AddInt64(&gwBlobId, 1) + blobIds := strconv.FormatInt(blobId, 10) + err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile) + if err != nil { + return err + } + dep.GWBlobID = blobIds + + log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID) // send deployments to client deploymentsChanged <- dep.ID + + return nil } func (r *DownloadRequest) checkTimeout() { @@ -128,15 +178,15 @@ } -func getBundleFile(dep DataDeployment) string { +func getBundleFile(dep *DataDeployment) string { return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(dep.ID))) } -func getSignedURL(blobId string) (string, error) { +func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) { - blobUri, err := url.Parse(config.GetString(configBlobServerBaseURI)) + blobUri, err := url.Parse(blobServerURL) if err != nil { log.Panicf("bad url value for config %s: %s", blobUri, err) } @@ -149,7 +199,7 @@ uri := blobUri.String() - surl, err := getURIReader(uri) + surl, err := getURIReader(uri, bundleDownloadConnTimeout) if err != nil { log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err) return "", err @@ -165,12 +215,12 @@ // downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally // after downloading the resource from GCS (via the signed URL) -func downloadFromURI(blobId string) (tempFileName string, err error) { +func downloadFromURI(blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) { var tempFile *os.File log.Debugf("Downloading bundle: %s", blobId) - uri, err := getSignedURL(blobId) + uri, err := getSignedURL(blobId, bundleDownloadConnTimeout) if err != nil { log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err) return @@ -185,7 +235,7 @@ tempFileName = tempFile.Name() var confReader io.ReadCloser - confReader, err = getURIReader(uri) + confReader, err = getURIReader(uri, bundleDownloadConnTimeout) if err != nil { log.Errorf("Unable to retrieve bundle %s: %v", uri, err) return @@ -203,7 +253,7 @@ } // retrieveBundle retrieves bundle data from a URI -func getURIReader(uriString string) (io.ReadCloser, error) { +func getURIReader(uriString string, bundleDownloadConnTimeout time.Duration) (io.ReadCloser, error) { client := http.Client{ Timeout: bundleDownloadConnTimeout, @@ -218,62 +268,38 @@ return res.Body, nil } -func initializeBundleDownloading() { - - // create workers - for i := 0; i < concurrentDownloads; i++ { - worker := BundleDownloader{ - id: i + 1, - workChan: make(chan *DownloadRequest), - quitChan: make(chan bool), - } - worker.Start() - } - - // run dispatcher - go func() { - for { - select { - case req := <-downloadQueue: - log.Debugf("dispatching downloader for: %s", req.bundleFile) - go func() { - worker := <-workerQueue - log.Debugf("got a worker for: %s", req.bundleFile) - worker <- req - }() - } - } - }() -} - type BundleDownloader struct { id int workChan chan *DownloadRequest - quitChan chan bool + bm *bundleManager } func (w *BundleDownloader) Start() { go func() { log.Debugf("started bundle downloader %d", w.id) - for { - // wait for work - workerQueue <- w.workChan - select { - case req := <-w.workChan: - log.Debugf("starting download %s", req.bundleFile) - req.downloadBundle() - - case <-w.quitChan: - log.Debugf("bundle downloader %d stopped", w.id) - return + for req := range w.bm.downloadQueue { + log.Debugf("starting download %s", req.bundleFile) + err := req.downloadBundle() + if err != nil { + go func() { + req.backoffFunc() + w.bm.enqueueRequest(req) + }() } } + log.Debugf("bundle downloader %d stopped", w.id) }() } -func (w *BundleDownloader) Stop() { - go func() { - w.quitChan <- true - }() +// simple doubling back-off +func createBackoff(retryIn, maxBackOff time.Duration) func() { + return func() { + log.Debugf("backoff called. will retry in %s.", retryIn) + time.Sleep(retryIn) + retryIn = retryIn * time.Duration(2) + if retryIn > maxBackOff { + retryIn = maxBackOff + } + } }
diff --git a/data.go b/data.go index 3ec5eca..0be62ca 100644 --- a/data.go +++ b/data.go
@@ -21,7 +21,7 @@ ) var ( - dbMan dbManagerInterface + dbMan dbManagerInterface gwBlobId int64 ) @@ -47,7 +47,6 @@ Exec(query string, args ...interface{}) (sql.Result, error) } - type dbManagerInterface interface { setDbVersion(string) initDb() error @@ -57,11 +56,10 @@ getLocalFSLocation(string) (string, error) } - type dbManager struct { - data apid.DataService - db apid.DB - dbMux sync.RWMutex + data apid.DataService + db apid.DB + dbMux sync.RWMutex } func (dbc *dbManager) setDbVersion(version string) { @@ -80,7 +78,7 @@ return dbc.db } -func (dbc *dbManager) initDb() error{ +func (dbc *dbManager) initDb() error { _, err := dbc.getDb().Exec(` CREATE TABLE IF NOT EXISTS edgex_blob_available ( gwblobid integer primary key, @@ -100,7 +98,6 @@ // getUnreadyDeployments() returns array of resources that are not yet to be processed func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) { - rows, err := dbc.getDb().Query(` SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id FROM project_runtime_blob_metadata @@ -133,7 +130,6 @@ // getDeployments() func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) { - rows, err := dbc.getDb().Query(` SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id, a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
diff --git a/init.go b/init.go index 7c9b530..552ad3a 100644 --- a/init.go +++ b/init.go
@@ -40,18 +40,15 @@ ) var ( - services apid.Services - log apid.LogService - config apid.ConfigService - bundlePath string - debounceDuration time.Duration - bundleCleanupDelay time.Duration - apiServerBaseURI *url.URL - blobServerURL string - apidInstanceID string - apidClusterID string - downloadQueueSize int - concurrentDownloads int + services apid.Services + log apid.LogService + config apid.ConfigService + bundlePath string + debounceDuration time.Duration + apiServerBaseURI *url.URL + blobServerURL string + apidInstanceID string + apidClusterID string ) func init() { @@ -102,29 +99,39 @@ return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration) } - bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay) + bundleCleanupDelay := config.GetDuration(configBundleCleanupDelay) if bundleCleanupDelay < time.Millisecond { return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay) } - markDeploymentFailedAfter = config.GetDuration(configMarkDeployFailedAfter) + markDeploymentFailedAfter := config.GetDuration(configMarkDeployFailedAfter) if markDeploymentFailedAfter < time.Millisecond { return pluginData, fmt.Errorf("%s must be a positive duration", configMarkDeployFailedAfter) } - bundleDownloadConnTimeout = config.GetDuration(configDownloadConnTimeout) + bundleDownloadConnTimeout := config.GetDuration(configDownloadConnTimeout) if bundleDownloadConnTimeout < time.Millisecond { return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout) } + concurrentDownloads := config.GetInt(configConcurrentDownloads) + downloadQueueSize := config.GetInt(configDownloadQueueSize) + bundleMan = &bundleManager{ + concurrentDownloads: concurrentDownloads, + markDeploymentFailedAfter: markDeploymentFailedAfter, + bundleDownloadConnTimeout: bundleDownloadConnTimeout, + bundleRetryDelay: time.Second, + bundleCleanupDelay: bundleCleanupDelay, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), + } + dbMan = &dbManager{ - data: services.Data(), + data: services.Data(), dbMux: sync.RWMutex{}, } blobServerURL = config.GetString(configBlobServerBaseURI) - concurrentDownloads = config.GetInt(configConcurrentDownloads) - downloadQueueSize = config.GetInt(configDownloadQueueSize) relativeBundlePath := config.GetString(configBundleDirKey) storagePath := config.GetString("local_storage_path") bundlePath = path.Join(storagePath, relativeBundlePath) @@ -133,7 +140,7 @@ } log.Infof("Bundle directory path is %s", bundlePath) - initializeBundleDownloading() + bundleMan.initializeBundleDownloading() go distributeEvents()
diff --git a/listener.go b/listener.go index d30aabb..77c30a5 100644 --- a/listener.go +++ b/listener.go
@@ -15,7 +15,6 @@ import ( "os" - "time" "database/sql" "github.com/30x/apid-core" @@ -81,7 +80,7 @@ } log.Debugf("Queuing %d deployments for bundle download", len(deployments)) for _, dep := range deployments { - queueDownloadRequest(dep) + go bundleMan.queueDownloadRequest(&dep) } }() } @@ -117,22 +116,13 @@ } for _, dep := range insertedDeployments { - queueDownloadRequest(dep) + go bundleMan.queueDownloadRequest(&dep) } // clean up old bundles if len(deletedDeployments) > 0 { log.Debugf("will delete %d old bundles", len(deletedDeployments)) - go func() { - // give clients a minute to avoid conflicts - time.Sleep(bundleCleanupDelay) - for _, dep := range deletedDeployments { - bundleFile := getBundleFile(dep) - log.Debugf("removing old bundle: %v", bundleFile) - // TODO Remove from the Database table edgex_blob_available - safeDelete(bundleFile) - } - }() + bundleMan.deleteBundles(deletedDeployments) } }