Merge branch 'XAPID-999' into rj_XAPID-999
diff --git a/api.go b/api.go
index fd41fb8..6dbdba3 100644
--- a/api.go
+++ b/api.go
@@ -108,12 +108,17 @@
deploymentsChanged chan interface{}
addSubscriber chan chan deploymentsResult
removeSubscriber chan chan deploymentsResult
+ apiInitialized bool
}
func (a *apiManager) InitAPI() {
- log.Debug("API endpoints initialized")
+ if a.apiInitialized {
+ return
+ }
services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
+ a.apiInitialized = true
+ log.Debug("API endpoints initialized")
}
func (a *apiManager) addChangedDeployment(id string) {
diff --git a/api_test.go b/api_test.go
index 66e790d..bbdd9c4 100644
--- a/api_test.go
+++ b/api_test.go
@@ -311,7 +311,6 @@
CreatedBy: "haoming@google.com",
Updated: time.Now().Format(time.RFC3339),
UpdatedBy: "haoming@google.com",
- BlobFSLocation: "BlobFSLocation",
}
return dep
}
@@ -338,10 +337,11 @@
readyDeployments []DataDeployment
localFSLocation string
fileResponse chan string
+ version string
}
func (d *dummyDbManager) setDbVersion(version string) {
-
+ d.version = version
}
func (d *dummyDbManager) initDb() error {
diff --git a/bundle.go b/bundle.go
index b0dc434..a53f31c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -36,7 +36,7 @@
queueDownloadRequest(*DataDeployment)
enqueueRequest(*DownloadRequest)
makeDownloadRequest(string) *DownloadRequest
- //deleteBundles([]DataDeployment)
+ deleteBundles([]DataDeployment)
Close()
}
@@ -128,7 +128,7 @@
for _, dep := range deletedDeployments {
bundleFile := getBlobFilePath(dep.BlobID)
log.Debugf("removing old bundle: %v", bundleFile)
- // TODO Remove from the Database table edgex_blob_available
+ // TODO Remove from the Database table apid_blob_available
safeDelete(bundleFile)
}
}()
diff --git a/bundle_test.go b/bundle_test.go
index fa44b03..e848cec 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -132,9 +132,11 @@
})
type dummyApiManager struct {
+ initCalled bool
}
func (a *dummyApiManager) InitAPI() {
+ a.initCalled = true
}
type dummyBlobServer struct {
diff --git a/data.go b/data.go
index e02cf60..366e010 100644
--- a/data.go
+++ b/data.go
@@ -38,7 +38,6 @@
CreatedBy string
Updated string
UpdatedBy string
- BlobFSLocation string
}
type SQLExec interface {
@@ -78,7 +77,7 @@
func (dbc *dbManager) initDb() error {
_, err := dbc.getDb().Exec(`
- CREATE TABLE IF NOT EXISTS edgex_blob_available (
+ CREATE TABLE IF NOT EXISTS apid_blob_available (
id text primary key,
local_fs_location text NOT NULL
);
@@ -87,7 +86,7 @@
return err
}
- log.Debug("Database table edgex_blob_available created.")
+ log.Debug("Database table apid_blob_available created.")
return nil
}
@@ -99,7 +98,7 @@
rows, err := dbc.getDb().Query(`
SELECT a.bean_blob_id
FROM metadata_runtime_entity_metadata as a
- LEFT JOIN edgex_blob_available as b
+ LEFT JOIN apid_blob_available as b
ON a.bean_blob_id = b.id
WHERE b.id IS NULL;
`)
@@ -118,7 +117,7 @@
rows, err = dbc.getDb().Query(`
SELECT a.resource_blob_id
FROM metadata_runtime_entity_metadata as a
- LEFT JOIN edgex_blob_available as b
+ LEFT JOIN apid_blob_available as b
ON a.resource_blob_id = b.id
WHERE (b.id IS NULL AND a.resource_blob_id IS NOT NULL AND a.resource_blob_id != '');
`)
@@ -140,6 +139,7 @@
// TODO there's a bug in the db statement
func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) {
+
rows, err := dbc.getDb().Query(`
SELECT a.id,
a.organization_id,
@@ -181,7 +181,6 @@
ON a.bean_blob_id = b.id
WHERE a.resource_blob_id = ""
)
-
;
`)
@@ -209,7 +208,7 @@
func (dbc *dbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
stmt, err := dbc.getDb().Prepare(`
- INSERT OR IGNORE INTO edgex_blob_available (
+ INSERT OR IGNORE INTO apid_blob_available (
id,
local_fs_location
) VALUES (?, ?);`)
@@ -221,11 +220,11 @@
_, err = stmt.Exec(blobId, localFsLocation)
if err != nil {
- log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
+ log.Errorf("UPDATE apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
return err
}
- log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
+ log.Debugf("INSERT apid_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
return nil
}
@@ -233,7 +232,7 @@
func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) {
log.Debugf("Getting the blob file for blobId {%s}", blobId)
- rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE id = '" + blobId + "'")
+ rows, err := dbc.getDb().Query("SELECT local_fs_location FROM apid_blob_available WHERE id = '" + blobId + "'")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
@@ -268,7 +267,6 @@
&dep.CreatedBy,
&dep.Updated,
&dep.UpdatedBy,
- &dep.BlobFSLocation,
)
if err != nil {
return nil, err
diff --git a/data_test.go b/data_test.go
index d678828..29f777a 100644
--- a/data_test.go
+++ b/data_test.go
@@ -47,7 +47,8 @@
}
testDbMan.setDbVersion("test" + strconv.Itoa(testCount))
initTestDb(testDbMan.getDb())
- testDbMan.initDb()
+ err := testDbMan.initDb()
+ Expect(err).Should(Succeed())
time.Sleep(100 * time.Millisecond)
})
@@ -57,10 +58,15 @@
})
Context("db tests", func() {
+ It("initDb() should be idempotent", func() {
+ err := testDbMan.initDb()
+ Expect(err).Should(Succeed())
+ })
+
It("should succefully initialized tables", func() {
- // edgex_blob_available
+ // apid_blob_available
rows, err := testDbMan.getDb().Query(`
- SELECT count(*) from edgex_blob_available;
+ SELECT count(*) from apid_blob_available;
`)
Expect(err).Should(Succeed())
defer rows.Close()
@@ -86,9 +92,9 @@
err := testDbMan.updateLocalFsLocation(readyBlobId, readyblobLocalFs)
Expect(err).Should(Succeed())
- // edgex_blob_available
+ // apid_blob_available
rows, err := testDbMan.getDb().Query(`
- SELECT count(*) from edgex_blob_available;
+ SELECT count(*) from apid_blob_available;
`)
Expect(err).Should(Succeed())
defer rows.Close()
@@ -104,7 +110,7 @@
err := testDbMan.updateLocalFsLocation(readyBlobId, readyblobLocalFs)
Expect(err).Should(Succeed())
- // edgex_blob_available
+ // apid_blob_available
location, err := testDbMan.getLocalFSLocation(readyBlobId)
Expect(err).Should(Succeed())
Expect(location).Should(Equal(readyblobLocalFs))
@@ -118,7 +124,6 @@
Expect(err).Should(Succeed())
Expect(len(deps)).Should(Equal(1))
Expect(deps[0].BlobID).Should(Equal(readyBlobId))
- Expect(deps[0].BlobFSLocation).Should(Equal(readyblobLocalFs))
})
It("should succefully get unready blob ids", func() {
diff --git a/init.go b/init.go
index 43ae0e0..d27b566 100644
--- a/init.go
+++ b/init.go
@@ -49,8 +49,7 @@
bundlePath string
debounceDuration time.Duration
apiServerBaseURI *url.URL
- apidInstanceID string
- apidClusterID string
+ eventHandler *apigeeSyncHandler
)
func init() {
@@ -78,16 +77,6 @@
return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err)
}
- if !config.IsSet(configApidInstanceID) {
- return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID)
- }
- apidInstanceID = config.GetString(configApidInstanceID)
-
- if !config.IsSet(configApidClusterID) {
- return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID)
- }
- apidClusterID = config.GetString(configApidClusterID)
-
config.SetDefault(configBundleDirKey, "bundles")
config.SetDefault(configDebounceDuration, time.Second)
config.SetDefault(configBundleCleanupDelay, time.Minute)
@@ -135,6 +124,7 @@
deploymentsChanged: make(chan interface{}, 5),
addSubscriber: make(chan chan deploymentsResult),
removeSubscriber: make(chan chan deploymentsResult),
+ apiInitialized: false,
}
// initialize bundle manager
@@ -167,7 +157,15 @@
//TODO initialize apiMan.distributeEvents() for api call with "block"
//go apiMan.distributeEvents()
- initListener(services, dbMan, apiMan, bundleMan)
+ // initialize event handler
+ eventHandler = &apigeeSyncHandler{
+ dbMan: dbMan,
+ apiMan: apiMan,
+ bundleMan: bundleMan,
+ closed: false,
+ }
+
+ eventHandler.initListener(services)
log.Debug("end init")
diff --git a/listener.go b/listener.go
index a6b637c..8961bae 100644
--- a/listener.go
+++ b/listener.go
@@ -22,19 +22,18 @@
const (
APIGEE_SYNC_EVENT = "ApigeeSync"
- CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
+ CONFIG_METADATA_TABLE = "metadata.runtime_entity_metadata"
)
-var apiInitialized bool
+func (h *apigeeSyncHandler) initListener(services apid.Services) {
+ services.Events().Listen(APIGEE_SYNC_EVENT, h)
+}
-func initListener(services apid.Services, dbMan dbManagerInterface, apiMan apiManagerInterface, bundleMan bundleManagerInterface) {
- handler := &apigeeSyncHandler{
- dbMan: dbMan,
- apiMan: apiMan,
- bundleMan: bundleMan,
+func (h *apigeeSyncHandler) stopListener(services apid.Services) {
+ if !h.closed {
+ services.Events().StopListening(APIGEE_SYNC_EVENT, h)
+ h.closed = true
}
-
- services.Events().Listen(APIGEE_SYNC_EVENT, handler)
}
type bundleConfigJson struct {
@@ -48,6 +47,7 @@
dbMan dbManagerInterface
apiMan apiManagerInterface
bundleMan bundleManagerInterface
+ closed bool
}
func (h *apigeeSyncHandler) String() string {
@@ -72,10 +72,7 @@
h.dbMan.setDbVersion(snapshot.SnapshotInfo)
h.startupOnExistingDatabase()
- if !apiInitialized {
- h.apiMan.InitAPI()
- apiInitialized = true
- }
+ h.apiMan.InitAPI()
log.Debug("Snapshot processed")
}
@@ -83,7 +80,7 @@
func (h *apigeeSyncHandler) startupOnExistingDatabase() {
// start bundle downloads that didn't finish
go func() {
- // create edgex_blob_available table
+ // create apid_blob_available table
h.dbMan.initDb()
blobIds, err := h.dbMan.getUnreadyBlobs()
@@ -130,15 +127,15 @@
}
*/
- for _, dep := range insertedDeployments {
- go h.bundleMan.queueDownloadRequest(&dep)
+ for i := range insertedDeployments {
+ go h.bundleMan.queueDownloadRequest(&insertedDeployments[i])
}
// clean up old bundles
if len(deletedDeployments) > 0 {
log.Debugf("will delete %d old bundles", len(deletedDeployments))
//TODO delete bundles for deleted deployments
- //h.bundleMan.deleteBundles(deletedDeployments)
+ h.bundleMan.deleteBundles(deletedDeployments)
}
}
diff --git a/listener_test.go b/listener_test.go
new file mode 100644
index 0000000..d82ce1c
--- /dev/null
+++ b/listener_test.go
@@ -0,0 +1,220 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apiGatewayConfDeploy
+
+import (
+ "fmt"
+ "github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "math/rand"
+ "reflect"
+ "time"
+)
+
+var _ = Describe("listener", func() {
+ var dummyDbMan *dummyDbManager
+ var dummyApiMan *dummyApiManager
+ var dummyBundleMan *dummyBundleManager
+ var testHandler *apigeeSyncHandler
+ var _ = BeforeEach(func() {
+ // stop handler created by initPlugin()
+ eventHandler.stopListener(services)
+
+ dummyApiMan = &dummyApiManager{}
+ dummyDbMan = &dummyDbManager{}
+ dummyBundleMan = &dummyBundleManager{
+ requestChan: make(chan *DownloadRequest),
+ depChan: make(chan *DataDeployment),
+ delChan: make(chan *DataDeployment),
+ }
+ testHandler = &apigeeSyncHandler{
+ dbMan: dummyDbMan,
+ apiMan: dummyApiMan,
+ bundleMan: dummyBundleMan,
+ }
+ testHandler.initListener(services)
+ time.Sleep(100 * time.Millisecond)
+ })
+
+ var _ = AfterEach(func() {
+ testHandler.stopListener(services)
+ })
+ Context("Snapshot", func() {
+
+ It("Snapshot event shoud enqueue download requests for all unready blobs", func() {
+ // init unready blob ids
+ unreadyBlobIds := make([]string, 0)
+ blobMap := make(map[string]int)
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ id := GenerateUUID()
+ blobMap[id] = 1
+ unreadyBlobIds = append(unreadyBlobIds, id)
+ }
+ dummyDbMan.unreadyBlobIds = unreadyBlobIds
+
+ // emit snapshot
+ snapshot := &common.Snapshot{
+ SnapshotInfo: fmt.Sprint(rand.Uint32()),
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+
+ for i := 0; i < len(unreadyBlobIds); i++ {
+ req := <-dummyBundleMan.requestChan
+ blobMap[req.blobId]++
+ }
+
+ // verify all unready blobids are enqueued
+ for _, val := range blobMap {
+ Expect(val).Should(Equal(2))
+ }
+ })
+
+ It("Snapshot events shoud set db version, and should only init API endpoint once", func() {
+
+ // emit snapshot
+ for i := 0; i < 2+rand.Intn(5); i++ {
+ version := fmt.Sprint(rand.Uint32())
+ snapshot := &common.Snapshot{
+ SnapshotInfo: version,
+ }
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+ Expect(dummyDbMan.version).Should(Equal(version))
+ }
+
+ // verify init API called
+ Expect(dummyApiMan.initCalled).Should(BeTrue())
+ })
+
+ })
+
+ Context("Change list", func() {
+
+ It("Insert event shoud enqueue download requests for all inserted deployments", func() {
+ // emit change event
+ changes := make([]common.Change, 0)
+ deployments := make(map[string]DataDeployment)
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ dep := makeTestDeployment()
+ change := common.Change{
+ Operation: common.Insert,
+ Table: CONFIG_METADATA_TABLE,
+ NewRow: rowFromDeployment(dep),
+ }
+ changes = append(changes, change)
+ deployments[dep.ID] = *dep
+ }
+
+ changeList := &common.ChangeList{
+ Changes: changes,
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+ // verify
+ for i := 0; i < len(changes); i++ {
+ dep := <-dummyBundleMan.depChan
+ Expect(reflect.DeepEqual(deployments[dep.ID], *dep)).Should(BeTrue())
+ delete(deployments, dep.ID)
+ }
+ Expect(len(deployments)).Should(BeZero())
+ })
+
+ It("Delete event shoud deliver to the bundle manager", func() {
+ // emit change event
+ changes := make([]common.Change, 0)
+ deployments := make(map[string]bool)
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ dep := makeTestDeployment()
+ change := common.Change{
+ Operation: common.Delete,
+ Table: CONFIG_METADATA_TABLE,
+ OldRow: rowFromDeployment(dep),
+ }
+ changes = append(changes, change)
+ deployments[dep.ID] = true
+ }
+
+ changeList := &common.ChangeList{
+ Changes: changes,
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+
+ // verify
+ for i := 0; i < len(changes); i++ {
+ dep := <-dummyBundleMan.delChan
+ Expect(deployments[dep.ID]).Should(BeTrue())
+ delete(deployments, dep.ID)
+ }
+ Expect(len(deployments)).Should(BeZero())
+ })
+ })
+})
+
+type dummyBundleManager struct {
+ requestChan chan *DownloadRequest
+ depChan chan *DataDeployment
+ delChan chan *DataDeployment
+}
+
+func (bm *dummyBundleManager) initializeBundleDownloading() {
+
+}
+
+func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) {
+ bm.depChan <- dep
+}
+
+func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
+ bm.requestChan <- req
+}
+
+func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest {
+ return &DownloadRequest{
+ blobId: blobId,
+ }
+}
+
+func (bm *dummyBundleManager) deleteBundles(deployments []DataDeployment) {
+ for i := range deployments {
+ bm.delChan <- &deployments[i]
+ }
+}
+
+func (bm *dummyBundleManager) Close() {
+
+}
+
+func rowFromDeployment(dep *DataDeployment) common.Row {
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: dep.ID}
+ row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
+ row["environment_id"] = &common.ColumnVal{Value: dep.EnvID}
+ row["bean_blob_id"] = &common.ColumnVal{Value: dep.BlobID}
+ row["resource_blob_id"] = &common.ColumnVal{Value: dep.BlobResourceID}
+ row["type"] = &common.ColumnVal{Value: dep.Type}
+ row["name"] = &common.ColumnVal{Value: dep.Name}
+ row["revision"] = &common.ColumnVal{Value: dep.Revision}
+ row["path"] = &common.ColumnVal{Value: dep.Path}
+ row["created_at"] = &common.ColumnVal{Value: dep.Created}
+ row["created_by"] = &common.ColumnVal{Value: dep.CreatedBy}
+ row["updated_at"] = &common.ColumnVal{Value: dep.Updated}
+ row["updated_by"] = &common.ColumnVal{Value: dep.UpdatedBy}
+
+ return row
+}