Merge pull request #28 from 30x/XAPID-656

fixed panic, added in-mem cache for scopes
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index afca88c..876ec9c 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -65,7 +65,7 @@
 	// Tests that entire bootstrap and set of sync operations work
 	var lastSnapshot *common.Snapshot
 
-	expectedSnapshotTables := make(map[string] bool)
+	expectedSnapshotTables := make(map[string]bool)
 	expectedSnapshotTables["kms.company"] = true
 	expectedSnapshotTables["edgex.apid_cluster"] = true
 	expectedSnapshotTables["edgex.data_scope"] = true
@@ -79,8 +79,8 @@
 			Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue())
 
 			/* After this, we will mock changes for tables not present in the initial snapshot
- 			* until that is changed in the mock server, we have to spoof the known tables
-			*/
+			* until that is changed in the mock server, we have to spoof the known tables
+			 */
 
 			//add apid_cluster and data_scope since those would present if this were a real scenario
 			knownTables["kms.app_credential"] = true
@@ -90,7 +90,6 @@
 			knownTables["kms.api_product"] = true
 			knownTables["kms.app"] = true
 
-
 			lastSnapshot = s
 
 			for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go
index b63255c..be3fc8c 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -86,7 +86,7 @@
 		log.Debug("polling...")
 
 		/* Find the scopes associated with the config id */
-		scopes := findScopesForId(apidInfo.ClusterID)
+		scopes := scopeCache.readAllScope()
 		v := url.Values{}
 
 		/* Sequence added to the query if available */
@@ -265,21 +265,42 @@
 
 	scopes := []string{apidInfo.ClusterID}
 	snapshot := downloadSnapshot(scopes)
+	storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot common.Snapshot) {
+	// note that for boot snapshot case, we don't touch databases. We only update in-mem cache
+	// This aims to deal with duplicate snapshot version#, see XAPID-869 for details
+	scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
+	for _, table := range snapshot.Tables {
+		if table.Name == LISTENER_TABLE_DATA_SCOPE {
+			for _, row := range table.Rows {
+				ds := makeDataScopeFromRow(row)
+				// cache scopes for this cluster
+				if ds.ClusterID == apidInfo.ClusterID {
+					scopeCache.updateCache(&ds)
+				}
+			}
+		}
+	}
 	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
-	processSnapshot(&snapshot)
 }
 
 // use the scope IDs from the boot snapshot to get all the data associated with the scopes
 func downloadDataSnapshot() {
 	log.Debug("download Snapshot for data scopes")
 
-	var scopes = findScopesForId(apidInfo.ClusterID)
+	scopes := scopeCache.readAllScope()
+
 	scopes = append(scopes, apidInfo.ClusterID)
-	resp := downloadSnapshot(scopes)
+	snapshot := downloadSnapshot(scopes)
+	storeDataSnapshot(snapshot)
+}
 
-	knownTables = extractTablesFromSnapshot(resp)
+func storeDataSnapshot(snapshot common.Snapshot) {
+	knownTables = extractTablesFromSnapshot(snapshot)
 
-	db, err := data.DBVersion(resp.SnapshotInfo)
+	db, err := data.DBVersion(snapshot.SnapshotInfo)
 	if err != nil {
 		log.Panicf("Database inaccessible: %v", err)
 	}
@@ -287,7 +308,7 @@
 
 	done := make(chan bool)
 	log.Info("Emitting Snapshot to plugins")
-	events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) {
+	events.EmitWithCallback(ApigeeSyncEventSelector, &snapshot, func(event apid.Event) {
 		done <- true
 	})
 
@@ -296,6 +317,7 @@
 		log.Panic("Timeout. Plugins failed to respond to snapshot.")
 	case <-done:
 	}
+
 }
 
 func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) {
@@ -391,6 +413,7 @@
 	}
 
 	knownTables = extractTablesFromDB(db)
+	scopeCache.clearAndInitCache(snapshot)
 
 	// allow plugins (including this one) to start immediately on existing database
 	// Note: this MUST have no tables as that is used as an indicator
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index f5142d2..e9af4df 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -81,4 +81,14 @@
 		Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
 	})
 
+	/*
+	 * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
+	 */
+	It("Should be able to handle duplicate snapshot during bootstrap", func() {
+		scopes := []string{apidInfo.ClusterID}
+		snapshot := downloadSnapshot(scopes)
+		storeBootSnapshot(snapshot)
+		storeDataSnapshot(snapshot)
+	})
+
 })
diff --git a/datascope_cache.go b/datascope_cache.go
new file mode 100644
index 0000000..3d19710
--- /dev/null
+++ b/datascope_cache.go
@@ -0,0 +1,93 @@
+package apidApigeeSync
+
+const (
+	readCache int = iota
+	updateCache
+	removeCache
+	clearAndInit
+)
+
+/*
+ * structs for DatascopeCache
+ */
+
+type cacheOperationRequest struct {
+	Operation int
+	Scope     *dataDataScope
+	version   string
+}
+
+// maintain an in-mem cache of datascope
+type DatascopeCache struct {
+	requestChan  chan *cacheOperationRequest
+	readDoneChan chan []string
+	scopeMap     map[string]*dataDataScope
+	version      string
+}
+
+var scopeCache *DatascopeCache
+
+func (cache *DatascopeCache) datascopeCacheManager() {
+	for request := range cache.requestChan {
+		switch request.Operation {
+		case readCache:
+			log.Debug("datascopeCacheManager: readCache")
+			scopes := make([]string, 0, len(cache.scopeMap))
+			for _, ds := range cache.scopeMap {
+				scopes = append(scopes, ds.Scope)
+			}
+			cache.readDoneChan <- scopes
+		case updateCache:
+			log.Debug("datascopeCacheManager: updateCache")
+			cache.scopeMap[request.Scope.ID] = request.Scope
+		case removeCache:
+			log.Debug("datascopeCacheManager: removeCache")
+			delete(cache.scopeMap, request.Scope.ID)
+		case clearAndInit:
+			log.Debug("datascopeCacheManager: clearAndInit")
+			if cache.version != request.version {
+				cache.scopeMap = make(map[string]*dataDataScope)
+				cache.version = request.version
+			}
+		}
+	}
+
+	//chan closed
+	cache.scopeMap = nil
+	close(cache.readDoneChan)
+}
+
+/*
+ * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID)
+ */
+
+func (cache *DatascopeCache) readAllScope() []string {
+	cache.requestChan <- &cacheOperationRequest{readCache, nil, ""}
+	scopes := <-cache.readDoneChan
+	// eliminate duplicates
+	tmpMap := make(map[string]bool)
+	for _, scope := range scopes {
+		tmpMap[scope] = true
+	}
+	scopes = make([]string, 0)
+	for scope := range tmpMap {
+		scopes = append(scopes, scope)
+	}
+	return scopes
+}
+
+func (cache *DatascopeCache) removeCache(scope *dataDataScope) {
+	cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""}
+}
+
+func (cache *DatascopeCache) updateCache(scope *dataDataScope) {
+	cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""}
+}
+
+func (cache *DatascopeCache) clearAndInitCache(version string) {
+	cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version}
+}
+
+func (cache *DatascopeCache) closeCache() {
+	close(cache.requestChan)
+}
diff --git a/datascope_cache_test.go b/datascope_cache_test.go
new file mode 100644
index 0000000..bc67f53
--- /dev/null
+++ b/datascope_cache_test.go
@@ -0,0 +1,97 @@
+package apidApigeeSync
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"math/rand"
+	"strconv"
+	"time"
+)
+
+var _ = Describe("datascope cache", func() {
+	/*
+	 * in-mem cache test
+	 */
+	It("Test In-mem cache", func() {
+		testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
+		go testCache.datascopeCacheManager()
+		testCache.clearAndInitCache("test-version")
+		countChan := make(chan int)
+		base := 10
+		rand.Seed(time.Now().Unix())
+		num := base + rand.Intn(base)
+		scopeMap := make(map[string]bool)
+		// async update
+		for i := 0; i < num; i++ {
+			id := strconv.Itoa(i)
+			scopeStr := strconv.Itoa(i % base)
+			scope := &dataDataScope{ID: id, Scope: scopeStr}
+			scopeMap[scope.Scope] = true
+			go func(scope *dataDataScope) {
+				testCache.updateCache(scope)
+				countChan <- 1
+			}(scope)
+		}
+
+		// wait until update done
+		for i := 0; i < num; i++ {
+			<-countChan
+		}
+
+		// verify update
+		retrievedScopes := testCache.readAllScope()
+		Expect(len(scopeMap)).To(Equal(len(retrievedScopes)))
+		for _, s := range retrievedScopes {
+			// verify each retrieved scope is valid
+			Expect(scopeMap[s]).To(BeTrue())
+			// no duplicate scopes
+			scopeMap[s] = true
+		}
+
+		// remove all the datascopes with odd scope
+		count := 0
+		for i := 0; i < num; i++ {
+			if (i%base)%2 == 1 {
+				count += 1
+				id := strconv.Itoa(i)
+				scopeStr := strconv.Itoa(i % base)
+				scope := &dataDataScope{ID: id, Scope: scopeStr}
+				go func(scope *dataDataScope) {
+					testCache.removeCache(scope)
+					countChan <- 1
+				}(scope)
+			}
+		}
+
+		for i := 0; i < count; i++ {
+			<-countChan
+		}
+
+		// all retrieved scopes should be even
+		retrievedScopes = testCache.readAllScope()
+		for _, s := range retrievedScopes {
+			scopeNum, _ := strconv.Atoi(s)
+			Expect(scopeNum % 2).To(BeZero())
+		}
+
+		// async remove all datascopes
+		for i := 0; i < num; i++ {
+			id := strconv.Itoa(i)
+			scopeStr := strconv.Itoa(i % base)
+			scope := &dataDataScope{ID: id, Scope: scopeStr}
+			go func(scope *dataDataScope) {
+				testCache.removeCache(scope)
+				countChan <- 1
+			}(scope)
+		}
+
+		for i := 0; i < num; i++ {
+			<-countChan
+		}
+		retrievedScopes = testCache.readAllScope()
+		Expect(len(retrievedScopes)).To(Equal(0))
+
+		testCache.closeCache()
+	})
+
+})
diff --git a/init.go b/init.go
index 5e2b840..d40a9e7 100644
--- a/init.go
+++ b/init.go
@@ -78,6 +78,10 @@
 	data = services.Data()
 	events = services.Events()
 
+	scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
+
+	go scopeCache.datascopeCacheManager()
+
 	/* This callback function will get called, once all the plugins are
 	 * initialized (not just this plugin). This is needed because,
 	 * downloadSnapshots/changes etc have to begin to be processed only
diff --git a/listener.go b/listener.go
index 59b358a..1fbd82e 100644
--- a/listener.go
+++ b/listener.go
@@ -42,6 +42,9 @@
 		log.Panicf("Unable to initialize database: %v", err)
 	}
 
+	// clear cache
+	scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
+
 	tx, err := db.Begin()
 	if err != nil {
 		log.Panicf("Error starting transaction: %v", err)
@@ -70,6 +73,10 @@
 				if err != nil {
 					log.Panicf("Snapshot update failed: %v", err)
 				}
+				// cache scopes for this cluster
+				if ds.ClusterID == apidInfo.ClusterID {
+					scopeCache.updateCache(&ds)
+				}
 			}
 		}
 	}
@@ -114,9 +121,19 @@
 			case common.Insert:
 				ds := makeDataScopeFromRow(change.NewRow)
 				err = insertDataScope(ds, tx)
+
+				// cache scopes for this cluster
+				if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
+					scopeCache.updateCache(&ds)
+				}
 			case common.Delete:
 				ds := makeDataScopeFromRow(change.OldRow)
-				deleteDataScope(ds, tx)
+				err = deleteDataScope(ds, tx)
+
+				// cache scopes for this cluster
+				if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
+					scopeCache.removeCache(&ds)
+				}
 			default:
 				// common.Update is not allowed
 				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)