update db schema, update bundle.go logics, format apis
diff --git a/api.go b/api.go index e757122..79ed8d2 100644 --- a/api.go +++ b/api.go
@@ -20,6 +20,7 @@ "io" "io/ioutil" "net/http" + "net/url" "strconv" "sync/atomic" "time" @@ -63,18 +64,17 @@ } type ApiDeploymentDetails struct { - Self string `json:"self"` - Name string `json:"name"` - Type string `json:"type"` - Org string `json:"organization"` - Env string `json:"environment"` - Scope string `json:"scope"` - Revision string `json:"revision"` - BlobId string `json:"blobId"` - BlobURL string `json:"bloburl"` - ResourceBlobId string `json:"resourceBlobId"` - Created string `json:"created"` - Updated string `json:"updated"` + Self string `json:"self"` + Name string `json:"name"` + Type string `json:"type"` + Revision string `json:"revision"` + BeanBlobUrl string `json:"beanBlob"` + Org string `json:"orgId"` + Env string `json:"envId"` + ResourceBlobUrl string `json:"resourceBlob"` + Path string `json:"path"` + Created string `json:"created"` + Updated string `json:"updated"` } type ApiDeploymentResponse struct { @@ -83,14 +83,16 @@ ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"` } -const deploymentsEndpoint = "/configurations" -const blobEndpointPath = "/blob" -const blobEndpoint = blobEndpointPath + "/{blobId}" +const ( + deploymentsEndpoint = "/configurations" + blobEndpointPath = "/blob" + blobEndpoint = blobEndpointPath + "/{blobId}" +) type apiManagerInterface interface { InitAPI() addChangedDeployment(string) - distributeEvents() + //distributeEvents() } type apiManager struct { @@ -159,6 +161,8 @@ } } +//TODO get notified when deployments ready +/* func (a *apiManager) distributeEvents() { subscribers := make(map[chan deploymentsResult]bool) deliverDeployments := make(chan []interface{}, 1) @@ -191,6 +195,7 @@ } } } +*/ func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) { @@ -255,8 +260,9 @@ // otherwise, subscribe to any new deployment changes var newDeploymentsChannel chan deploymentsResult if timeout > 0 && ifNoneMatch != "" { - newDeploymentsChannel = make(chan deploymentsResult, 1) - a.addSubscriber <- newDeploymentsChannel + //TODO handle block + //newDeploymentsChannel = make(chan deploymentsResult, 1) + //a.addSubscriber <- newDeploymentsChannel } log.Debug("Blocking request... Waiting for new Deployments.") @@ -300,18 +306,17 @@ for _, d := range dataDeps { apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{ - Self: apiDeps.Self + "/" + d.ID, - Name: d.Name, - Type: d.Type, - Org: d.OrgID, - Env: d.EnvID, - Scope: a.getDeploymentScope(), - Revision: d.Revision, - BlobId: d.BlobID, - BlobURL: d.BlobURL, - ResourceBlobId: d.BlobResourceID, - Created: convertTime(d.Created), - Updated: convertTime(d.Updated), + Self: apiDeps.Self + "/" + d.ID, + Name: d.Name, + Type: d.Type, + Revision: d.Revision, + BeanBlobUrl: a.getBlobUrl(d.BlobID), + Org: d.OrgID, + Env: d.EnvID, + ResourceBlobUrl: a.getBlobUrl(d.BlobResourceID), + Path: d.Path, + Created: convertTime(d.Created), + Updated: convertTime(d.Updated), }) } apiDeps.ApiDeploymentsResponse = apiDepDetails @@ -339,9 +344,12 @@ return strconv.FormatInt(e, 10) } -// TODO -func (a *apiManager) getDeploymentScope() string { - return "" +// escape the blobId into url +func (a *apiManager) getBlobUrl(blobId string) string { + if blobId == "" { + return "" + } + return getHttpHost() + "/" + url.PathEscape(blobId) } func convertTime(t string) string { @@ -361,10 +369,10 @@ func getHttpHost() string { // apid-core has to set this according to the protocol apid is to be run: http/https - proto := config.GetString("protocol_type") + proto := config.GetString(configProtocol) if proto == "" { proto = "http" } - proto = proto + "://" + config.GetString("api_listen") + proto = proto + "://" + config.GetString(configAPIListen) return proto }
diff --git a/api_test.go b/api_test.go index 38cef04..4acd20a 100644 --- a/api_test.go +++ b/api_test.go
@@ -29,7 +29,8 @@ ) const ( - testUrl = "http://127.0.0.1:9000" + testUrl = "http://127.0.0.1:9000" + testBlobId = "gcs:SHA-512:39ca7ae89bb9468af34df8bc873748b4035210c91bcc01359c092c1d51364b5f3df06bc69a40621acfaa46791af9ea41bc0f3429a84738ba1a7c8d394859601a" ) var _ = Describe("api", func() { @@ -269,36 +270,34 @@ ID: GenerateUUID(), OrgID: GenerateUUID(), EnvID: GenerateUUID(), + BlobID: testBlobId, + BlobResourceID: "", Type: "virtual-host", Name: "vh-secure", Revision: "1", - BlobID: GenerateUUID(), - GWBlobID: GenerateUUID(), - BlobResourceID: GenerateUUID(), - Updated: time.Now().Format(time.RFC3339), - UpdatedBy: "haoming@google.com", + Path: "/organizations/Org1/", Created: time.Now().Format(time.RFC3339), CreatedBy: "haoming@google.com", + Updated: time.Now().Format(time.RFC3339), + UpdatedBy: "haoming@google.com", BlobFSLocation: "BlobFSLocation", - BlobURL: "http://localhost:6666/testBlobURL", } return dep } func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails { detail := &ApiDeploymentDetails{ - Self: self + "/" + dep.ID, - Name: dep.Name, - Type: dep.Type, - Org: dep.OrgID, - Env: dep.EnvID, - Scope: "", - Revision: dep.Revision, - BlobId: dep.BlobID, - BlobURL: dep.BlobURL, - ResourceBlobId: dep.BlobResourceID, - Created: dep.Created, - Updated: dep.Updated, + Self: self + "/" + dep.ID, + Name: dep.Name, + Type: dep.Type, + Revision: dep.Revision, + BeanBlobUrl: getHttpHost() + "/" + testBlobId, + Org: dep.OrgID, + Env: dep.EnvID, + ResourceBlobUrl: "", + Path: dep.Path, + Created: dep.Created, + Updated: dep.Updated, } return detail } @@ -324,7 +323,7 @@ return d.readyDeployments, nil } -func (d *dummyDbManager) updateLocalFsLocation(string, string, string) error { +func (d *dummyDbManager) updateLocalFsLocation(string, string) error { return nil }
diff --git a/bundle.go b/bundle.go index d104722..5be0761 100644 --- a/bundle.go +++ b/bundle.go
@@ -22,7 +22,6 @@ "net/url" "os" "path" - "strconv" "sync/atomic" "time" ) @@ -35,7 +34,7 @@ initializeBundleDownloading() queueDownloadRequest(*DataDeployment) enqueueRequest(*DownloadRequest) - deleteBundles([]DataDeployment) + //deleteBundles([]DataDeployment) Close() } @@ -68,20 +67,34 @@ } } +// download bundle blob and resource blob +// TODO do not download duplicate blobs func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) { retryIn := bm.bundleRetryDelay maxBackOff := 5 * time.Minute markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) - req := &DownloadRequest{ + + blobReq := &DownloadRequest{ bm: bm, - dep: dep, - bundleFile: getBundleFile(dep), + blobId: dep.BlobID, backoffFunc: createBackoff(retryIn, maxBackOff), markFailedAt: markFailedAt, connTimeout: bm.bundleDownloadConnTimeout, } - go bm.enqueueRequest(req) + + resourceReq := &DownloadRequest{ + bm: bm, + blobId: dep.BlobID, + backoffFunc: createBackoff(retryIn, maxBackOff), + markFailedAt: markFailedAt, + connTimeout: bm.bundleDownloadConnTimeout, + } + + go func() { + bm.enqueueRequest(blobReq) + bm.enqueueRequest(resourceReq) + }() } // a blocking method to enqueue download requests @@ -89,11 +102,13 @@ if atomic.LoadInt32(bm.isClosed) == 1 { return } - defer func() { - if r := recover(); r != nil { - log.Warn("trying to enque requests to closed bundleManager") - } - }() + /* + defer func() { + if r := recover(); r != nil { + log.Warn("trying to enque requests to closed bundleManager") + } + }() + */ bm.downloadQueue <- r } @@ -102,24 +117,27 @@ close(bm.downloadQueue) } +// TODO add delete support + func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) { - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - go func() { - // give clients a minute to avoid conflicts - time.Sleep(bm.bundleCleanupDelay) - for _, dep := range deletedDeployments { - bundleFile := getBundleFile(&dep) - log.Debugf("removing old bundle: %v", bundleFile) - // TODO Remove from the Database table edgex_blob_available - safeDelete(bundleFile) - } - }() + /* + log.Debugf("will delete %d old bundles", len(deletedDeployments)) + go func() { + // give clients a minute to avoid conflicts + time.Sleep(bm.bundleCleanupDelay) + for _, dep := range deletedDeployments { + bundleFile := getBlobFilePath(dep.BlobID) + log.Debugf("removing old bundle: %v", bundleFile) + // TODO Remove from the Database table edgex_blob_available + safeDelete(bundleFile) + } + }() + */ } type DownloadRequest struct { bm *bundleManager - dep *DataDeployment - bundleFile string + blobId string backoffFunc func() markFailedAt time.Time connTimeout time.Duration @@ -127,62 +145,44 @@ func (r *DownloadRequest) downloadBundle() error { - dep := r.dep - log.Debugf("starting bundle download attempt for depId=%s: blobId=%s", dep.ID, dep.BlobID) + log.Debugf("starting bundle download attempt for blobId=%s", r.blobId) r.checkTimeout() - tempFile, err := downloadFromURI(dep.BlobID, r.connTimeout) + downloadedFile, err := downloadFromURI(r.blobId, r.connTimeout) if err != nil { - log.Errorf("Unable to download blob file blobId=%s: %s", dep.BlobID, err) + log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err) return err } - defer func() { - if tempFile != "" { - go safeDelete(tempFile) - } - }() + log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile) - err = os.Rename(tempFile, r.bundleFile) + err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile) if err != nil { - log.Errorf("Unable to rename temp blob file %s to %s: %s", tempFile, r.bundleFile, err) + log.Errorf("updateLocalFsLocation failed: blobId=%s", r.blobId) return err } - blobId := atomic.AddInt64(&gwBlobId, 1) - blobIds := strconv.FormatInt(blobId, 10) - err = r.bm.dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile) - if err != nil { - return err - } - dep.GWBlobID = blobIds + log.Debugf("bundle downloaded: blobId=%s", r.blobId) - log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID) - - // send deployments to client - r.bm.apiMan.addChangedDeployment(dep.ID) + // TODO send changed deployments to subscribers (API call with "block") + //r.bm.apiMan.addChangedDeployment(dep.ID) return nil } func (r *DownloadRequest) checkTimeout() { - if !r.markFailedAt.IsZero() { - if time.Now().After(r.markFailedAt) { - r.markFailedAt = time.Time{} - log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", - r.dep.ID, r.dep.BlobID) - } + if !r.markFailedAt.IsZero() && time.Now().After(r.markFailedAt) { + r.markFailedAt = time.Time{} + log.Debugf("bundle download timeout. blobId=", r.blobId) } } -func getBundleFile(dep *DataDeployment) string { - - return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(dep.ID))) - +func getBlobFilePath(blobId string) string { + return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId))) } func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) { @@ -227,7 +227,7 @@ return } - tempFile, err = ioutil.TempFile(bundlePath, "download") + tempFile, err = ioutil.TempFile(bundlePath, "blob") if err != nil { log.Errorf("Unable to create temp file: %v", err) return @@ -280,7 +280,7 @@ log.Debugf("started bundle downloader %d", w.id) for req := range w.bm.downloadQueue { - log.Debugf("starting download %s", req.bundleFile) + log.Debugf("starting download blobId=%s", req.blobId) err := req.downloadBundle() if err != nil { go func() {
diff --git a/data.go b/data.go index 267ed15..f443d9b 100644 --- a/data.go +++ b/data.go
@@ -28,18 +28,17 @@ ID string OrgID string EnvID string + BlobID string + BlobResourceID string Type string Name string Revision string - BlobID string - GWBlobID string - BlobResourceID string - Updated string - UpdatedBy string + Path string Created string CreatedBy string + Updated string + UpdatedBy string BlobFSLocation string - BlobURL string } type SQLExec interface { @@ -51,7 +50,7 @@ initDb() error getUnreadyDeployments() ([]DataDeployment, error) getReadyDeployments() ([]DataDeployment, error) - updateLocalFsLocation(string, string, string) error + updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) } @@ -80,21 +79,20 @@ func (dbc *dbManager) initDb() error { _, err := dbc.getDb().Exec(` CREATE TABLE IF NOT EXISTS edgex_blob_available ( - gwblobid integer primary key, - runtime_meta_id character varying NOT NULL, - local_fs_location character varying NOT NULL, - access_url character varying + id text primary key, + local_fs_location text NOT NULL ); `) if err != nil { return err } - log.Debug("Database tables created.") + log.Debug("Database table edgex_blob_available created.") return nil } // getUnreadyDeployments() returns array of resources that are not yet to be processed +// TODO make it work with new schema func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) { rows, err := dbc.getDb().Query(` @@ -127,66 +125,78 @@ } // getDeployments() -func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) { +func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) { - rows, err := dbc.getDb().Query(` - SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id, - a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by, - b.local_fs_location, b.access_url, b.gwblobid - FROM project_runtime_blob_metadata as a - INNER JOIN edgex_blob_available as b - ON a.id = b.runtime_meta_id + rows, err := dbc.getDb().Query(`SELECT + a.id, + a.organization_id, + a.environment_id, + a.bean_blob_id, + a.resource_blob_id, + a.type, + a.name, + a.revision, + a.path, + a.created_at, + a.created_by, + a.updated_at, + a.updated_by, + b.local_fs_location + FROM metadata_runtime_entity_metadata as a + INNER JOIN edgex_blob_available as b + ON (a.bean_blob_id = b.id OR a.resource_blob_id = b.id) + ; `) if err != nil { log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) - return + return nil, err } defer rows.Close() - for rows.Next() { - dep := DataDeployment{} - rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Type, &dep.Revision, &dep.BlobID, - &dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated, - &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL, &dep.GWBlobID) - deployments = append(deployments, dep) - log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID) + deployments, err := dataDeploymentsFromRow(rows) + if err != nil { + return nil, err } + + log.Debugf("Configurations ready: %v", deployments) + if len(deployments) == 0 { log.Debug("No resources ready to be deployed") err = sql.ErrNoRows } - return + return deployments, err } -func (dbc *dbManager) updateLocalFsLocation(depID, bundleId, localFsLocation string) error { +func (dbc *dbManager) updateLocalFsLocation(blobId, localFsLocation string) error { - access_url := getHttpHost() + blobEndpointPath + "/" + bundleId stmt, err := dbc.getDb().Prepare(` - INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url) - VALUES (?, ?, ?, ?)`) + INSERT OR IGNORE INTO edgex_blob_available ( + id, + local_fs_location + ) VALUES (?, ?);`) if err != nil { - log.Errorf("PREPARE updatelocal_fs_location failed: %v", err) + log.Errorf("PREPARE updateLocalFsLocation failed: %v", err) return err } defer stmt.Close() - _, err = stmt.Exec(depID, bundleId, localFsLocation, access_url) + _, err = stmt.Exec(blobId, localFsLocation) if err != nil { - log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, localFsLocation, err) + log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err) return err } - log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, localFsLocation) + log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation) return nil } -func (dbc *dbManager) getLocalFSLocation(blobId string) (locfs string, err error) { +func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) { log.Debugf("Getting the blob file for blobId {%s}", blobId) - rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"") + rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE id = '" + blobId + "'") if err != nil { log.Errorf("SELECT local_fs_location failed %v", err) return "", err @@ -194,8 +204,39 @@ defer rows.Close() for rows.Next() { - rows.Scan(&locfs) - log.Debugf("Got the blob file {%s} for blobId {%s}", locfs, blobId) + err = rows.Scan(&localFsLocation) + if err != nil { + log.Errorf("Scan local_fs_location failed %v", err) + return "", err + } + log.Debugf("Got the blob file {%s} for blobId {%s}", localFsLocation, blobId) + } + return +} + +func dataDeploymentsFromRow(rows *sql.Rows) (deployments []DataDeployment, err error) { + for rows.Next() { + dep := DataDeployment{} + err = rows.Scan( + &dep.ID, + &dep.OrgID, + &dep.EnvID, + &dep.BlobID, + &dep.BlobResourceID, + &dep.Type, + &dep.Name, + &dep.Revision, + &dep.Path, + &dep.Created, + &dep.CreatedBy, + &dep.Updated, + &dep.UpdatedBy, + &dep.BlobFSLocation, + ) + if err != nil { + return nil, err + } + deployments = append(deployments, dep) } return }
diff --git a/init.go b/init.go index 393e2a5..bc8f2a9 100644 --- a/init.go +++ b/init.go
@@ -25,7 +25,8 @@ ) const ( - configHTTProtocol = "apidHTTProtocol" + configProtocol = "protocol_type" + configAPIListen = "api_listen" configBundleDirKey = "gatewaydeploy_bundle_dir" configDebounceDuration = "gatewaydeploy_debounce_duration" configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" @@ -114,6 +115,8 @@ return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout) } + log.Debug("apiServerBaseURI = " + apiServerBaseURI.String()) + // initialize db manager dbMan := &dbManager{ @@ -158,7 +161,9 @@ } bundleMan.initializeBundleDownloading() - go apiMan.distributeEvents() + + //TODO initialize apiMan.distributeEvents() for api call with "block" + //go apiMan.distributeEvents() initListener(services, dbMan, apiMan, bundleMan) @@ -166,7 +171,3 @@ return pluginData, nil } - -func setServices() { - -}
diff --git a/listener.go b/listener.go index 12f5740..54671e9 100644 --- a/listener.go +++ b/listener.go
@@ -79,9 +79,13 @@ log.Debug("Snapshot processed") } +// TODO make it work with new schema func (h *apigeeSyncHandler) startupOnExistingDatabase() { // start bundle downloads that didn't finish go func() { + // create edgex_blob_available table + h.dbMan.initDb() + deployments, err := h.dbMan.getUnreadyDeployments() if err != nil && err != sql.ErrNoRows { @@ -131,7 +135,8 @@ // clean up old bundles if len(deletedDeployments) > 0 { log.Debugf("will delete %d old bundles", len(deletedDeployments)) - h.bundleMan.deleteBundles(deletedDeployments) + //TODO delete bundles for deleted deployments + //h.bundleMan.deleteBundles(deletedDeployments) } } @@ -153,6 +158,7 @@ return } +// TODO delete from file system func safeDelete(file string) { if e := os.Remove(file); e != nil && !os.IsNotExist(e) { log.Warnf("unable to delete file %s: %v", file, e)