add getUnreadyBlobs() and support for starting from local sqlite
diff --git a/api_test.go b/api_test.go index d8d0668..f7bdbc8 100644 --- a/api_test.go +++ b/api_test.go
@@ -291,7 +291,6 @@ } dummyDbMan.readyDeployments = deployments - dummyDbMan.unreadyDeployments = deployments return details } @@ -334,9 +333,9 @@ } type dummyDbManager struct { - unreadyDeployments []DataDeployment - readyDeployments []DataDeployment - localFSLocation string + unreadyBlobIds []string + readyDeployments []DataDeployment + localFSLocation string } func (d *dummyDbManager) setDbVersion(version string) { @@ -347,8 +346,8 @@ return nil } -func (d *dummyDbManager) getUnreadyDeployments() ([]DataDeployment, error) { - return d.unreadyDeployments, nil +func (d *dummyDbManager) getUnreadyBlobs() ([]string, error) { + return d.unreadyBlobIds, nil } func (d *dummyDbManager) getReadyDeployments() ([]DataDeployment, error) {
diff --git a/bundle.go b/bundle.go index 5be0761..10e1632 100644 --- a/bundle.go +++ b/bundle.go
@@ -34,6 +34,7 @@ initializeBundleDownloading() queueDownloadRequest(*DataDeployment) enqueueRequest(*DownloadRequest) + makeDownloadRequest(string) *DownloadRequest //deleteBundles([]DataDeployment) Close() } @@ -70,26 +71,8 @@ // download bundle blob and resource blob // TODO do not download duplicate blobs func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) { - - retryIn := bm.bundleRetryDelay - maxBackOff := 5 * time.Minute - markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) - - blobReq := &DownloadRequest{ - bm: bm, - blobId: dep.BlobID, - backoffFunc: createBackoff(retryIn, maxBackOff), - markFailedAt: markFailedAt, - connTimeout: bm.bundleDownloadConnTimeout, - } - - resourceReq := &DownloadRequest{ - bm: bm, - blobId: dep.BlobID, - backoffFunc: createBackoff(retryIn, maxBackOff), - markFailedAt: markFailedAt, - connTimeout: bm.bundleDownloadConnTimeout, - } + blobReq := bm.makeDownloadRequest(dep.BlobID) + resourceReq := bm.makeDownloadRequest(dep.BlobResourceID) go func() { bm.enqueueRequest(blobReq) @@ -97,6 +80,20 @@ }() } +func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest { + markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) + retryIn := bm.bundleRetryDelay + maxBackOff := 5 * time.Minute + + return &DownloadRequest{ + bm: bm, + blobId: id, + backoffFunc: createBackoff(retryIn, maxBackOff), + markFailedAt: markFailedAt, + connTimeout: bm.bundleDownloadConnTimeout, + } +} + // a blocking method to enqueue download requests func (bm *bundleManager) enqueueRequest(r *DownloadRequest) { if atomic.LoadInt32(bm.isClosed) == 1 {
diff --git a/data.go b/data.go index f443d9b..0413232 100644 --- a/data.go +++ b/data.go
@@ -48,7 +48,7 @@ type dbManagerInterface interface { setDbVersion(string) initDb() error - getUnreadyDeployments() ([]DataDeployment, error) + getUnreadyBlobs() ([]string, error) getReadyDeployments() ([]DataDeployment, error) updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) @@ -93,35 +93,48 @@ // getUnreadyDeployments() returns array of resources that are not yet to be processed // TODO make it work with new schema -func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) { +func (dbc *dbManager) getUnreadyBlobs() (ids []string, err error) { + // get unready blob id rows, err := dbc.getDb().Query(` - SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id - FROM project_runtime_blob_metadata - LEFT JOIN edgex_blob_available - ON project_runtime_blob_metadata.id = edgex_blob_available.runtime_meta_id - WHERE edgex_blob_available.runtime_meta_id IS NULL; + SELECT a.bean_blob_id + FROM metadata_runtime_entity_metadata as a + LEFT JOIN edgex_blob_available as b + ON a.bean_blob_id = b.id + WHERE b.id IS NULL; `) - if err != nil { log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) return } defer rows.Close() - for rows.Next() { - dep := DataDeployment{} - rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID, - &dep.BlobResourceID) - deployments = append(deployments, dep) - log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID) + var id string + rows.Scan(&id) + ids = append(ids, id) } - if len(deployments) == 0 { - log.Debug("No new resources found to be processed") - err = sql.ErrNoRows - } - return + // get unready resource id + rows, err = dbc.getDb().Query(` + SELECT a.resource_blob_id + FROM metadata_runtime_entity_metadata as a + LEFT JOIN edgex_blob_available as b + ON a.bean_blob_id = b.id + WHERE b.id IS NULL; + `) + if err != nil { + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) + return + } + defer rows.Close() + for rows.Next() { + var id string + rows.Scan(&id) + ids = append(ids, id) + } + + log.Debugf("Unready blobId %v", ids) + return } // getDeployments()
diff --git a/listener.go b/listener.go index 54671e9..3c08116 100644 --- a/listener.go +++ b/listener.go
@@ -16,7 +16,6 @@ import ( "os" - "database/sql" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" ) @@ -85,15 +84,15 @@ go func() { // create edgex_blob_available table h.dbMan.initDb() + blobIds, err := h.dbMan.getUnreadyBlobs() - deployments, err := h.dbMan.getUnreadyDeployments() - - if err != nil && err != sql.ErrNoRows { + if err != nil { log.Panicf("unable to query database for unready deployments: %v", err) } - log.Debugf("Queuing %d deployments for bundle download", len(deployments)) - for _, dep := range deployments { - go h.bundleMan.queueDownloadRequest(&dep) + + log.Debugf("Queuing %d blob downloads", len(blobIds)) + for _, id := range blobIds { + go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id)) } }() }