fixed XAPID-1020, process snapshot/changelist before emit event
diff --git a/apigee_sync.go b/apigee_sync.go
index 72185ff..10745de 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -39,7 +39,7 @@
if apidInfo.LastSnapshot != "" {
snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
-
+ processSnapshot(snapshot)
events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
apidChangeManager.pollChangeWithBackoff()
})
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 94fa16e..d81ec32 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -306,7 +306,7 @@
apidTokenManager = createSimpleTokenManager()
apidTokenManager.start()
apidSnapshotManager = createSnapShotManager()
- events.Listen(ApigeeSyncEventSelector, &handler{})
+ //events.Listen(ApigeeSyncEventSelector, &handler{})
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
diff --git a/change_test.go b/change_test.go
index 09ef3dd..4abea60 100644
--- a/change_test.go
+++ b/change_test.go
@@ -27,7 +27,6 @@
var _ = Describe("Change Agent", func() {
Context("Change Agent Unit Tests", func() {
- handler := handler{}
var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
initDb(sqlfile, "./mockdb_change.sqlite3")
@@ -46,7 +45,7 @@
BeforeEach(func() {
event := createTestDb("./sql/init_mock_db.sql", "test_change")
- handler.Handle(&event)
+ processSnapshot(&event)
knownTables = extractTablesFromDB(getDB())
})
diff --git a/changes.go b/changes.go
index 4ea5920..987c307 100644
--- a/changes.go
+++ b/changes.go
@@ -252,6 +252,7 @@
/* If valid data present, Emit to plugins */
if len(resp.Changes) > 0 {
+ processChangeList(&resp)
select {
case <-time.After(httpTimeout):
log.Panic("Timeout. Plugins failed to respond to changes.")
diff --git a/init.go b/init.go
index b2f87f8..245d4af 100644
--- a/init.go
+++ b/init.go
@@ -228,7 +228,6 @@
apidTokenManager.start()
go bootstrap()
- events.Listen(ApigeeSyncEventSelector, &handler{})
log.Debug("Done post plugin init")
}
}
diff --git a/listener.go b/listener.go
index 541b793..9c002f2 100644
--- a/listener.go
+++ b/listener.go
@@ -24,24 +24,6 @@
LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
)
-type handler struct {
-}
-
-func (h *handler) String() string {
- return "ApigeeSync"
-}
-
-func (h *handler) Handle(e apid.Event) {
-
- if changeSet, ok := e.(*common.ChangeList); ok {
- processChangeList(changeSet)
- } else if snapShot, ok := e.(*common.Snapshot); ok {
- processSnapshot(snapShot)
- } else {
- log.Debugf("Received invalid event. Ignoring. %v", e)
- }
-}
-
func processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
diff --git a/listener_test.go b/listener_test.go
index bf5bef3..afb86b3 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -24,7 +24,7 @@
var _ = Describe("listener", func() {
- handler := handler{}
+
var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
initDb(sqlfile, "./mockdb.sqlite3")
@@ -41,7 +41,7 @@
It("should fail if more than one apid_cluster rows", func() {
event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func() { processSnapshot(&event) }).To(Panic())
}, 3)
It("should fail if more than one apid_cluster rows", func() {
@@ -64,7 +64,7 @@
event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
- handler.Handle(&event)
+ processSnapshot(&event)
info, err := getApidInstanceInfo()
Expect(err).NotTo(HaveOccurred())
@@ -160,7 +160,7 @@
It("insert event should panic", func() {
ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
//save the last snapshot, so we can restore it at the end of this context
@@ -174,12 +174,12 @@
},
}
- Expect(func() { handler.Handle(&csEvent) }).To(Panic())
+ Expect(func(){processChangeList(&csEvent)}).To(Panic())
}, 3)
It("update event should panic", func() {
ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -191,7 +191,7 @@
},
}
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func(){processChangeList(&event)}).To(Panic())
//restore the last snapshot
}, 3)
@@ -201,7 +201,7 @@
It("insert event should add", func() {
ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -241,7 +241,7 @@
},
}
- handler.Handle(&event)
+ processChangeList(&event)
var dds []dataDataScope
@@ -286,7 +286,7 @@
It("delete event should delete", func() {
ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
insert := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
@@ -309,7 +309,7 @@
},
}
- handler.Handle(&insert)
+ processChangeList(&insert)
delete := common.ChangeList{
LastSequence: "test",
@@ -322,7 +322,7 @@
},
}
- handler.Handle(&delete)
+ processChangeList(&delete)
var nRows int
err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
@@ -333,7 +333,7 @@
It("update event should panic for data scopes table", func() {
ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -345,7 +345,7 @@
},
}
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func() { processChangeList(&event) }).To(Panic())
//restore the last snapshot
}, 3)
diff --git a/snapshot.go b/snapshot.go
index b7a089d..960b4fc 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -154,6 +154,7 @@
log.Panicf("Database inaccessible: %v", err)
}
+ processSnapshot(snapshot)
log.Info("Emitting Snapshot to plugins")
select {