move in-mem cache to a new file, added test case for in-mem cache
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index afca88c..876ec9c 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -65,7 +65,7 @@
// Tests that entire bootstrap and set of sync operations work
var lastSnapshot *common.Snapshot
- expectedSnapshotTables := make(map[string] bool)
+ expectedSnapshotTables := make(map[string]bool)
expectedSnapshotTables["kms.company"] = true
expectedSnapshotTables["edgex.apid_cluster"] = true
expectedSnapshotTables["edgex.data_scope"] = true
@@ -79,8 +79,8 @@
Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue())
/* After this, we will mock changes for tables not present in the initial snapshot
- * until that is changed in the mock server, we have to spoof the known tables
- */
+ * until that is changed in the mock server, we have to spoof the known tables
+ */
//add apid_cluster and data_scope since those would present if this were a real scenario
knownTables["kms.app_credential"] = true
@@ -90,7 +90,6 @@
knownTables["kms.api_product"] = true
knownTables["kms.app"] = true
-
lastSnapshot = s
for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go
index d2b36b7..be3fc8c 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -21,37 +21,11 @@
maxBackoffTimeout = time.Minute
)
-/*
- * structs for DatascopeCache
- */
-
-const (
- readCache int = iota
- updateCache
- removeCache
- clearAndInit
-)
-
-type cacheOperationRequest struct {
- Operation int
- Scope *dataDataScope
- version string
-}
-
-// maintain an in-mem cache of datascope
-type DatascopeCache struct {
- requestChan chan *cacheOperationRequest
- readDoneChan chan []string
- scopeMap map[string]*dataDataScope
- version string
-}
-
var (
block string = "45"
lastSequence string
polling uint32
knownTables = make(map[string]bool)
- scopeCache *DatascopeCache
)
/*
@@ -112,7 +86,7 @@
log.Debug("polling...")
/* Find the scopes associated with the config id */
- scopes := findScopesForId(apidInfo.ClusterID)
+ scopes := scopeCache.readAllScope()
v := url.Values{}
/* Sequence added to the query if available */
@@ -295,7 +269,6 @@
}
func storeBootSnapshot(snapshot common.Snapshot) {
- log.Debug("storeBootSnapshot", snapshot.SnapshotInfo)
// note that for boot snapshot case, we don't touch databases. We only update in-mem cache
// This aims to deal with duplicate snapshot version#, see XAPID-869 for details
scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
@@ -317,7 +290,6 @@
func downloadDataSnapshot() {
log.Debug("download Snapshot for data scopes")
- //var scopes = findScopesForId(apidInfo.ClusterID)
scopes := scopeCache.readAllScope()
scopes = append(scopes, apidInfo.ClusterID)
@@ -326,7 +298,6 @@
}
func storeDataSnapshot(snapshot common.Snapshot) {
- log.Debug("storeDataSnapshot", snapshot.SnapshotInfo)
knownTables = extractTablesFromSnapshot(snapshot)
db, err := data.DBVersion(snapshot.SnapshotInfo)
@@ -349,8 +320,6 @@
}
-
-
func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) {
tables = make(map[string]bool)
@@ -545,58 +514,6 @@
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
}
-func (cache *DatascopeCache) datascopeCacheManager() {
- for request := range cache.requestChan {
- switch request.Operation {
- case readCache:
- log.Debug("datascopeCacheManager: readCache")
- scopes := make([]string, 0, len(cache.scopeMap))
- for _, ds := range cache.scopeMap {
- scopes = append(scopes, ds.Scope)
- }
- cache.readDoneChan <- scopes
- case updateCache:
- log.Debug("datascopeCacheManager: updateCache")
- cache.scopeMap[request.Scope.ID] = request.Scope
- case removeCache:
- log.Debug("datascopeCacheManager: removeCache")
- delete(cache.scopeMap, request.Scope.ID)
- case clearAndInit:
- log.Debug("datascopeCacheManager: clearAndInit")
- if cache.version != request.version {
- cache.scopeMap = make(map[string]*dataDataScope)
- cache.version = request.version
- }
- }
- }
-
- //chan closed
- cache.scopeMap = nil
- close(cache.requestChan)
-}
-
-func (cache *DatascopeCache) readAllScope() []string {
- cache.requestChan <- &cacheOperationRequest{readCache, nil, ""}
- scopes := <-cache.readDoneChan
- return scopes
-}
-
-func (cache *DatascopeCache) removeCache(scope *dataDataScope) {
- cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""}
-}
-
-func (cache *DatascopeCache) updateCache(scope *dataDataScope) {
- cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""}
-}
-
-func (cache *DatascopeCache) clearAndInitCache(version string) {
- cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version}
-}
-
-func (cache *DatascopeCache) closeCache() {
- close(cache.requestChan)
-}
-
type changeServerError struct {
Code string `json:"code"`
}
diff --git a/apigee_sync_datascope_cache.go b/apigee_sync_datascope_cache.go
new file mode 100644
index 0000000..f6549ff
--- /dev/null
+++ b/apigee_sync_datascope_cache.go
@@ -0,0 +1,93 @@
+package apidApigeeSync
+
+const (
+ readCache int = iota
+ updateCache
+ removeCache
+ clearAndInit
+)
+
+/*
+ * structs for DatascopeCache
+ */
+
+type cacheOperationRequest struct {
+ Operation int
+ Scope *dataDataScope
+ version string
+}
+
+// maintain an in-mem cache of datascope
+type DatascopeCache struct {
+ requestChan chan *cacheOperationRequest
+ readDoneChan chan []string
+ scopeMap map[string]*dataDataScope
+ version string
+}
+
+var scopeCache *DatascopeCache
+
+func (cache *DatascopeCache) datascopeCacheManager() {
+ for request := range cache.requestChan {
+ switch request.Operation {
+ case readCache:
+ log.Debug("datascopeCacheManager: readCache")
+ scopes := make([]string, 0, len(cache.scopeMap))
+ for _, ds := range cache.scopeMap {
+ scopes = append(scopes, ds.Scope)
+ }
+ cache.readDoneChan <- scopes
+ case updateCache:
+ log.Debug("datascopeCacheManager: updateCache")
+ cache.scopeMap[request.Scope.ID] = request.Scope
+ case removeCache:
+ log.Debug("datascopeCacheManager: removeCache")
+ delete(cache.scopeMap, request.Scope.ID)
+ case clearAndInit:
+ log.Debug("datascopeCacheManager: clearAndInit")
+ if cache.version != request.version {
+ cache.scopeMap = make(map[string]*dataDataScope)
+ cache.version = request.version
+ }
+ }
+ }
+
+ //chan closed
+ cache.scopeMap = nil
+ close(cache.requestChan)
+}
+
+/*
+ * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID)
+ */
+
+func (cache *DatascopeCache) readAllScope() []string {
+ cache.requestChan <- &cacheOperationRequest{readCache, nil, ""}
+ scopes := <-cache.readDoneChan
+ // eliminate duplicates
+ tmpMap := make(map[string]bool)
+ for _, scope := range scopes {
+ tmpMap[scope] = true
+ }
+ scopes = make([]string, 0)
+ for scope := range tmpMap {
+ scopes = append(scopes, scope)
+ }
+ return scopes
+}
+
+func (cache *DatascopeCache) removeCache(scope *dataDataScope) {
+ cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""}
+}
+
+func (cache *DatascopeCache) updateCache(scope *dataDataScope) {
+ cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""}
+}
+
+func (cache *DatascopeCache) clearAndInitCache(version string) {
+ cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version}
+}
+
+func (cache *DatascopeCache) closeCache() {
+ close(cache.requestChan)
+}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 07ffe18..004306d 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -5,6 +5,9 @@
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "math/rand"
+ "strconv"
+ "time"
)
var _ = Describe("listener", func() {
@@ -80,8 +83,9 @@
Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
})
+
/*
- * XAPID-869
+ * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
*/
It("Should be able to handle duplicate snapshot during bootstrap", func() {
scopes := []string{apidInfo.ClusterID}
@@ -90,4 +94,87 @@
storeDataSnapshot(snapshot)
})
+ /*
+ * in-mem cache test
+ */
+ It("Test In-mem cache", func() {
+ testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
+ go testCache.datascopeCacheManager()
+ testCache.clearAndInitCache("test-version")
+ countChan := make(chan int)
+ base := 10
+ rand.Seed(time.Now().Unix())
+ num := base + rand.Intn(base)
+ scopeMap := make(map[string]bool)
+ // async update
+ for i := 0; i < num; i++ {
+ id := strconv.Itoa(i)
+ scopeStr := strconv.Itoa(i % base)
+ scope := &dataDataScope{ID: id, Scope: scopeStr}
+ scopeMap[scope.Scope] = true
+ go func(scope *dataDataScope) {
+ testCache.updateCache(scope)
+ countChan <- 1
+ }(scope)
+ }
+
+ // wait until update done
+ for i := 0; i < num; i++ {
+ <-countChan
+ }
+
+ // verify update
+ retrievedScopes := testCache.readAllScope()
+ Expect(len(scopeMap)).To(Equal(len(retrievedScopes)))
+ for _, s := range retrievedScopes {
+ // verify each retrieved scope is valid
+ Expect(scopeMap[s]).To(BeTrue())
+ // no duplicate scopes
+ scopeMap[s] = true
+ }
+
+ // remove all the datascopes with odd scope
+ count := 0
+ for i := 0; i < num; i++ {
+ if (i%base)%2 == 1 {
+ count += 1
+ id := strconv.Itoa(i)
+ scopeStr := strconv.Itoa(i % base)
+ scope := &dataDataScope{ID: id, Scope: scopeStr}
+ go func(scope *dataDataScope) {
+ testCache.removeCache(scope)
+ countChan <- 1
+ }(scope)
+ }
+ }
+
+ for i := 0; i < count; i++ {
+ <-countChan
+ }
+
+ // all retrieved scopes should be even
+ retrievedScopes = testCache.readAllScope()
+ for _, s := range retrievedScopes {
+ scopeNum, _ := strconv.Atoi(s)
+ Expect(scopeNum % 2).To(BeZero())
+ }
+
+ // async remove all datascopes
+ for i := 0; i < num; i++ {
+ id := strconv.Itoa(i)
+ scopeStr := strconv.Itoa(i % base)
+ scope := &dataDataScope{ID: id, Scope: scopeStr}
+ go func(scope *dataDataScope) {
+ testCache.removeCache(scope)
+ countChan <- 1
+ }(scope)
+ }
+
+ for i := 0; i < num; i++ {
+ <-countChan
+ }
+ retrievedScopes = testCache.readAllScope()
+ Expect(len(retrievedScopes)).To(Equal(0))
+ })
+
})