Merge pull request #50 from 30x/XAPID-1020
[XAPID-1020] fix race condition in snapshot/changes events
diff --git a/apigee_sync.go b/apigee_sync.go
index 72185ff..10745de 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -39,7 +39,7 @@
if apidInfo.LastSnapshot != "" {
snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
-
+ processSnapshot(snapshot)
events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
apidChangeManager.pollChangeWithBackoff()
})
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 94fa16e..d81ec32 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -306,7 +306,7 @@
apidTokenManager = createSimpleTokenManager()
apidTokenManager.start()
apidSnapshotManager = createSnapShotManager()
- events.Listen(ApigeeSyncEventSelector, &handler{})
+ //events.Listen(ApigeeSyncEventSelector, &handler{})
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
diff --git a/change_test.go b/change_test.go
index 09ef3dd..383bff6 100644
--- a/change_test.go
+++ b/change_test.go
@@ -27,7 +27,6 @@
var _ = Describe("Change Agent", func() {
Context("Change Agent Unit Tests", func() {
- handler := handler{}
var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
initDb(sqlfile, "./mockdb_change.sqlite3")
@@ -46,7 +45,7 @@
BeforeEach(func() {
event := createTestDb("./sql/init_mock_db.sql", "test_change")
- handler.Handle(&event)
+ processSnapshot(&event)
knownTables = extractTablesFromDB(getDB())
})
@@ -81,6 +80,10 @@
config.Set(configPollInterval, 10*time.Millisecond)
}
+ AfterEach(func() {
+ restoreContext()
+ })
+
It("test change agent with authorization failure", func() {
log.Debug("test change agent with authorization failure")
testTokenManager := &dummyTokenManager{make(chan bool)}
@@ -95,7 +98,6 @@
<-testTokenManager.invalidateChan
log.Debug("closing")
<-apidChangeManager.close()
- restoreContext()
})
It("test change agent with too old snapshot", func() {
@@ -114,7 +116,6 @@
<-testSnapshotManager.downloadCalledChan
log.Debug("closing")
<-apidChangeManager.close()
- restoreContext()
})
It("change agent should retry with authorization failure", func(done Done) {
@@ -136,7 +137,6 @@
go func() {
// when close done, all handlers for the first snapshot have been executed
<-closeDone
- restoreContext()
close(done)
}()
@@ -146,7 +146,7 @@
apidChangeManager.pollChangeWithBackoff()
// auth check fails
<-testTokenManager.invalidateChan
- })
+ }, 2)
})
})
diff --git a/changes.go b/changes.go
index 4ea5920..993613f 100644
--- a/changes.go
+++ b/changes.go
@@ -98,8 +98,8 @@
return
}
- go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff")
+ go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
}
@@ -252,6 +252,7 @@
/* If valid data present, Emit to plugins */
if len(resp.Changes) > 0 {
+ processChangeList(&resp)
select {
case <-time.After(httpTimeout):
log.Panic("Timeout. Plugins failed to respond to changes.")
diff --git a/init.go b/init.go
index b2f87f8..245d4af 100644
--- a/init.go
+++ b/init.go
@@ -228,7 +228,6 @@
apidTokenManager.start()
go bootstrap()
- events.Listen(ApigeeSyncEventSelector, &handler{})
log.Debug("Done post plugin init")
}
}
diff --git a/listener.go b/listener.go
index 541b793..9c002f2 100644
--- a/listener.go
+++ b/listener.go
@@ -24,24 +24,6 @@
LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
)
-type handler struct {
-}
-
-func (h *handler) String() string {
- return "ApigeeSync"
-}
-
-func (h *handler) Handle(e apid.Event) {
-
- if changeSet, ok := e.(*common.ChangeList); ok {
- processChangeList(changeSet)
- } else if snapShot, ok := e.(*common.Snapshot); ok {
- processSnapshot(snapShot)
- } else {
- log.Debugf("Received invalid event. Ignoring. %v", e)
- }
-}
-
func processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
diff --git a/listener_test.go b/listener_test.go
index bf5bef3..218cc0a 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -24,8 +24,6 @@
var _ = Describe("listener", func() {
- handler := handler{}
-
var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
initDb(sqlfile, "./mockdb.sqlite3")
file, err := os.Open("./mockdb.sqlite3")
@@ -41,7 +39,7 @@
It("should fail if more than one apid_cluster rows", func() {
event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func() { processSnapshot(&event) }).To(Panic())
}, 3)
It("should fail if more than one apid_cluster rows", func() {
@@ -64,7 +62,7 @@
event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
- handler.Handle(&event)
+ processSnapshot(&event)
info, err := getApidInstanceInfo()
Expect(err).NotTo(HaveOccurred())
@@ -160,7 +158,7 @@
It("insert event should panic", func() {
ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
//save the last snapshot, so we can restore it at the end of this context
@@ -174,12 +172,12 @@
},
}
- Expect(func() { handler.Handle(&csEvent) }).To(Panic())
+ Expect(func() { processChangeList(&csEvent) }).To(Panic())
}, 3)
It("update event should panic", func() {
ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -191,7 +189,7 @@
},
}
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func() { processChangeList(&event) }).To(Panic())
//restore the last snapshot
}, 3)
@@ -201,7 +199,7 @@
It("insert event should add", func() {
ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -241,7 +239,7 @@
},
}
- handler.Handle(&event)
+ processChangeList(&event)
var dds []dataDataScope
@@ -286,7 +284,7 @@
It("delete event should delete", func() {
ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
insert := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
@@ -309,7 +307,7 @@
},
}
- handler.Handle(&insert)
+ processChangeList(&insert)
delete := common.ChangeList{
LastSequence: "test",
@@ -322,7 +320,7 @@
},
}
- handler.Handle(&delete)
+ processChangeList(&delete)
var nRows int
err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
@@ -333,7 +331,7 @@
It("update event should panic for data scopes table", func() {
ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
- handler.Handle(&ssEvent)
+ processSnapshot(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -345,7 +343,7 @@
},
}
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func() { processChangeList(&event) }).To(Panic())
//restore the last snapshot
}, 3)
diff --git a/snapshot.go b/snapshot.go
index b7a089d..960b4fc 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -154,6 +154,7 @@
log.Panicf("Database inaccessible: %v", err)
}
+ processSnapshot(snapshot)
log.Info("Emitting Snapshot to plugins")
select {