add test, fix bugs
diff --git a/api_test.go b/api_test.go
index 8899f69..bbdd9c4 100644
--- a/api_test.go
+++ b/api_test.go
@@ -311,7 +311,6 @@
CreatedBy: "haoming@google.com",
Updated: time.Now().Format(time.RFC3339),
UpdatedBy: "haoming@google.com",
- BlobFSLocation: "BlobFSLocation",
}
return dep
}
diff --git a/bundle.go b/bundle.go
index b0dc434..660bd53 100644
--- a/bundle.go
+++ b/bundle.go
@@ -128,7 +128,7 @@
for _, dep := range deletedDeployments {
bundleFile := getBlobFilePath(dep.BlobID)
log.Debugf("removing old bundle: %v", bundleFile)
- // TODO Remove from the Database table edgex_blob_available
+ // TODO Remove from the Database table apid_blob_available
safeDelete(bundleFile)
}
}()
diff --git a/data.go b/data.go
index 80beedd..2aa7f13 100644
--- a/data.go
+++ b/data.go
@@ -38,7 +38,6 @@
CreatedBy string
Updated string
UpdatedBy string
- BlobFSLocation string
}
type SQLExec interface {
@@ -78,7 +77,7 @@
func (dbc *dbManager) initDb() error {
_, err := dbc.getDb().Exec(`
- CREATE TABLE IF NOT EXISTS edgex_blob_available (
+ CREATE TABLE IF NOT EXISTS apid_blob_available (
id text primary key,
local_fs_location text NOT NULL
);
@@ -87,7 +86,7 @@
return err
}
- log.Debug("Database table edgex_blob_available created.")
+ log.Debug("Database table apid_blob_available created.")
return nil
}
@@ -99,7 +98,7 @@
rows, err := dbc.getDb().Query(`
SELECT a.bean_blob_id
FROM metadata_runtime_entity_metadata as a
- LEFT JOIN edgex_blob_available as b
+ LEFT JOIN apid_blob_available as b
ON a.bean_blob_id = b.id
WHERE b.id IS NULL;
`)
@@ -118,7 +117,7 @@
rows, err = dbc.getDb().Query(`
SELECT a.resource_blob_id
FROM metadata_runtime_entity_metadata as a
- LEFT JOIN edgex_blob_available as b
+ LEFT JOIN apid_blob_available as b
ON a.resource_blob_id = b.id
WHERE (b.id IS NULL AND a.resource_blob_id IS NOT NULL AND a.resource_blob_id != '');
`)
@@ -153,10 +152,9 @@
a.created_at,
a.created_by,
a.updated_at,
- a.updated_by,
- b.local_fs_location
+ a.updated_by
FROM metadata_runtime_entity_metadata as a
- INNER JOIN edgex_blob_available as b
+ INNER JOIN apid_blob_available as b
ON (a.bean_blob_id = b.id OR a.resource_blob_id = b.id)
;
`)
@@ -185,7 +183,7 @@
func (dbc *dbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
stmt, err := dbc.getDb().Prepare(`
- INSERT OR IGNORE INTO edgex_blob_available (
+ INSERT OR IGNORE INTO apid_blob_available (
id,
local_fs_location
) VALUES (?, ?);`)
@@ -197,11 +195,11 @@
_, err = stmt.Exec(blobId, localFsLocation)
if err != nil {
- log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
+ log.Errorf("UPDATE apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
return err
}
- log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
+ log.Debugf("INSERT apid_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
return nil
}
@@ -209,7 +207,7 @@
func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) {
log.Debugf("Getting the blob file for blobId {%s}", blobId)
- rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE id = '" + blobId + "'")
+ rows, err := dbc.getDb().Query("SELECT local_fs_location FROM apid_blob_available WHERE id = '" + blobId + "'")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
@@ -244,7 +242,6 @@
&dep.CreatedBy,
&dep.Updated,
&dep.UpdatedBy,
- &dep.BlobFSLocation,
)
if err != nil {
return nil, err
diff --git a/data_test.go b/data_test.go
index 208dac8..29f777a 100644
--- a/data_test.go
+++ b/data_test.go
@@ -64,9 +64,9 @@
})
It("should succefully initialized tables", func() {
- // edgex_blob_available
+ // apid_blob_available
rows, err := testDbMan.getDb().Query(`
- SELECT count(*) from edgex_blob_available;
+ SELECT count(*) from apid_blob_available;
`)
Expect(err).Should(Succeed())
defer rows.Close()
@@ -92,9 +92,9 @@
err := testDbMan.updateLocalFsLocation(readyBlobId, readyblobLocalFs)
Expect(err).Should(Succeed())
- // edgex_blob_available
+ // apid_blob_available
rows, err := testDbMan.getDb().Query(`
- SELECT count(*) from edgex_blob_available;
+ SELECT count(*) from apid_blob_available;
`)
Expect(err).Should(Succeed())
defer rows.Close()
@@ -110,7 +110,7 @@
err := testDbMan.updateLocalFsLocation(readyBlobId, readyblobLocalFs)
Expect(err).Should(Succeed())
- // edgex_blob_available
+ // apid_blob_available
location, err := testDbMan.getLocalFSLocation(readyBlobId)
Expect(err).Should(Succeed())
Expect(location).Should(Equal(readyblobLocalFs))
@@ -124,7 +124,6 @@
Expect(err).Should(Succeed())
Expect(len(deps)).Should(Equal(1))
Expect(deps[0].BlobID).Should(Equal(readyBlobId))
- Expect(deps[0].BlobFSLocation).Should(Equal(readyblobLocalFs))
})
It("should succefully get unready blob ids", func() {
diff --git a/listener.go b/listener.go
index c2b05df..701d3a1 100644
--- a/listener.go
+++ b/listener.go
@@ -22,7 +22,7 @@
const (
APIGEE_SYNC_EVENT = "ApigeeSync"
- CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
+ CONFIG_METADATA_TABLE = "metadata.runtime_entity_metadata"
)
func (h *apigeeSyncHandler) initListener(services apid.Services) {
@@ -80,7 +80,7 @@
func (h *apigeeSyncHandler) startupOnExistingDatabase() {
// start bundle downloads that didn't finish
go func() {
- // create edgex_blob_available table
+ // create apid_blob_available table
h.dbMan.initDb()
blobIds, err := h.dbMan.getUnreadyBlobs()
@@ -127,8 +127,8 @@
}
*/
- for _, dep := range insertedDeployments {
- go h.bundleMan.queueDownloadRequest(&dep)
+ for i := range insertedDeployments {
+ go h.bundleMan.queueDownloadRequest(&insertedDeployments[i])
}
// clean up old bundles
diff --git a/listener_test.go b/listener_test.go
index 0cced6f..ad0b258 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -21,6 +21,7 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"math/rand"
+ "reflect"
"time"
)
@@ -37,6 +38,7 @@
dummyDbMan = &dummyDbManager{}
dummyBundleMan = &dummyBundleManager{
requestChan: make(chan *DownloadRequest),
+ depChan: make(chan *DataDeployment),
}
testHandler = &apigeeSyncHandler{
dbMan: dummyDbMan,
@@ -56,7 +58,7 @@
// init unready blob ids
unreadyBlobIds := make([]string, 0)
blobMap := make(map[string]int)
- for i := 0; i < rand.Intn(10); i++ {
+ for i := 0; i < 1+rand.Intn(10); i++ {
id := GenerateUUID()
blobMap[id] = 1
unreadyBlobIds = append(unreadyBlobIds, id)
@@ -99,10 +101,41 @@
})
+ Context("Change list", func() {
+
+ It("Insert event shoud enqueue download requests for all inserted deployments", func() {
+ // emit change event
+ changes := make([]common.Change, 0)
+ deployments := make(map[string]DataDeployment)
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ dep := makeTestDeployment()
+ change := common.Change{
+ Operation: common.Insert,
+ Table: CONFIG_METADATA_TABLE,
+ NewRow: rowFromDeployment(dep),
+ }
+ changes = append(changes, change)
+ deployments[dep.ID] = *dep
+ }
+
+ changeList := &common.ChangeList{
+ Changes: changes,
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+ for i := 0; i < len(changes); i++ {
+ dep := <-dummyBundleMan.depChan
+ Expect(reflect.DeepEqual(deployments[dep.ID], *dep)).Should(BeTrue())
+ delete(deployments, dep.ID)
+ }
+ })
+ })
})
type dummyBundleManager struct {
requestChan chan *DownloadRequest
+ depChan chan *DataDeployment
}
func (bm *dummyBundleManager) initializeBundleDownloading() {
@@ -110,7 +143,7 @@
}
func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) {
-
+ bm.depChan <- dep
}
func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
@@ -126,3 +159,22 @@
func (bm *dummyBundleManager) Close() {
}
+
+func rowFromDeployment(dep *DataDeployment) common.Row {
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: dep.ID}
+ row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
+ row["environment_id"] = &common.ColumnVal{Value: dep.EnvID}
+ row["bean_blob_id"] = &common.ColumnVal{Value: dep.BlobID}
+ row["resource_blob_id"] = &common.ColumnVal{Value: dep.BlobResourceID}
+ row["type"] = &common.ColumnVal{Value: dep.Type}
+ row["name"] = &common.ColumnVal{Value: dep.Name}
+ row["revision"] = &common.ColumnVal{Value: dep.Revision}
+ row["path"] = &common.ColumnVal{Value: dep.Path}
+ row["created_at"] = &common.ColumnVal{Value: dep.Created}
+ row["created_by"] = &common.ColumnVal{Value: dep.CreatedBy}
+ row["updated_at"] = &common.ColumnVal{Value: dep.Updated}
+ row["updated_by"] = &common.ColumnVal{Value: dep.UpdatedBy}
+
+ return row
+}