add tests for snapshot listener
diff --git a/api.go b/api.go
index 5d4f748..fb614af 100644
--- a/api.go
+++ b/api.go
@@ -108,12 +108,17 @@
deploymentsChanged chan interface{}
addSubscriber chan chan deploymentsResult
removeSubscriber chan chan deploymentsResult
+ apiInitialized bool
}
func (a *apiManager) InitAPI() {
- log.Debug("API endpoints initialized")
+ if a.apiInitialized {
+ return
+ }
services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
+ a.apiInitialized = true
+ log.Debug("API endpoints initialized")
}
func (a *apiManager) addChangedDeployment(id string) {
diff --git a/api_test.go b/api_test.go
index 66e790d..8899f69 100644
--- a/api_test.go
+++ b/api_test.go
@@ -338,10 +338,11 @@
readyDeployments []DataDeployment
localFSLocation string
fileResponse chan string
+ version string
}
func (d *dummyDbManager) setDbVersion(version string) {
-
+ d.version = version
}
func (d *dummyDbManager) initDb() error {
diff --git a/bundle_test.go b/bundle_test.go
index fa44b03..e848cec 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -132,9 +132,11 @@
})
type dummyApiManager struct {
+ initCalled bool
}
func (a *dummyApiManager) InitAPI() {
+ a.initCalled = true
}
type dummyBlobServer struct {
diff --git a/data_test.go b/data_test.go
index d678828..208dac8 100644
--- a/data_test.go
+++ b/data_test.go
@@ -47,7 +47,8 @@
}
testDbMan.setDbVersion("test" + strconv.Itoa(testCount))
initTestDb(testDbMan.getDb())
- testDbMan.initDb()
+ err := testDbMan.initDb()
+ Expect(err).Should(Succeed())
time.Sleep(100 * time.Millisecond)
})
@@ -57,6 +58,11 @@
})
Context("db tests", func() {
+ It("initDb() should be idempotent", func() {
+ err := testDbMan.initDb()
+ Expect(err).Should(Succeed())
+ })
+
It("should succefully initialized tables", func() {
// edgex_blob_available
rows, err := testDbMan.getDb().Query(`
diff --git a/init.go b/init.go
index d70963d..0d2376d 100644
--- a/init.go
+++ b/init.go
@@ -48,8 +48,7 @@
bundlePath string
debounceDuration time.Duration
apiServerBaseURI *url.URL
- apidInstanceID string
- apidClusterID string
+ eventHandler *apigeeSyncHandler
)
func init() {
@@ -77,16 +76,6 @@
return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err)
}
- if !config.IsSet(configApidInstanceID) {
- return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID)
- }
- apidInstanceID = config.GetString(configApidInstanceID)
-
- if !config.IsSet(configApidClusterID) {
- return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID)
- }
- apidClusterID = config.GetString(configApidClusterID)
-
config.SetDefault(configBundleDirKey, "bundles")
config.SetDefault(configDebounceDuration, time.Second)
config.SetDefault(configBundleCleanupDelay, time.Minute)
@@ -134,6 +123,7 @@
deploymentsChanged: make(chan interface{}, 5),
addSubscriber: make(chan chan deploymentsResult),
removeSubscriber: make(chan chan deploymentsResult),
+ apiInitialized: false,
}
// initialize bundle manager
@@ -166,7 +156,15 @@
//TODO initialize apiMan.distributeEvents() for api call with "block"
//go apiMan.distributeEvents()
- initListener(services, dbMan, apiMan, bundleMan)
+ // initialize event handler
+ eventHandler = &apigeeSyncHandler{
+ dbMan: dbMan,
+ apiMan: apiMan,
+ bundleMan: bundleMan,
+ closed: false,
+ }
+
+ eventHandler.initListener(services)
log.Debug("end init")
diff --git a/listener.go b/listener.go
index a6b637c..c2b05df 100644
--- a/listener.go
+++ b/listener.go
@@ -25,16 +25,15 @@
CONFIG_METADATA_TABLE = "project.runtime_blob_metadata"
)
-var apiInitialized bool
+func (h *apigeeSyncHandler) initListener(services apid.Services) {
+ services.Events().Listen(APIGEE_SYNC_EVENT, h)
+}
-func initListener(services apid.Services, dbMan dbManagerInterface, apiMan apiManagerInterface, bundleMan bundleManagerInterface) {
- handler := &apigeeSyncHandler{
- dbMan: dbMan,
- apiMan: apiMan,
- bundleMan: bundleMan,
+func (h *apigeeSyncHandler) stopListener(services apid.Services) {
+ if !h.closed {
+ services.Events().StopListening(APIGEE_SYNC_EVENT, h)
+ h.closed = true
}
-
- services.Events().Listen(APIGEE_SYNC_EVENT, handler)
}
type bundleConfigJson struct {
@@ -48,6 +47,7 @@
dbMan dbManagerInterface
apiMan apiManagerInterface
bundleMan bundleManagerInterface
+ closed bool
}
func (h *apigeeSyncHandler) String() string {
@@ -72,10 +72,7 @@
h.dbMan.setDbVersion(snapshot.SnapshotInfo)
h.startupOnExistingDatabase()
- if !apiInitialized {
- h.apiMan.InitAPI()
- apiInitialized = true
- }
+ h.apiMan.InitAPI()
log.Debug("Snapshot processed")
}
diff --git a/listener_test.go b/listener_test.go
new file mode 100644
index 0000000..0cced6f
--- /dev/null
+++ b/listener_test.go
@@ -0,0 +1,128 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apiGatewayConfDeploy
+
+import (
+ "fmt"
+ "github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "math/rand"
+ "time"
+)
+
+var _ = Describe("listener", func() {
+ var dummyDbMan *dummyDbManager
+ var dummyApiMan *dummyApiManager
+ var dummyBundleMan *dummyBundleManager
+ var testHandler *apigeeSyncHandler
+ var _ = BeforeEach(func() {
+ // stop handler created by initPlugin()
+ eventHandler.stopListener(services)
+
+ dummyApiMan = &dummyApiManager{}
+ dummyDbMan = &dummyDbManager{}
+ dummyBundleMan = &dummyBundleManager{
+ requestChan: make(chan *DownloadRequest),
+ }
+ testHandler = &apigeeSyncHandler{
+ dbMan: dummyDbMan,
+ apiMan: dummyApiMan,
+ bundleMan: dummyBundleMan,
+ }
+ testHandler.initListener(services)
+ time.Sleep(100 * time.Millisecond)
+ })
+
+ var _ = AfterEach(func() {
+ testHandler.stopListener(services)
+ })
+ Context("Snapshot", func() {
+
+ It("Snapshot event shoud enqueue download requests for all unready blobs", func() {
+ // init unready blob ids
+ unreadyBlobIds := make([]string, 0)
+ blobMap := make(map[string]int)
+ for i := 0; i < rand.Intn(10); i++ {
+ id := GenerateUUID()
+ blobMap[id] = 1
+ unreadyBlobIds = append(unreadyBlobIds, id)
+ }
+ dummyDbMan.unreadyBlobIds = unreadyBlobIds
+
+ // emit snapshot
+ snapshot := &common.Snapshot{
+ SnapshotInfo: fmt.Sprint(rand.Uint32()),
+ }
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+
+ for i := 0; i < len(unreadyBlobIds); i++ {
+ req := <-dummyBundleMan.requestChan
+ blobMap[req.blobId]++
+ }
+
+ // verify all unready blobids are enqueued
+ for _, val := range blobMap {
+ Expect(val).Should(Equal(2))
+ }
+ })
+
+ It("Snapshot events shoud set db version, and should only init API endpoint once", func() {
+
+ // emit snapshot
+ for i := 0; i < 2+rand.Intn(5); i++ {
+ version := fmt.Sprint(rand.Uint32())
+ snapshot := &common.Snapshot{
+ SnapshotInfo: version,
+ }
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+ Expect(dummyDbMan.version).Should(Equal(version))
+ }
+
+ // verify init API called
+ Expect(dummyApiMan.initCalled).Should(BeTrue())
+ })
+
+ })
+
+})
+
+type dummyBundleManager struct {
+ requestChan chan *DownloadRequest
+}
+
+func (bm *dummyBundleManager) initializeBundleDownloading() {
+
+}
+
+func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) {
+
+}
+
+func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
+ bm.requestChan <- req
+}
+
+func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest {
+ return &DownloadRequest{
+ blobId: blobId,
+ }
+}
+
+func (bm *dummyBundleManager) Close() {
+
+}