Merge pull request #28 from 30x/XAPID-656
fixed panic, added in-mem cache for scopes
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index afca88c..876ec9c 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -65,7 +65,7 @@
// Tests that entire bootstrap and set of sync operations work
var lastSnapshot *common.Snapshot
- expectedSnapshotTables := make(map[string] bool)
+ expectedSnapshotTables := make(map[string]bool)
expectedSnapshotTables["kms.company"] = true
expectedSnapshotTables["edgex.apid_cluster"] = true
expectedSnapshotTables["edgex.data_scope"] = true
@@ -79,8 +79,8 @@
Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue())
/* After this, we will mock changes for tables not present in the initial snapshot
- * until that is changed in the mock server, we have to spoof the known tables
- */
+ * until that is changed in the mock server, we have to spoof the known tables
+ */
//add apid_cluster and data_scope since those would present if this were a real scenario
knownTables["kms.app_credential"] = true
@@ -90,7 +90,6 @@
knownTables["kms.api_product"] = true
knownTables["kms.app"] = true
-
lastSnapshot = s
for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go
index b63255c..be3fc8c 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -86,7 +86,7 @@
log.Debug("polling...")
/* Find the scopes associated with the config id */
- scopes := findScopesForId(apidInfo.ClusterID)
+ scopes := scopeCache.readAllScope()
v := url.Values{}
/* Sequence added to the query if available */
@@ -265,21 +265,42 @@
scopes := []string{apidInfo.ClusterID}
snapshot := downloadSnapshot(scopes)
+ storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot common.Snapshot) {
+ // note that for boot snapshot case, we don't touch databases. We only update in-mem cache
+ // This deals with duplicate snapshot version numbers; see XAPID-869 for details
+ scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
+ for _, table := range snapshot.Tables {
+ if table.Name == LISTENER_TABLE_DATA_SCOPE {
+ for _, row := range table.Rows {
+ ds := makeDataScopeFromRow(row)
+ // cache scopes for this cluster
+ if ds.ClusterID == apidInfo.ClusterID {
+ scopeCache.updateCache(&ds)
+ }
+ }
+ }
+ }
// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
- processSnapshot(&snapshot)
}
// use the scope IDs from the boot snapshot to get all the data associated with the scopes
func downloadDataSnapshot() {
log.Debug("download Snapshot for data scopes")
- var scopes = findScopesForId(apidInfo.ClusterID)
+ scopes := scopeCache.readAllScope()
+
scopes = append(scopes, apidInfo.ClusterID)
- resp := downloadSnapshot(scopes)
+ snapshot := downloadSnapshot(scopes)
+ storeDataSnapshot(snapshot)
+}
- knownTables = extractTablesFromSnapshot(resp)
+func storeDataSnapshot(snapshot common.Snapshot) {
+ knownTables = extractTablesFromSnapshot(snapshot)
- db, err := data.DBVersion(resp.SnapshotInfo)
+ db, err := data.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Database inaccessible: %v", err)
}
@@ -287,7 +308,7 @@
done := make(chan bool)
log.Info("Emitting Snapshot to plugins")
- events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) {
+ events.EmitWithCallback(ApigeeSyncEventSelector, &snapshot, func(event apid.Event) {
done <- true
})
@@ -296,6 +317,7 @@
log.Panic("Timeout. Plugins failed to respond to snapshot.")
case <-done:
}
+
}
func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) {
@@ -391,6 +413,7 @@
}
knownTables = extractTablesFromDB(db)
+ scopeCache.clearAndInitCache(snapshot)
// allow plugins (including this one) to start immediately on existing database
// Note: this MUST have no tables as that is used as an indicator
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index f5142d2..e9af4df 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -81,4 +81,14 @@
Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
})
+ /*
+ * XAPID-869: there should be no panic if duplicate snapshots are received during bootstrap
+ */
+ It("Should be able to handle duplicate snapshot during bootstrap", func() {
+ scopes := []string{apidInfo.ClusterID}
+ snapshot := downloadSnapshot(scopes)
+ storeBootSnapshot(snapshot)
+ storeDataSnapshot(snapshot)
+ })
+
})
diff --git a/datascope_cache.go b/datascope_cache.go
new file mode 100644
index 0000000..3d19710
--- /dev/null
+++ b/datascope_cache.go
@@ -0,0 +1,93 @@
+package apidApigeeSync
+
+const (
+ readCache int = iota
+ updateCache
+ removeCache
+ clearAndInit
+)
+
+/*
+ * structs for DatascopeCache
+ */
+
+type cacheOperationRequest struct {
+ Operation int
+ Scope *dataDataScope
+ version string
+}
+
+// DatascopeCache maintains an in-memory cache of datascopes, managed by a single goroutine.
+type DatascopeCache struct {
+ requestChan chan *cacheOperationRequest
+ readDoneChan chan []string
+ scopeMap map[string]*dataDataScope
+ version string
+}
+
+var scopeCache *DatascopeCache
+
+func (cache *DatascopeCache) datascopeCacheManager() {
+ for request := range cache.requestChan {
+ switch request.Operation {
+ case readCache:
+ log.Debug("datascopeCacheManager: readCache")
+ scopes := make([]string, 0, len(cache.scopeMap))
+ for _, ds := range cache.scopeMap {
+ scopes = append(scopes, ds.Scope)
+ }
+ cache.readDoneChan <- scopes
+ case updateCache:
+ log.Debug("datascopeCacheManager: updateCache")
+ cache.scopeMap[request.Scope.ID] = request.Scope
+ case removeCache:
+ log.Debug("datascopeCacheManager: removeCache")
+ delete(cache.scopeMap, request.Scope.ID)
+ case clearAndInit:
+ log.Debug("datascopeCacheManager: clearAndInit")
+ if cache.version != request.version {
+ cache.scopeMap = make(map[string]*dataDataScope)
+ cache.version = request.version
+ }
+ }
+ }
+
+ // requestChan closed: drop the map and close readDoneChan to unblock any pending readers
+ cache.scopeMap = nil
+ close(cache.readDoneChan)
+}
+
+/*
+ * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID)
+ */
+
+func (cache *DatascopeCache) readAllScope() []string {
+ cache.requestChan <- &cacheOperationRequest{readCache, nil, ""}
+ scopes := <-cache.readDoneChan
+ // eliminate duplicates
+ tmpMap := make(map[string]bool)
+ for _, scope := range scopes {
+ tmpMap[scope] = true
+ }
+ scopes = make([]string, 0)
+ for scope := range tmpMap {
+ scopes = append(scopes, scope)
+ }
+ return scopes
+}
+
+func (cache *DatascopeCache) removeCache(scope *dataDataScope) {
+ cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""}
+}
+
+func (cache *DatascopeCache) updateCache(scope *dataDataScope) {
+ cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""}
+}
+
+func (cache *DatascopeCache) clearAndInitCache(version string) {
+ cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version}
+}
+
+func (cache *DatascopeCache) closeCache() {
+ close(cache.requestChan)
+}
diff --git a/datascope_cache_test.go b/datascope_cache_test.go
new file mode 100644
index 0000000..bc67f53
--- /dev/null
+++ b/datascope_cache_test.go
@@ -0,0 +1,97 @@
+package apidApigeeSync
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "math/rand"
+ "strconv"
+ "time"
+)
+
+var _ = Describe("datascope cache", func() {
+ /*
+ * in-mem cache test
+ */
+ It("Test In-mem cache", func() {
+ testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
+ go testCache.datascopeCacheManager()
+ testCache.clearAndInitCache("test-version")
+ countChan := make(chan int)
+ base := 10
+ rand.Seed(time.Now().Unix())
+ num := base + rand.Intn(base)
+ scopeMap := make(map[string]bool)
+ // async update
+ for i := 0; i < num; i++ {
+ id := strconv.Itoa(i)
+ scopeStr := strconv.Itoa(i % base)
+ scope := &dataDataScope{ID: id, Scope: scopeStr}
+ scopeMap[scope.Scope] = true
+ go func(scope *dataDataScope) {
+ testCache.updateCache(scope)
+ countChan <- 1
+ }(scope)
+ }
+
+ // wait until update done
+ for i := 0; i < num; i++ {
+ <-countChan
+ }
+
+ // verify update
+ retrievedScopes := testCache.readAllScope()
+ Expect(len(scopeMap)).To(Equal(len(retrievedScopes)))
+ for _, s := range retrievedScopes {
+ // verify each retrieved scope is valid
+ Expect(scopeMap[s]).To(BeTrue())
+ // NOTE(review): setting true here cannot catch a duplicate; set scopeMap[s] = false so a repeat fails the check above
+ scopeMap[s] = true
+ }
+
+ // remove all the datascopes with odd scope
+ count := 0
+ for i := 0; i < num; i++ {
+ if (i%base)%2 == 1 {
+ count += 1
+ id := strconv.Itoa(i)
+ scopeStr := strconv.Itoa(i % base)
+ scope := &dataDataScope{ID: id, Scope: scopeStr}
+ go func(scope *dataDataScope) {
+ testCache.removeCache(scope)
+ countChan <- 1
+ }(scope)
+ }
+ }
+
+ for i := 0; i < count; i++ {
+ <-countChan
+ }
+
+ // all retrieved scopes should be even
+ retrievedScopes = testCache.readAllScope()
+ for _, s := range retrievedScopes {
+ scopeNum, _ := strconv.Atoi(s)
+ Expect(scopeNum % 2).To(BeZero())
+ }
+
+ // async remove all datascopes
+ for i := 0; i < num; i++ {
+ id := strconv.Itoa(i)
+ scopeStr := strconv.Itoa(i % base)
+ scope := &dataDataScope{ID: id, Scope: scopeStr}
+ go func(scope *dataDataScope) {
+ testCache.removeCache(scope)
+ countChan <- 1
+ }(scope)
+ }
+
+ for i := 0; i < num; i++ {
+ <-countChan
+ }
+ retrievedScopes = testCache.readAllScope()
+ Expect(len(retrievedScopes)).To(Equal(0))
+
+ testCache.closeCache()
+ })
+
+})
diff --git a/init.go b/init.go
index 5e2b840..d40a9e7 100644
--- a/init.go
+++ b/init.go
@@ -78,6 +78,10 @@
data = services.Data()
events = services.Events()
+ scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
+
+ go scopeCache.datascopeCacheManager()
+
/* This callback function will get called, once all the plugins are
* initialized (not just this plugin). This is needed because,
* downloadSnapshots/changes etc have to begin to be processed only
diff --git a/listener.go b/listener.go
index 59b358a..1fbd82e 100644
--- a/listener.go
+++ b/listener.go
@@ -42,6 +42,9 @@
log.Panicf("Unable to initialize database: %v", err)
}
+ // clear cache
+ scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
+
tx, err := db.Begin()
if err != nil {
log.Panicf("Error starting transaction: %v", err)
@@ -70,6 +73,10 @@
if err != nil {
log.Panicf("Snapshot update failed: %v", err)
}
+ // cache scopes for this cluster
+ if ds.ClusterID == apidInfo.ClusterID {
+ scopeCache.updateCache(&ds)
+ }
}
}
}
@@ -114,9 +121,19 @@
case common.Insert:
ds := makeDataScopeFromRow(change.NewRow)
err = insertDataScope(ds, tx)
+
+ // cache scopes for this cluster
+ if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
+ scopeCache.updateCache(&ds)
+ }
case common.Delete:
ds := makeDataScopeFromRow(change.OldRow)
- deleteDataScope(ds, tx)
+ err = deleteDataScope(ds, tx)
+
+ // cache scopes for this cluster
+ if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
+ scopeCache.removeCache(&ds)
+ }
default:
// common.Update is not allowed
log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)