fixed XAPID-1020, process snapshot/changelist before emit event
diff --git a/apigee_sync.go b/apigee_sync.go index 72185ff..10745de 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -39,7 +39,7 @@ if apidInfo.LastSnapshot != "" { snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot) - + processSnapshot(snapshot) events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { apidChangeManager.pollChangeWithBackoff() })
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 94fa16e..d81ec32 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -306,7 +306,7 @@ apidTokenManager = createSimpleTokenManager() apidTokenManager.start() apidSnapshotManager = createSnapShotManager() - events.Listen(ApigeeSyncEventSelector, &handler{}) + //events.Listen(ApigeeSyncEventSelector, &handler{}) scopes := []string{apidInfo.ClusterID} snapshot := &common.Snapshot{}
diff --git a/change_test.go b/change_test.go index 09ef3dd..4abea60 100644 --- a/change_test.go +++ b/change_test.go
@@ -27,7 +27,6 @@ var _ = Describe("Change Agent", func() { Context("Change Agent Unit Tests", func() { - handler := handler{} var createTestDb = func(sqlfile string, dbId string) common.Snapshot { initDb(sqlfile, "./mockdb_change.sqlite3") @@ -46,7 +45,7 @@ BeforeEach(func() { event := createTestDb("./sql/init_mock_db.sql", "test_change") - handler.Handle(&event) + processSnapshot(&event) knownTables = extractTablesFromDB(getDB()) })
diff --git a/changes.go b/changes.go index 4ea5920..987c307 100644 --- a/changes.go +++ b/changes.go
@@ -252,6 +252,7 @@ /* If valid data present, Emit to plugins */ if len(resp.Changes) > 0 { + processChangeList(&resp) select { case <-time.After(httpTimeout): log.Panic("Timeout. Plugins failed to respond to changes.")
diff --git a/init.go b/init.go index b2f87f8..245d4af 100644 --- a/init.go +++ b/init.go
@@ -228,7 +228,6 @@ apidTokenManager.start() go bootstrap() - events.Listen(ApigeeSyncEventSelector, &handler{}) log.Debug("Done post plugin init") } }
diff --git a/listener.go b/listener.go index 541b793..9c002f2 100644 --- a/listener.go +++ b/listener.go
@@ -24,24 +24,6 @@ LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope" ) -type handler struct { -} - -func (h *handler) String() string { - return "ApigeeSync" -} - -func (h *handler) Handle(e apid.Event) { - - if changeSet, ok := e.(*common.ChangeList); ok { - processChangeList(changeSet) - } else if snapShot, ok := e.(*common.Snapshot); ok { - processSnapshot(snapShot) - } else { - log.Debugf("Received invalid event. Ignoring. %v", e) - } -} - func processSnapshot(snapshot *common.Snapshot) { log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
diff --git a/listener_test.go b/listener_test.go index bf5bef3..afb86b3 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -24,7 +24,7 @@ var _ = Describe("listener", func() { - handler := handler{} + var createTestDb = func(sqlfile string, dbId string) common.Snapshot { initDb(sqlfile, "./mockdb.sqlite3") @@ -41,7 +41,7 @@ It("should fail if more than one apid_cluster rows", func() { event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters") - Expect(func() { handler.Handle(&event) }).To(Panic()) + Expect(func() { processSnapshot(&event) }).To(Panic()) }, 3) It("should fail if more than one apid_cluster rows", func() { @@ -64,7 +64,7 @@ event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid") - handler.Handle(&event) + processSnapshot(&event) info, err := getApidInstanceInfo() Expect(err).NotTo(HaveOccurred()) @@ -160,7 +160,7 @@ It("insert event should panic", func() { ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic") - handler.Handle(&ssEvent) + processSnapshot(&ssEvent) //save the last snapshot, so we can restore it at the end of this context @@ -174,12 +174,12 @@ }, } - Expect(func() { handler.Handle(&csEvent) }).To(Panic()) + Expect(func(){processChangeList(&csEvent)}).To(Panic()) }, 3) It("update event should panic", func() { ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic") - handler.Handle(&ssEvent) + processSnapshot(&ssEvent) event := common.ChangeList{ LastSequence: "test", @@ -191,7 +191,7 @@ }, } - Expect(func() { handler.Handle(&event) }).To(Panic()) + Expect(func(){processChangeList(&event)}).To(Panic()) //restore the last snapshot }, 3) @@ -201,7 +201,7 @@ It("insert event should add", func() { ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert") - handler.Handle(&ssEvent) + processSnapshot(&ssEvent) event := common.ChangeList{ LastSequence: "test", @@ -241,7 +241,7 @@ }, } - handler.Handle(&event) + processChangeList(&event) var dds []dataDataScope @@ -286,7 +286,7 @@ It("delete event should delete", func() { ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete") - handler.Handle(&ssEvent) + processSnapshot(&ssEvent) insert := common.ChangeList{ LastSequence: "test", Changes: []common.Change{ @@ -309,7 +309,7 @@ }, } - handler.Handle(&insert) + processChangeList(&insert) delete := common.ChangeList{ LastSequence: "test", @@ -322,7 +322,7 @@ }, } - handler.Handle(&delete) + processChangeList(&delete) var nRows int err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows) @@ -333,7 +333,7 @@ It("update event should panic for data scopes table", func() { ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic") - handler.Handle(&ssEvent) + processSnapshot(&ssEvent) event := common.ChangeList{ LastSequence: "test", @@ -345,7 +345,7 @@ }, } - Expect(func() { handler.Handle(&event) }).To(Panic()) + Expect(func() { processChangeList(&event) }).To(Panic()) //restore the last snapshot }, 3)
diff --git a/snapshot.go b/snapshot.go index b7a089d..960b4fc 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -154,6 +154,7 @@ log.Panicf("Database inaccessible: %v", err) } + processSnapshot(snapshot) log.Info("Emitting Snapshot to plugins") select {