Merge pull request #50 from 30x/XAPID-1020

[XAPID-1020] fix race condition in snapshot/changes events
diff --git a/apigee_sync.go b/apigee_sync.go
index 72185ff..10745de 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -39,7 +39,7 @@
 
 	if apidInfo.LastSnapshot != "" {
 		snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
-
+		processSnapshot(snapshot)
 		events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
 			apidChangeManager.pollChangeWithBackoff()
 		})
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 94fa16e..d81ec32 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -306,7 +306,7 @@
 			apidTokenManager = createSimpleTokenManager()
 			apidTokenManager.start()
 			apidSnapshotManager = createSnapShotManager()
-			events.Listen(ApigeeSyncEventSelector, &handler{})
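+			// events.Listen(ApigeeSyncEventSelector, &handler{}) -- disabled: the handler type is removed in listener.go; processSnapshot/processChangeList are now called directly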
 
 			scopes := []string{apidInfo.ClusterID}
 			snapshot := &common.Snapshot{}
diff --git a/change_test.go b/change_test.go
index 09ef3dd..383bff6 100644
--- a/change_test.go
+++ b/change_test.go
@@ -27,7 +27,6 @@
 var _ = Describe("Change Agent", func() {
 
 	Context("Change Agent Unit Tests", func() {
-		handler := handler{}
 
 		var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
 			initDb(sqlfile, "./mockdb_change.sqlite3")
@@ -46,7 +45,7 @@
 
 		BeforeEach(func() {
 			event := createTestDb("./sql/init_mock_db.sql", "test_change")
-			handler.Handle(&event)
+			processSnapshot(&event)
 			knownTables = extractTablesFromDB(getDB())
 		})
 
@@ -81,6 +80,10 @@
 			config.Set(configPollInterval, 10*time.Millisecond)
 		}
 
+		AfterEach(func() {
+			restoreContext()
+		})
+
 		It("test change agent with authorization failure", func() {
 			log.Debug("test change agent with authorization failure")
 			testTokenManager := &dummyTokenManager{make(chan bool)}
@@ -95,7 +98,6 @@
 			<-testTokenManager.invalidateChan
 			log.Debug("closing")
 			<-apidChangeManager.close()
-			restoreContext()
 		})
 
 		It("test change agent with too old snapshot", func() {
@@ -114,7 +116,6 @@
 			<-testSnapshotManager.downloadCalledChan
 			log.Debug("closing")
 			<-apidChangeManager.close()
-			restoreContext()
 		})
 
 		It("change agent should retry with authorization failure", func(done Done) {
@@ -136,7 +137,6 @@
 					go func() {
 						// when close done, all handlers for the first snapshot have been executed
 						<-closeDone
-						restoreContext()
 						close(done)
 					}()
 
@@ -146,7 +146,7 @@
 			apidChangeManager.pollChangeWithBackoff()
 			// auth check fails
 			<-testTokenManager.invalidateChan
-		})
+		}, 2)
 
 	})
 })
diff --git a/changes.go b/changes.go
index 4ea5920..993613f 100644
--- a/changes.go
+++ b/changes.go
@@ -98,8 +98,8 @@
 		return
 	}
 
-	go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
 	log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff")
+	go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
 
 }
 
@@ -252,6 +252,7 @@
 
 	/* If valid data present, Emit to plugins */
 	if len(resp.Changes) > 0 {
+		processChangeList(&resp)
 		select {
 		case <-time.After(httpTimeout):
 			log.Panic("Timeout. Plugins failed to respond to changes.")
diff --git a/init.go b/init.go
index b2f87f8..245d4af 100644
--- a/init.go
+++ b/init.go
@@ -228,7 +228,6 @@
 		apidTokenManager.start()
 		go bootstrap()
 
-		events.Listen(ApigeeSyncEventSelector, &handler{})
 		log.Debug("Done post plugin init")
 	}
 }
diff --git a/listener.go b/listener.go
index 541b793..9c002f2 100644
--- a/listener.go
+++ b/listener.go
@@ -24,24 +24,6 @@
 	LISTENER_TABLE_DATA_SCOPE   = "edgex.data_scope"
 )
 
-type handler struct {
-}
-
-func (h *handler) String() string {
-	return "ApigeeSync"
-}
-
-func (h *handler) Handle(e apid.Event) {
-
-	if changeSet, ok := e.(*common.ChangeList); ok {
-		processChangeList(changeSet)
-	} else if snapShot, ok := e.(*common.Snapshot); ok {
-		processSnapshot(snapShot)
-	} else {
-		log.Debugf("Received invalid event. Ignoring. %v", e)
-	}
-}
-
 func processSnapshot(snapshot *common.Snapshot) {
 	log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
 
diff --git a/listener_test.go b/listener_test.go
index bf5bef3..218cc0a 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -24,8 +24,6 @@
 
 var _ = Describe("listener", func() {
 
-	handler := handler{}
-
 	var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
 		initDb(sqlfile, "./mockdb.sqlite3")
 		file, err := os.Open("./mockdb.sqlite3")
@@ -41,7 +39,7 @@
 
 		It("should fail if more than one apid_cluster rows", func() {
 			event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
-			Expect(func() { handler.Handle(&event) }).To(Panic())
+			Expect(func() { processSnapshot(&event) }).To(Panic())
 		}, 3)
 
 		It("should fail if more than one apid_cluster rows", func() {
@@ -64,7 +62,7 @@
 
 			event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
 
-			handler.Handle(&event)
+			processSnapshot(&event)
 
 			info, err := getApidInstanceInfo()
 			Expect(err).NotTo(HaveOccurred())
@@ -160,7 +158,7 @@
 
 			It("insert event should panic", func() {
 				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
-				handler.Handle(&ssEvent)
+				processSnapshot(&ssEvent)
 
 				//save the last snapshot, so we can restore it at the end of this context
 
@@ -174,12 +172,12 @@
 					},
 				}
 
-				Expect(func() { handler.Handle(&csEvent) }).To(Panic())
+				Expect(func() { processChangeList(&csEvent) }).To(Panic())
 			}, 3)
 
 			It("update event should panic", func() {
 				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
-				handler.Handle(&ssEvent)
+				processSnapshot(&ssEvent)
 
 				event := common.ChangeList{
 					LastSequence: "test",
@@ -191,7 +189,7 @@
 					},
 				}
 
-				Expect(func() { handler.Handle(&event) }).To(Panic())
+				Expect(func() { processChangeList(&event) }).To(Panic())
 				//restore the last snapshot
 			}, 3)
 
@@ -201,7 +199,7 @@
 
 			It("insert event should add", func() {
 				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
-				handler.Handle(&ssEvent)
+				processSnapshot(&ssEvent)
 
 				event := common.ChangeList{
 					LastSequence: "test",
@@ -241,7 +239,7 @@
 					},
 				}
 
-				handler.Handle(&event)
+				processChangeList(&event)
 
 				var dds []dataDataScope
 
@@ -286,7 +284,7 @@
 
 			It("delete event should delete", func() {
 				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
-				handler.Handle(&ssEvent)
+				processSnapshot(&ssEvent)
 				insert := common.ChangeList{
 					LastSequence: "test",
 					Changes: []common.Change{
@@ -309,7 +307,7 @@
 					},
 				}
 
-				handler.Handle(&insert)
+				processChangeList(&insert)
 
 				delete := common.ChangeList{
 					LastSequence: "test",
@@ -322,7 +320,7 @@
 					},
 				}
 
-				handler.Handle(&delete)
+				processChangeList(&delete)
 
 				var nRows int
 				err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
@@ -333,7 +331,7 @@
 
 			It("update event should panic for data scopes table", func() {
 				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
-				handler.Handle(&ssEvent)
+				processSnapshot(&ssEvent)
 
 				event := common.ChangeList{
 					LastSequence: "test",
@@ -345,7 +343,7 @@
 					},
 				}
 
-				Expect(func() { handler.Handle(&event) }).To(Panic())
+				Expect(func() { processChangeList(&event) }).To(Panic())
 				//restore the last snapshot
 			}, 3)
 
diff --git a/snapshot.go b/snapshot.go
index b7a089d..960b4fc 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -154,6 +154,7 @@
 		log.Panicf("Database inaccessible: %v", err)
 	}
 
+	processSnapshot(snapshot)
 	log.Info("Emitting Snapshot to plugins")
 
 	select {