add tests for snapshot listener
diff --git a/api.go b/api.go index 5d4f748..fb614af 100644 --- a/api.go +++ b/api.go
@@ -108,12 +108,17 @@ deploymentsChanged chan interface{} addSubscriber chan chan deploymentsResult removeSubscriber chan chan deploymentsResult + apiInitialized bool } func (a *apiManager) InitAPI() { - log.Debug("API endpoints initialized") + if a.apiInitialized { + return + } services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") + a.apiInitialized = true + log.Debug("API endpoints initialized") } func (a *apiManager) addChangedDeployment(id string) {
diff --git a/api_test.go b/api_test.go index 66e790d..8899f69 100644 --- a/api_test.go +++ b/api_test.go
@@ -338,10 +338,11 @@ readyDeployments []DataDeployment localFSLocation string fileResponse chan string + version string } func (d *dummyDbManager) setDbVersion(version string) { - + d.version = version } func (d *dummyDbManager) initDb() error {
diff --git a/bundle_test.go b/bundle_test.go index fa44b03..e848cec 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -132,9 +132,11 @@ }) type dummyApiManager struct { + initCalled bool } func (a *dummyApiManager) InitAPI() { + a.initCalled = true } type dummyBlobServer struct {
diff --git a/data_test.go b/data_test.go index d678828..208dac8 100644 --- a/data_test.go +++ b/data_test.go
@@ -47,7 +47,8 @@ } testDbMan.setDbVersion("test" + strconv.Itoa(testCount)) initTestDb(testDbMan.getDb()) - testDbMan.initDb() + err := testDbMan.initDb() + Expect(err).Should(Succeed()) time.Sleep(100 * time.Millisecond) }) @@ -57,6 +58,11 @@ }) Context("db tests", func() { + It("initDb() should be idempotent", func() { + err := testDbMan.initDb() + Expect(err).Should(Succeed()) + }) + It("should succefully initialized tables", func() { // edgex_blob_available rows, err := testDbMan.getDb().Query(`
diff --git a/init.go b/init.go index d70963d..0d2376d 100644 --- a/init.go +++ b/init.go
@@ -48,8 +48,7 @@ bundlePath string debounceDuration time.Duration apiServerBaseURI *url.URL - apidInstanceID string - apidClusterID string + eventHandler *apigeeSyncHandler ) func init() { @@ -77,16 +76,6 @@ return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err) } - if !config.IsSet(configApidInstanceID) { - return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID) - } - apidInstanceID = config.GetString(configApidInstanceID) - - if !config.IsSet(configApidClusterID) { - return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID) - } - apidClusterID = config.GetString(configApidClusterID) - config.SetDefault(configBundleDirKey, "bundles") config.SetDefault(configDebounceDuration, time.Second) config.SetDefault(configBundleCleanupDelay, time.Minute) @@ -134,6 +123,7 @@ deploymentsChanged: make(chan interface{}, 5), addSubscriber: make(chan chan deploymentsResult), removeSubscriber: make(chan chan deploymentsResult), + apiInitialized: false, } // initialize bundle manager @@ -166,7 +156,15 @@ //TODO initialize apiMan.distributeEvents() for api call with "block" //go apiMan.distributeEvents() - initListener(services, dbMan, apiMan, bundleMan) + // initialize event handler + eventHandler = &apigeeSyncHandler{ + dbMan: dbMan, + apiMan: apiMan, + bundleMan: bundleMan, + closed: false, + } + + eventHandler.initListener(services) log.Debug("end init")
diff --git a/listener.go b/listener.go index a6b637c..c2b05df 100644 --- a/listener.go +++ b/listener.go
@@ -25,16 +25,15 @@ CONFIG_METADATA_TABLE = "project.runtime_blob_metadata" ) -var apiInitialized bool +func (h *apigeeSyncHandler) initListener(services apid.Services) { + services.Events().Listen(APIGEE_SYNC_EVENT, h) +} -func initListener(services apid.Services, dbMan dbManagerInterface, apiMan apiManagerInterface, bundleMan bundleManagerInterface) { - handler := &apigeeSyncHandler{ - dbMan: dbMan, - apiMan: apiMan, - bundleMan: bundleMan, +func (h *apigeeSyncHandler) stopListener(services apid.Services) { + if !h.closed { + services.Events().StopListening(APIGEE_SYNC_EVENT, h) + h.closed = true } - - services.Events().Listen(APIGEE_SYNC_EVENT, handler) } type bundleConfigJson struct { @@ -48,6 +47,7 @@ dbMan dbManagerInterface apiMan apiManagerInterface bundleMan bundleManagerInterface + closed bool } func (h *apigeeSyncHandler) String() string { @@ -72,10 +72,7 @@ h.dbMan.setDbVersion(snapshot.SnapshotInfo) h.startupOnExistingDatabase() - if !apiInitialized { - h.apiMan.InitAPI() - apiInitialized = true - } + h.apiMan.InitAPI() log.Debug("Snapshot processed") }
diff --git a/listener_test.go b/listener_test.go new file mode 100644 index 0000000..0cced6f --- /dev/null +++ b/listener_test.go
@@ -0,0 +1,128 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apiGatewayConfDeploy + +import ( + "fmt" + "github.com/30x/apid-core" + "github.com/apigee-labs/transicator/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "math/rand" + "time" +) + +var _ = Describe("listener", func() { + var dummyDbMan *dummyDbManager + var dummyApiMan *dummyApiManager + var dummyBundleMan *dummyBundleManager + var testHandler *apigeeSyncHandler + var _ = BeforeEach(func() { + // stop handler created by initPlugin() + eventHandler.stopListener(services) + + dummyApiMan = &dummyApiManager{} + dummyDbMan = &dummyDbManager{} + dummyBundleMan = &dummyBundleManager{ + requestChan: make(chan *DownloadRequest), + } + testHandler = &apigeeSyncHandler{ + dbMan: dummyDbMan, + apiMan: dummyApiMan, + bundleMan: dummyBundleMan, + } + testHandler.initListener(services) + time.Sleep(100 * time.Millisecond) + }) + + var _ = AfterEach(func() { + testHandler.stopListener(services) + }) + Context("Snapshot", func() { + + It("Snapshot event shoud enqueue download requests for all unready blobs", func() { + // init unready blob ids + unreadyBlobIds := make([]string, 0) + blobMap := make(map[string]int) + for i := 0; i < rand.Intn(10); i++ { + id := GenerateUUID() + blobMap[id] = 1 + unreadyBlobIds = append(unreadyBlobIds, id) + } + dummyDbMan.unreadyBlobIds = unreadyBlobIds + + // emit snapshot + snapshot := &common.Snapshot{ + SnapshotInfo: fmt.Sprint(rand.Uint32()), + } + + apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + + for i := 0; i < len(unreadyBlobIds); i++ { + req := <-dummyBundleMan.requestChan + blobMap[req.blobId]++ + } + + // verify all unready blobids are enqueued + for _, val := range blobMap { + Expect(val).Should(Equal(2)) + } + }) + + It("Snapshot events shoud set db version, and should only init API endpoint once", func() { + + // emit snapshot + for i := 0; i < 2+rand.Intn(5); i++ { + version := fmt.Sprint(rand.Uint32()) + snapshot := &common.Snapshot{ + SnapshotInfo: version, + } + <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + Expect(dummyDbMan.version).Should(Equal(version)) + } + + // verify init API called + Expect(dummyApiMan.initCalled).Should(BeTrue()) + }) + + }) + +}) + +type dummyBundleManager struct { + requestChan chan *DownloadRequest +} + +func (bm *dummyBundleManager) initializeBundleDownloading() { + +} + +func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) { + +} + +func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) { + bm.requestChan <- req +} + +func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest { + return &DownloadRequest{ + blobId: blobId, + } +} + +func (bm *dummyBundleManager) Close() { + +}