update db schema, update bundle.go logics, format apis
diff --git a/api.go b/api.go
index e757122..79ed8d2 100644
--- a/api.go
+++ b/api.go
@@ -20,6 +20,7 @@
"io"
"io/ioutil"
"net/http"
+ "net/url"
"strconv"
"sync/atomic"
"time"
@@ -63,18 +64,17 @@
}
type ApiDeploymentDetails struct {
- Self string `json:"self"`
- Name string `json:"name"`
- Type string `json:"type"`
- Org string `json:"organization"`
- Env string `json:"environment"`
- Scope string `json:"scope"`
- Revision string `json:"revision"`
- BlobId string `json:"blobId"`
- BlobURL string `json:"bloburl"`
- ResourceBlobId string `json:"resourceBlobId"`
- Created string `json:"created"`
- Updated string `json:"updated"`
+ Self string `json:"self"`
+ Name string `json:"name"`
+ Type string `json:"type"`
+ Revision string `json:"revision"`
+ BeanBlobUrl string `json:"beanBlob"`
+ Org string `json:"orgId"`
+ Env string `json:"envId"`
+ ResourceBlobUrl string `json:"resourceBlob"`
+ Path string `json:"path"`
+ Created string `json:"created"`
+ Updated string `json:"updated"`
}
type ApiDeploymentResponse struct {
@@ -83,14 +83,16 @@
ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"`
}
-const deploymentsEndpoint = "/configurations"
-const blobEndpointPath = "/blob"
-const blobEndpoint = blobEndpointPath + "/{blobId}"
+const (
+ deploymentsEndpoint = "/configurations"
+ blobEndpointPath = "/blob"
+ blobEndpoint = blobEndpointPath + "/{blobId}"
+)
type apiManagerInterface interface {
InitAPI()
addChangedDeployment(string)
- distributeEvents()
+ //distributeEvents()
}
type apiManager struct {
@@ -159,6 +161,8 @@
}
}
+//TODO get notified when deployments ready
+/*
func (a *apiManager) distributeEvents() {
subscribers := make(map[chan deploymentsResult]bool)
deliverDeployments := make(chan []interface{}, 1)
@@ -191,6 +195,7 @@
}
}
}
+*/
func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
@@ -255,8 +260,9 @@
// otherwise, subscribe to any new deployment changes
var newDeploymentsChannel chan deploymentsResult
if timeout > 0 && ifNoneMatch != "" {
- newDeploymentsChannel = make(chan deploymentsResult, 1)
- a.addSubscriber <- newDeploymentsChannel
+ //TODO handle block
+ //newDeploymentsChannel = make(chan deploymentsResult, 1)
+ //a.addSubscriber <- newDeploymentsChannel
}
log.Debug("Blocking request... Waiting for new Deployments.")
@@ -300,18 +306,17 @@
for _, d := range dataDeps {
apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
- Self: apiDeps.Self + "/" + d.ID,
- Name: d.Name,
- Type: d.Type,
- Org: d.OrgID,
- Env: d.EnvID,
- Scope: a.getDeploymentScope(),
- Revision: d.Revision,
- BlobId: d.BlobID,
- BlobURL: d.BlobURL,
- ResourceBlobId: d.BlobResourceID,
- Created: convertTime(d.Created),
- Updated: convertTime(d.Updated),
+ Self: apiDeps.Self + "/" + d.ID,
+ Name: d.Name,
+ Type: d.Type,
+ Revision: d.Revision,
+ BeanBlobUrl: a.getBlobUrl(d.BlobID),
+ Org: d.OrgID,
+ Env: d.EnvID,
+ ResourceBlobUrl: a.getBlobUrl(d.BlobResourceID),
+ Path: d.Path,
+ Created: convertTime(d.Created),
+ Updated: convertTime(d.Updated),
})
}
apiDeps.ApiDeploymentsResponse = apiDepDetails
@@ -339,9 +344,12 @@
return strconv.FormatInt(e, 10)
}
-// TODO
-func (a *apiManager) getDeploymentScope() string {
- return ""
+// escape the blobId into url
+func (a *apiManager) getBlobUrl(blobId string) string {
+ if blobId == "" {
+ return ""
+ }
+ return getHttpHost() + "/" + url.PathEscape(blobId)
}
func convertTime(t string) string {
@@ -361,10 +369,10 @@
func getHttpHost() string {
// apid-core has to set this according to the protocol apid is to be run: http/https
- proto := config.GetString("protocol_type")
+ proto := config.GetString(configProtocol)
if proto == "" {
proto = "http"
}
- proto = proto + "://" + config.GetString("api_listen")
+ proto = proto + "://" + config.GetString(configAPIListen)
return proto
}
diff --git a/api_test.go b/api_test.go
index 38cef04..4acd20a 100644
--- a/api_test.go
+++ b/api_test.go
@@ -29,7 +29,8 @@
)
const (
- testUrl = "http://127.0.0.1:9000"
+ testUrl = "http://127.0.0.1:9000"
+ testBlobId = "gcs:SHA-512:39ca7ae89bb9468af34df8bc873748b4035210c91bcc01359c092c1d51364b5f3df06bc69a40621acfaa46791af9ea41bc0f3429a84738ba1a7c8d394859601a"
)
var _ = Describe("api", func() {
@@ -269,36 +270,34 @@
ID: GenerateUUID(),
OrgID: GenerateUUID(),
EnvID: GenerateUUID(),
+ BlobID: testBlobId,
+ BlobResourceID: "",
Type: "virtual-host",
Name: "vh-secure",
Revision: "1",
- BlobID: GenerateUUID(),
- GWBlobID: GenerateUUID(),
- BlobResourceID: GenerateUUID(),
- Updated: time.Now().Format(time.RFC3339),
- UpdatedBy: "haoming@google.com",
+ Path: "/organizations/Org1/",
Created: time.Now().Format(time.RFC3339),
CreatedBy: "haoming@google.com",
+ Updated: time.Now().Format(time.RFC3339),
+ UpdatedBy: "haoming@google.com",
BlobFSLocation: "BlobFSLocation",
- BlobURL: "http://localhost:6666/testBlobURL",
}
return dep
}
func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails {
detail := &ApiDeploymentDetails{
- Self: self + "/" + dep.ID,
- Name: dep.Name,
- Type: dep.Type,
- Org: dep.OrgID,
- Env: dep.EnvID,
- Scope: "",
- Revision: dep.Revision,
- BlobId: dep.BlobID,
- BlobURL: dep.BlobURL,
- ResourceBlobId: dep.BlobResourceID,
- Created: dep.Created,
- Updated: dep.Updated,
+ Self: self + "/" + dep.ID,
+ Name: dep.Name,
+ Type: dep.Type,
+ Revision: dep.Revision,
+ BeanBlobUrl: getHttpHost() + "/" + testBlobId,
+ Org: dep.OrgID,
+ Env: dep.EnvID,
+ ResourceBlobUrl: "",
+ Path: dep.Path,
+ Created: dep.Created,
+ Updated: dep.Updated,
}
return detail
}
@@ -324,7 +323,7 @@
return d.readyDeployments, nil
}
-func (d *dummyDbManager) updateLocalFsLocation(string, string, string) error {
+func (d *dummyDbManager) updateLocalFsLocation(string, string) error {
return nil
}
diff --git a/bundle.go b/bundle.go
index d104722..5be0761 100644
--- a/bundle.go
+++ b/bundle.go
@@ -22,7 +22,6 @@
"net/url"
"os"
"path"
- "strconv"
"sync/atomic"
"time"
)
@@ -35,7 +34,7 @@
initializeBundleDownloading()
queueDownloadRequest(*DataDeployment)
enqueueRequest(*DownloadRequest)
- deleteBundles([]DataDeployment)
+ //deleteBundles([]DataDeployment)
Close()
}
@@ -68,20 +67,34 @@
}
}
+// download bundle blob and resource blob
+// TODO do not download duplicate blobs
func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) {
retryIn := bm.bundleRetryDelay
maxBackOff := 5 * time.Minute
markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter)
- req := &DownloadRequest{
+
+ blobReq := &DownloadRequest{
bm: bm,
- dep: dep,
- bundleFile: getBundleFile(dep),
+ blobId: dep.BlobID,
backoffFunc: createBackoff(retryIn, maxBackOff),
markFailedAt: markFailedAt,
connTimeout: bm.bundleDownloadConnTimeout,
}
- go bm.enqueueRequest(req)
+
+ resourceReq := &DownloadRequest{
+ bm: bm,
+ blobId: dep.BlobID,
+ backoffFunc: createBackoff(retryIn, maxBackOff),
+ markFailedAt: markFailedAt,
+ connTimeout: bm.bundleDownloadConnTimeout,
+ }
+
+ go func() {
+ bm.enqueueRequest(blobReq)
+ bm.enqueueRequest(resourceReq)
+ }()
}
// a blocking method to enqueue download requests
@@ -89,11 +102,13 @@
if atomic.LoadInt32(bm.isClosed) == 1 {
return
}
- defer func() {
- if r := recover(); r != nil {
- log.Warn("trying to enque requests to closed bundleManager")
- }
- }()
+ /*
+ defer func() {
+ if r := recover(); r != nil {
+ log.Warn("trying to enque requests to closed bundleManager")
+ }
+ }()
+ */
bm.downloadQueue <- r
}
@@ -102,24 +117,27 @@
close(bm.downloadQueue)
}
+// TODO add delete support
+
func (bm *bundleManager) deleteBundles(deletedDeployments []DataDeployment) {
- log.Debugf("will delete %d old bundles", len(deletedDeployments))
- go func() {
- // give clients a minute to avoid conflicts
- time.Sleep(bm.bundleCleanupDelay)
- for _, dep := range deletedDeployments {
- bundleFile := getBundleFile(&dep)
- log.Debugf("removing old bundle: %v", bundleFile)
- // TODO Remove from the Database table edgex_blob_available
- safeDelete(bundleFile)
- }
- }()
+ /*
+ log.Debugf("will delete %d old bundles", len(deletedDeployments))
+ go func() {
+ // give clients a minute to avoid conflicts
+ time.Sleep(bm.bundleCleanupDelay)
+ for _, dep := range deletedDeployments {
+ bundleFile := getBlobFilePath(dep.BlobID)
+ log.Debugf("removing old bundle: %v", bundleFile)
+ // TODO Remove from the Database table edgex_blob_available
+ safeDelete(bundleFile)
+ }
+ }()
+ */
}
type DownloadRequest struct {
bm *bundleManager
- dep *DataDeployment
- bundleFile string
+ blobId string
backoffFunc func()
markFailedAt time.Time
connTimeout time.Duration
@@ -127,62 +145,44 @@
func (r *DownloadRequest) downloadBundle() error {
- dep := r.dep
- log.Debugf("starting bundle download attempt for depId=%s: blobId=%s", dep.ID, dep.BlobID)
+ log.Debugf("starting bundle download attempt for blobId=%s", r.blobId)
r.checkTimeout()
- tempFile, err := downloadFromURI(dep.BlobID, r.connTimeout)
+ downloadedFile, err := downloadFromURI(r.blobId, r.connTimeout)
if err != nil {
- log.Errorf("Unable to download blob file blobId=%s: %s", dep.BlobID, err)
+ log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err)
return err
}
- defer func() {
- if tempFile != "" {
- go safeDelete(tempFile)
- }
- }()
+ log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile)
- err = os.Rename(tempFile, r.bundleFile)
+ err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile)
if err != nil {
- log.Errorf("Unable to rename temp blob file %s to %s: %s", tempFile, r.bundleFile, err)
+ log.Errorf("updateLocalFsLocation failed: blobId=%s", r.blobId)
return err
}
- blobId := atomic.AddInt64(&gwBlobId, 1)
- blobIds := strconv.FormatInt(blobId, 10)
- err = r.bm.dbMan.updateLocalFsLocation(dep.ID, blobIds, r.bundleFile)
- if err != nil {
- return err
- }
- dep.GWBlobID = blobIds
+ log.Debugf("bundle downloaded: blobId=%s", r.blobId)
- log.Debugf("bundle for depId=%s downloaded: blobId=%s", dep.ID, dep.BlobID)
-
- // send deployments to client
- r.bm.apiMan.addChangedDeployment(dep.ID)
+ // TODO send changed deployments to subscribers (API call with "block")
+ //r.bm.apiMan.addChangedDeployment(dep.ID)
return nil
}
func (r *DownloadRequest) checkTimeout() {
- if !r.markFailedAt.IsZero() {
- if time.Now().After(r.markFailedAt) {
- r.markFailedAt = time.Time{}
- log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s",
- r.dep.ID, r.dep.BlobID)
- }
+ if !r.markFailedAt.IsZero() && time.Now().After(r.markFailedAt) {
+ r.markFailedAt = time.Time{}
+ log.Debugf("bundle download timeout. blobId=", r.blobId)
}
}
-func getBundleFile(dep *DataDeployment) string {
-
- return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(dep.ID)))
-
+func getBlobFilePath(blobId string) string {
+ return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId)))
}
func getSignedURL(blobId string, bundleDownloadConnTimeout time.Duration) (string, error) {
@@ -227,7 +227,7 @@
return
}
- tempFile, err = ioutil.TempFile(bundlePath, "download")
+ tempFile, err = ioutil.TempFile(bundlePath, "blob")
if err != nil {
log.Errorf("Unable to create temp file: %v", err)
return
@@ -280,7 +280,7 @@
log.Debugf("started bundle downloader %d", w.id)
for req := range w.bm.downloadQueue {
- log.Debugf("starting download %s", req.bundleFile)
+ log.Debugf("starting download blobId=%s", req.blobId)
err := req.downloadBundle()
if err != nil {
go func() {
diff --git a/data.go b/data.go
index 267ed15..f443d9b 100644
--- a/data.go
+++ b/data.go
@@ -28,18 +28,17 @@
ID string
OrgID string
EnvID string
+ BlobID string
+ BlobResourceID string
Type string
Name string
Revision string
- BlobID string
- GWBlobID string
- BlobResourceID string
- Updated string
- UpdatedBy string
+ Path string
Created string
CreatedBy string
+ Updated string
+ UpdatedBy string
BlobFSLocation string
- BlobURL string
}
type SQLExec interface {
@@ -51,7 +50,7 @@
initDb() error
getUnreadyDeployments() ([]DataDeployment, error)
getReadyDeployments() ([]DataDeployment, error)
- updateLocalFsLocation(string, string, string) error
+ updateLocalFsLocation(string, string) error
getLocalFSLocation(string) (string, error)
}
@@ -80,21 +79,20 @@
func (dbc *dbManager) initDb() error {
_, err := dbc.getDb().Exec(`
CREATE TABLE IF NOT EXISTS edgex_blob_available (
- gwblobid integer primary key,
- runtime_meta_id character varying NOT NULL,
- local_fs_location character varying NOT NULL,
- access_url character varying
+ id text primary key,
+ local_fs_location text NOT NULL
);
`)
if err != nil {
return err
}
- log.Debug("Database tables created.")
+ log.Debug("Database table edgex_blob_available created.")
return nil
}
// getUnreadyDeployments() returns array of resources that are not yet to be processed
+// TODO make it work with new schema
func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) {
rows, err := dbc.getDb().Query(`
@@ -127,66 +125,78 @@
}
// getDeployments()
-func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) {
+func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) {
- rows, err := dbc.getDb().Query(`
- SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
- a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
- b.local_fs_location, b.access_url, b.gwblobid
- FROM project_runtime_blob_metadata as a
- INNER JOIN edgex_blob_available as b
- ON a.id = b.runtime_meta_id
+ rows, err := dbc.getDb().Query(`SELECT
+ a.id,
+ a.organization_id,
+ a.environment_id,
+ a.bean_blob_id,
+ a.resource_blob_id,
+ a.type,
+ a.name,
+ a.revision,
+ a.path,
+ a.created_at,
+ a.created_by,
+ a.updated_at,
+ a.updated_by,
+ b.local_fs_location
+ FROM metadata_runtime_entity_metadata as a
+ INNER JOIN edgex_blob_available as b
+ ON (a.bean_blob_id = b.id OR a.resource_blob_id = b.id)
+ ;
`)
if err != nil {
log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
- return
+ return nil, err
}
defer rows.Close()
- for rows.Next() {
- dep := DataDeployment{}
- rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Type, &dep.Revision, &dep.BlobID,
- &dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated,
- &dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL, &dep.GWBlobID)
- deployments = append(deployments, dep)
- log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID)
+ deployments, err := dataDeploymentsFromRow(rows)
+ if err != nil {
+ return nil, err
}
+
+ log.Debugf("Configurations ready: %v", deployments)
+
if len(deployments) == 0 {
log.Debug("No resources ready to be deployed")
err = sql.ErrNoRows
}
- return
+ return deployments, err
}
-func (dbc *dbManager) updateLocalFsLocation(depID, bundleId, localFsLocation string) error {
+func (dbc *dbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
- access_url := getHttpHost() + blobEndpointPath + "/" + bundleId
stmt, err := dbc.getDb().Prepare(`
- INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url)
- VALUES (?, ?, ?, ?)`)
+ INSERT OR IGNORE INTO edgex_blob_available (
+ id,
+ local_fs_location
+ ) VALUES (?, ?);`)
if err != nil {
- log.Errorf("PREPARE updatelocal_fs_location failed: %v", err)
+ log.Errorf("PREPARE updateLocalFsLocation failed: %v", err)
return err
}
defer stmt.Close()
- _, err = stmt.Exec(depID, bundleId, localFsLocation, access_url)
+ _, err = stmt.Exec(blobId, localFsLocation)
if err != nil {
- log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, localFsLocation, err)
+ log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
return err
}
- log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, localFsLocation)
+ log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
return nil
}
-func (dbc *dbManager) getLocalFSLocation(blobId string) (locfs string, err error) {
+func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) {
log.Debugf("Getting the blob file for blobId {%s}", blobId)
- rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"")
+ rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE id = '" + blobId + "'")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
@@ -194,8 +204,39 @@
defer rows.Close()
for rows.Next() {
- rows.Scan(&locfs)
- log.Debugf("Got the blob file {%s} for blobId {%s}", locfs, blobId)
+ err = rows.Scan(&localFsLocation)
+ if err != nil {
+ log.Errorf("Scan local_fs_location failed %v", err)
+ return "", err
+ }
+ log.Debugf("Got the blob file {%s} for blobId {%s}", localFsLocation, blobId)
+ }
+ return
+}
+
+func dataDeploymentsFromRow(rows *sql.Rows) (deployments []DataDeployment, err error) {
+ for rows.Next() {
+ dep := DataDeployment{}
+ err = rows.Scan(
+ &dep.ID,
+ &dep.OrgID,
+ &dep.EnvID,
+ &dep.BlobID,
+ &dep.BlobResourceID,
+ &dep.Type,
+ &dep.Name,
+ &dep.Revision,
+ &dep.Path,
+ &dep.Created,
+ &dep.CreatedBy,
+ &dep.Updated,
+ &dep.UpdatedBy,
+ &dep.BlobFSLocation,
+ )
+ if err != nil {
+ return nil, err
+ }
+ deployments = append(deployments, dep)
}
return
}
diff --git a/init.go b/init.go
index 393e2a5..bc8f2a9 100644
--- a/init.go
+++ b/init.go
@@ -25,7 +25,8 @@
)
const (
- configHTTProtocol = "apidHTTProtocol"
+ configProtocol = "protocol_type"
+ configAPIListen = "api_listen"
configBundleDirKey = "gatewaydeploy_bundle_dir"
configDebounceDuration = "gatewaydeploy_debounce_duration"
configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
@@ -114,6 +115,8 @@
return pluginData, fmt.Errorf("%s must be a positive duration", configDownloadConnTimeout)
}
+ log.Debug("apiServerBaseURI = " + apiServerBaseURI.String())
+
// initialize db manager
dbMan := &dbManager{
@@ -158,7 +161,9 @@
}
bundleMan.initializeBundleDownloading()
- go apiMan.distributeEvents()
+
+ //TODO initialize apiMan.distributeEvents() for api call with "block"
+ //go apiMan.distributeEvents()
initListener(services, dbMan, apiMan, bundleMan)
@@ -166,7 +171,3 @@
return pluginData, nil
}
-
-func setServices() {
-
-}
diff --git a/listener.go b/listener.go
index 12f5740..54671e9 100644
--- a/listener.go
+++ b/listener.go
@@ -79,9 +79,13 @@
log.Debug("Snapshot processed")
}
+// TODO make it work with new schema
func (h *apigeeSyncHandler) startupOnExistingDatabase() {
// start bundle downloads that didn't finish
go func() {
+ // create edgex_blob_available table
+ h.dbMan.initDb()
+
deployments, err := h.dbMan.getUnreadyDeployments()
if err != nil && err != sql.ErrNoRows {
@@ -131,7 +135,8 @@
// clean up old bundles
if len(deletedDeployments) > 0 {
log.Debugf("will delete %d old bundles", len(deletedDeployments))
- h.bundleMan.deleteBundles(deletedDeployments)
+ //TODO delete bundles for deleted deployments
+ //h.bundleMan.deleteBundles(deletedDeployments)
}
}
@@ -153,6 +158,7 @@
return
}
+// TODO delete from file system
func safeDelete(file string) {
if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
log.Warnf("unable to delete file %s: %v", file, e)