reformat bundle.go to make it unit-testable
diff --git a/api.go b/api.go
index 5054d98..5aea7f2 100644
--- a/api.go
+++ b/api.go
@@ -293,7 +293,7 @@
for _, d := range dataDeps {
apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
Self: apiDeps.Self + "/" + d.ID,
- Name: d.Name,
+ Name: d.Name,
Type: d.Type,
Org: d.OrgID,
Env: d.EnvID,
@@ -332,7 +332,7 @@
}
// TODO
-func getDeploymentScope() string{
+func getDeploymentScope() string {
return ""
}
@@ -350,4 +350,3 @@
log.Error("convertTime: Unsupported time format: " + t)
return t
}
-
diff --git a/api_test.go b/api_test.go
index fe8bef6..150601d 100644
--- a/api_test.go
+++ b/api_test.go
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package apiGatewayConfDeploy
+
import (
"encoding/json"
"io/ioutil"
@@ -22,14 +23,12 @@
. "github.com/onsi/gomega"
)
-
-
-
-
var _ = Describe("api", func() {
Context("GET /deployments", func() {
It("should get empty set if no deployments", func() {
+ //only called once
+ InitAPI()
var uri url.URL
uri = *apiServerBaseURI
@@ -50,188 +49,32 @@
//Expect(len(depRes)).To(Equal(0))
//Expect(string(body)).Should(Equal("[]"))
})
- /*
- It("should debounce requests", func(done Done) {
- var in = make(chan interface{})
- var out = make(chan []interface{})
-
- go debounce(in, out, 3*time.Millisecond)
-
- go func() {
- defer GinkgoRecover()
-
- received, ok := <-out
- Expect(ok).To(BeTrue())
- Expect(len(received)).To(Equal(2))
-
- close(in)
- received, ok = <-out
- Expect(ok).To(BeFalse())
-
- close(done)
- }()
-
- in <- "x"
- in <- "y"
- })
-
- It("should get current deployments", func() {
-
- deploymentID := "api_get_current"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
-
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(len(depRes)).To(Equal(1))
-
- dep := depRes[0]
-
- Expect(dep.ID).To(Equal(deploymentID))
- Expect(dep.ScopeId).To(Equal(deploymentID))
- Expect(dep.DisplayName).To(Equal(deploymentID))
-
- var config bundleConfigJson
-
- err = json.Unmarshal(dep.ConfigJson, &config)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(config.Name).To(Equal("/bundles/1"))
-
- err = json.Unmarshal(dep.BundleConfigJson, &config)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(config.Name).To(Equal("/bundles/1"))
- })
-
- It("should get 304 for no change", func() {
-
- deploymentID := "api_no_change"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-
- req, err := http.NewRequest("GET", uri.String(), nil)
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", res.Header.Get("etag"))
-
- res, err = http.DefaultClient.Do(req)
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.StatusCode).To(Equal(http.StatusNotModified))
- })
-
- It("should get empty set after blocking if no deployments", func() {
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- query := uri.Query()
- query.Add("block", "1")
- uri.RawQuery = query.Encode()
-
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
- Expect(string(body)).Should(Equal("[]"))
- })
-
- It("should get new deployment set after blocking", func(done Done) {
-
- deploymentID := "api_get_current_blocking"
- insertTestDeployment(testServer, deploymentID)
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- eTag := res.Header.Get("etag")
- Expect(eTag).ShouldNot(BeEmpty())
-
- deploymentID = "api_get_current_blocking2"
- go func() {
- defer GinkgoRecover()
-
- query := uri.Query()
- query.Add("block", "1")
- uri.RawQuery = query.Encode()
- req, err := http.NewRequest("GET", uri.String(), nil)
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", eTag)
-
- res, err := http.DefaultClient.Do(req)
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.StatusCode).To(Equal(http.StatusOK))
-
- Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
- Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(len(depRes)).To(Equal(2))
-
- dep := depRes[1]
-
- Expect(dep.ID).To(Equal(deploymentID))
- Expect(dep.ScopeId).To(Equal(deploymentID))
- Expect(dep.DisplayName).To(Equal(deploymentID))
-
- close(done)
- }()
-
- time.Sleep(250 * time.Millisecond) // give api call above time to block
- insertTestDeployment(testServer, deploymentID)
- deploymentsChanged <- deploymentID
- })
-
- It("should get 304 after blocking if no new deployment", func() {
-
- deploymentID := "api_no_change_blocking"
- insertTestDeployment(testServer, deploymentID)
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
-
- query := uri.Query()
- query.Add("block", "1")
- uri.RawQuery = query.Encode()
- req, err := http.NewRequest("GET", uri.String(), nil)
- req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", res.Header.Get("etag"))
-
- res, err = http.DefaultClient.Do(req)
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
- Expect(res.StatusCode).To(Equal(http.StatusNotModified))
- })
- */
})
-})
\ No newline at end of file
+})
+
+type dummyDbMan struct {
+}
+
+func (d *dummyDbMan) setDbVersion(version string) {
+
+}
+
+func (d *dummyDbMan) initDb() error {
+ return nil
+}
+
+func (d *dummyDbMan) getUnreadyDeployments() ([]DataDeployment, error) {
+ return nil, nil
+}
+
+func (d *dummyDbMan) getReadyDeployments() ([]DataDeployment, error) {
+ return nil, nil
+}
+
+func (d *dummyDbMan) updateLocalFsLocation(string, string, string) error {
+ return nil
+}
+
+func (d *dummyDbMan) getLocalFSLocation(string) (string, error) {
+ return "", nil
+}
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go
index acd4eb9..165c3a5 100644
--- a/apidGatewayConfDeploy_suite_test.go
+++ b/apidGatewayConfDeploy_suite_test.go
@@ -1,18 +1,18 @@
package apiGatewayConfDeploy
import (
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
"github.com/30x/apid-core"
"github.com/30x/apid-core/factory"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
"io/ioutil"
- "time"
"os"
+ "testing"
+ "time"
)
-
var (
- tmpDir string
+ tmpDir string
)
var _ = BeforeSuite(func() {
@@ -27,18 +27,16 @@
config.Set(configApidClusterID, "CLUSTER_ID")
config.Set(configApiServerBaseURI, "http://localhost")
config.Set(configDebounceDuration, "1ms")
- apid.InitializePlugins("")
-
-
- bundleCleanupDelay = time.Millisecond
- bundleRetryDelay = 10 * time.Millisecond
- markDeploymentFailedAfter = 50 * time.Millisecond
- concurrentDownloads = 1
- downloadQueueSize = 1
-
+ config.Set(configDownloadQueueSize, 1)
+ config.Set(configBundleCleanupDelay, time.Millisecond)
})
var _ = AfterSuite(func() {
apid.Events().Close()
os.RemoveAll(tmpDir)
-})
\ No newline at end of file
+})
+
+func TestApidGatewayDeploy(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "ApidGatewayConfDeploy Suite")
+}
diff --git a/bundle.go b/bundle.go
index ee36cbb..86fa7c8 100644
--- a/bundle.go
+++ b/bundle.go
@@ -32,88 +32,138 @@
)
var (
- markDeploymentFailedAfter time.Duration
- bundleDownloadConnTimeout time.Duration
- bundleRetryDelay = time.Second
- downloadQueue = make(chan *DownloadRequest, downloadQueueSize)
- workerQueue = make(chan chan *DownloadRequest, concurrentDownloads)
+ bundleMan bundleManagerInterface
)
-// simple doubling back-off
-func createBackoff(retryIn, maxBackOff time.Duration) func() {
- return func() {
- log.Debugf("backoff called. will retry in %s.", retryIn)
- time.Sleep(retryIn)
- retryIn = retryIn * time.Duration(2)
- if retryIn > maxBackOff {
- retryIn = maxBackOff
+type bundleManagerInterface interface {
+ initializeBundleDownloading()
+ queueDownloadRequest(*DataDeployment)
+ enqueueRequest(*DownloadRequest)
+ deleteBundles([]DataDeployment)
+ Close()
+}
+
+type bundleManager struct {
+ concurrentDownloads int
+ markDeploymentFailedAfter time.Duration
+ bundleDownloadConnTimeout time.Duration
+ bundleRetryDelay time.Duration
+ bundleCleanupDelay time.Duration
+ downloadQueue chan *DownloadRequest
+ isClosed *int32
+ workers []*BundleDownloader
+}
+
+func (bm *bundleManager) initializeBundleDownloading() {
+ atomic.StoreInt32(bm.isClosed, 0)
+ bm.workers = make([]*BundleDownloader, bm.concurrentDownloads)
+
+ // create workers
+ for i := 0; i < bm.concurrentDownloads; i++ {
+ worker := BundleDownloader{
+ id: i + 1,
+ workChan: make(chan *DownloadRequest),
}
+ bm.workers[i] = &worker
+ worker.Start()
}
}
-func queueDownloadRequest(dep DataDeployment) {
+func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) {
- retryIn := bundleRetryDelay
+ retryIn := bm.bundleRetryDelay
maxBackOff := 5 * time.Minute
- markFailedAt := time.Now().Add(markDeploymentFailedAfter)
+ markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter)
req := &DownloadRequest{
dep: dep,
bundleFile: getBundleFile(dep),
backoffFunc: createBackoff(retryIn, maxBackOff),
markFailedAt: markFailedAt,
+ connTimeout: bm.bundleDownloadConnTimeout,
}
- downloadQueue <- req
+ go bm.enqueueRequest(req)
+}
+
+// a blocking method to enqueue download requests
+func (bm *bundleManager) enqueueRequest(r *DownloadRequest) {
+ if atomic.LoadInt32(bm.isClosed) == 1 {
+ return
+ }
+ defer func() {
+ if r := recover(); r != nil {
+ log.Warn("trying to enque requests to closed bundleManager")
+ }
+ }()
+ bm.downloadQueue <- r
+}
+
+func (bm *bundleManager) Close() {
+ atomic.StoreInt32(bm.isClosed, 1)
+ close(bm.downloadQueue)
+}
+
+func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) {
+ log.Debugf("will delete %d old bundles", len(deletedDeployments))
+ go func() {
+ // give clients a minute to avoid conflicts
+ time.Sleep(bm.bundleCleanupDelay)
+ for _, dep := range deletedDeployments {
+ bundleFile := getBundleFile(&dep)
+ log.Debugf("removing old bundle: %v", bundleFile)
+ // TODO Remove from the Database table edgex_blob_available
+ safeDelete(bundleFile)
+ }
+ }()
}
type DownloadRequest struct {
- dep DataDeployment
+ dep *DataDeployment
bundleFile string
backoffFunc func()
markFailedAt time.Time
+ connTimeout time.Duration
}
-func (r *DownloadRequest) downloadBundle() {
+func (r *DownloadRequest) downloadBundle() error {
dep := r.dep
- log.Debugf("starting bundle download attempt for %s: %s", dep.ID, dep.BlobID)
+ log.Debugf("starting bundle download attempt for depId=%s: blobId=%s", dep.ID, dep.BlobID)
r.checkTimeout()
- tempFile, err := downloadFromURI(dep.BlobID)
-
- if err == nil {
- err = os.Rename(tempFile, r.bundleFile)
- if err != nil {
- log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, r.bundleFile, err)
- }
- }
-
- if tempFile != "" {
- go safeDelete(tempFile)
- }
-
- if err == nil {
- blobId := atomic.AddInt64(&gwBlobId, 1)
- blobIds := strconv.FormatInt(blobId, 10)
- err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
- if err != nil {
- dep.GWBlobID = blobIds
- }
- }
+ tempFile, err := downloadFromURI(dep.BlobID, r.connTimeout)
if err != nil {
- // add myself back into the queue after back off
- go func() {
- r.backoffFunc()
- downloadQueue <- r
- }()
- return
+ log.Errorf("Unable to download blob file blobId=%s: %s", dep.BlobID, err)
+ return err
}
- log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BlobID)
+ defer func() {
+ if tempFile != "" {
+ go safeDelete(tempFile)
+ }
+ }()
+
+ err = os.Rename(tempFile, r.bundleFile)
+ if err != nil {
+ log.Errorf("Unable to rename temp blob file %s to %s: %s", tempFile, r.bundleFile, err)
+ return err
+ }
+
+ blobId := atomic.AddInt64(&gwBlobId, 1)
+ blobIds := strconv.FormatInt(blobId, 10)
+ err = dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
+ if err != nil {
+ return err
+ }
+ dep.GWBlobID = blobIds
+
+ log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID)
// send deployments to client
deploymentsChanged <- dep.ID
+
+ return nil
}
func (r *DownloadRequest) checkTimeout() {
@@ -128,15 +178,15 @@
}
-func getBundleFile(dep DataDeployment) string {
+func getBundleFile(dep *DataDeployment) string {
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(dep.ID)))
}
-func getSignedURL(blobId string) (string, error) {
+func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
- blobUri, err := url.Parse(config.GetString(configBlobServerBaseURI))
+ blobUri, err := url.Parse(blobServerURL)
if err != nil {
log.Panicf("bad url value for config %s: %s", blobUri, err)
}
@@ -149,7 +199,7 @@
uri := blobUri.String()
- surl, err := getURIReader(uri)
+ surl, err := getURIReader(uri, bundleDownloadConnTimeout)
if err != nil {
log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err)
return "", err
@@ -165,12 +215,12 @@
// downloadFromURI involves retrieving the signed URL for the blob, and storing the resource locally
// after downloading the resource from GCS (via the signed URL)
-func downloadFromURI(blobId string) (tempFileName string, err error) {
+func downloadFromURI(blobId string, bundleDownloadConnTimeout time.Duration) (tempFileName string, err error) {
var tempFile *os.File
log.Debugf("Downloading bundle: %s", blobId)
- uri, err := getSignedURL(blobId)
+ uri, err := getSignedURL(blobId, bundleDownloadConnTimeout)
if err != nil {
log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
return
@@ -185,7 +235,7 @@
tempFileName = tempFile.Name()
var confReader io.ReadCloser
- confReader, err = getURIReader(uri)
+ confReader, err = getURIReader(uri, bundleDownloadConnTimeout)
if err != nil {
log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
return
@@ -203,7 +253,7 @@
}
// retrieveBundle retrieves bundle data from a URI
-func getURIReader(uriString string) (io.ReadCloser, error) {
+func getURIReader(uriString string, bundleDownloadConnTimeout time.Duration) (io.ReadCloser, error) {
client := http.Client{
Timeout: bundleDownloadConnTimeout,
@@ -218,62 +268,38 @@
return res.Body, nil
}
-func initializeBundleDownloading() {
-
- // create workers
- for i := 0; i < concurrentDownloads; i++ {
- worker := BundleDownloader{
- id: i + 1,
- workChan: make(chan *DownloadRequest),
- quitChan: make(chan bool),
- }
- worker.Start()
- }
-
- // run dispatcher
- go func() {
- for {
- select {
- case req := <-downloadQueue:
- log.Debugf("dispatching downloader for: %s", req.bundleFile)
- go func() {
- worker := <-workerQueue
- log.Debugf("got a worker for: %s", req.bundleFile)
- worker <- req
- }()
- }
- }
- }()
-}
-
type BundleDownloader struct {
id int
workChan chan *DownloadRequest
- quitChan chan bool
+ bm *bundleManager
}
func (w *BundleDownloader) Start() {
go func() {
log.Debugf("started bundle downloader %d", w.id)
- for {
- // wait for work
- workerQueue <- w.workChan
- select {
- case req := <-w.workChan:
- log.Debugf("starting download %s", req.bundleFile)
- req.downloadBundle()
-
- case <-w.quitChan:
- log.Debugf("bundle downloader %d stopped", w.id)
- return
+ for req := range w.bm.downloadQueue {
+ log.Debugf("starting download %s", req.bundleFile)
+ err := req.downloadBundle()
+ if err != nil {
+ go func() {
+ req.backoffFunc()
+ w.bm.enqueueRequest(req)
+ }()
}
}
+ log.Debugf("bundle downloader %d stopped", w.id)
}()
}
-func (w *BundleDownloader) Stop() {
- go func() {
- w.quitChan <- true
- }()
+// simple doubling back-off
+func createBackoff(retryIn, maxBackOff time.Duration) func() {
+ return func() {
+ log.Debugf("backoff called. will retry in %s.", retryIn)
+ time.Sleep(retryIn)
+ retryIn = retryIn * time.Duration(2)
+ if retryIn > maxBackOff {
+ retryIn = maxBackOff
+ }
+ }
}
diff --git a/data.go b/data.go
index 3ec5eca..0be62ca 100644
--- a/data.go
+++ b/data.go
@@ -21,7 +21,7 @@
)
var (
- dbMan dbManagerInterface
+ dbMan dbManagerInterface
gwBlobId int64
)
@@ -47,7 +47,6 @@
Exec(query string, args ...interface{}) (sql.Result, error)
}
-
type dbManagerInterface interface {
setDbVersion(string)
initDb() error
@@ -57,11 +56,10 @@
getLocalFSLocation(string) (string, error)
}
-
type dbManager struct {
- data apid.DataService
- db apid.DB
- dbMux sync.RWMutex
+ data apid.DataService
+ db apid.DB
+ dbMux sync.RWMutex
}
func (dbc *dbManager) setDbVersion(version string) {
@@ -80,7 +78,7 @@
return dbc.db
}
-func (dbc *dbManager) initDb() error{
+func (dbc *dbManager) initDb() error {
_, err := dbc.getDb().Exec(`
CREATE TABLE IF NOT EXISTS edgex_blob_available (
gwblobid integer primary key,
@@ -100,7 +98,6 @@
// getUnreadyDeployments() returns array of resources that are not yet to be processed
func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) {
-
rows, err := dbc.getDb().Query(`
SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id
FROM project_runtime_blob_metadata
@@ -133,7 +130,6 @@
// getDeployments()
func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) {
-
rows, err := dbc.getDb().Query(`
SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
diff --git a/init.go b/init.go
index 7c9b530..552ad3a 100644
--- a/init.go
+++ b/init.go
@@ -40,18 +40,15 @@
)
var (
- services apid.Services
- log apid.LogService
- config apid.ConfigService
- bundlePath string
- debounceDuration time.Duration
- bundleCleanupDelay time.Duration
- apiServerBaseURI *url.URL
- blobServerURL string
- apidInstanceID string
- apidClusterID string
- downloadQueueSize int
- concurrentDownloads int
+ services apid.Services
+ log apid.LogService
+ config apid.ConfigService
+ bundlePath string
+ debounceDuration time.Duration
+ apiServerBaseURI *url.URL
+ blobServerURL string
+ apidInstanceID string
+ apidClusterID string
)
func init() {
@@ -102,29 +99,39 @@
return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration)
}
- bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay)
+ bundleCleanupDelay := config.GetDuration(configBundleCleanupDelay)
if bundleCleanupDelay < time.Millisecond {
return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
}
- markDeploymentFailedAfter = config.GetDuration(configMarkDeployFailedAfter)
+ markDeploymentFailedAfter := config.GetDuration(configMarkDeployFailedAfter)
if markDeploymentFailedAfter < time.Millisecond {
return pluginData, fmt.Errorf("%s must be a positive duration", configMarkDeployFailedAfter)
}
- bundleDownloadConnTimeout = config.GetDuration(configDownloadConnTimeout)
+ bundleDownloadConnTimeout := config.GetDuration(configDownloadConnTimeout)
if bundleDownloadConnTimeout < time.Millisecond {
return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout)
}
+ concurrentDownloads := config.GetInt(configConcurrentDownloads)
+ downloadQueueSize := config.GetInt(configDownloadQueueSize)
+ bundleMan = &bundleManager{
+ concurrentDownloads: concurrentDownloads,
+ markDeploymentFailedAfter: markDeploymentFailedAfter,
+ bundleDownloadConnTimeout: bundleDownloadConnTimeout,
+ bundleRetryDelay: time.Second,
+ bundleCleanupDelay: bundleCleanupDelay,
+ downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
+ isClosed: new(int32),
+ }
+
dbMan = &dbManager{
- data: services.Data(),
+ data: services.Data(),
dbMux: sync.RWMutex{},
}
blobServerURL = config.GetString(configBlobServerBaseURI)
- concurrentDownloads = config.GetInt(configConcurrentDownloads)
- downloadQueueSize = config.GetInt(configDownloadQueueSize)
relativeBundlePath := config.GetString(configBundleDirKey)
storagePath := config.GetString("local_storage_path")
bundlePath = path.Join(storagePath, relativeBundlePath)
@@ -133,7 +140,7 @@
}
log.Infof("Bundle directory path is %s", bundlePath)
- initializeBundleDownloading()
+ bundleMan.initializeBundleDownloading()
go distributeEvents()
diff --git a/listener.go b/listener.go
index d30aabb..77c30a5 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,6 @@
import (
"os"
- "time"
"database/sql"
"github.com/30x/apid-core"
@@ -81,7 +80,7 @@
}
log.Debugf("Queuing %d deployments for bundle download", len(deployments))
for _, dep := range deployments {
- queueDownloadRequest(dep)
+ go bundleMan.queueDownloadRequest(&dep)
}
}()
}
@@ -117,22 +116,13 @@
}
for _, dep := range insertedDeployments {
- queueDownloadRequest(dep)
+ go bundleMan.queueDownloadRequest(&dep)
}
// clean up old bundles
if len(deletedDeployments) > 0 {
log.Debugf("will delete %d old bundles", len(deletedDeployments))
- go func() {
- // give clients a minute to avoid conflicts
- time.Sleep(bundleCleanupDelay)
- for _, dep := range deletedDeployments {
- bundleFile := getBundleFile(dep)
- log.Debugf("removing old bundle: %v", bundleFile)
- // TODO Remove from the Database table edgex_blob_available
- safeDelete(bundleFile)
- }
- }()
+ bundleMan.deleteBundles(deletedDeployments)
}
}