move in-mem cache to a new file, added test case for in-mem cache
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index afca88c..876ec9c 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -65,7 +65,7 @@ // Tests that entire bootstrap and set of sync operations work var lastSnapshot *common.Snapshot - expectedSnapshotTables := make(map[string] bool) + expectedSnapshotTables := make(map[string]bool) expectedSnapshotTables["kms.company"] = true expectedSnapshotTables["edgex.apid_cluster"] = true expectedSnapshotTables["edgex.data_scope"] = true @@ -79,8 +79,8 @@ Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue()) /* After this, we will mock changes for tables not present in the initial snapshot - * until that is changed in the mock server, we have to spoof the known tables - */ + * until that is changed in the mock server, we have to spoof the known tables + */ //add apid_cluster and data_scope since those would present if this were a real scenario knownTables["kms.app_credential"] = true @@ -90,7 +90,6 @@ knownTables["kms.api_product"] = true knownTables["kms.app"] = true - lastSnapshot = s for _, t := range s.Tables {
diff --git a/apigee_sync.go b/apigee_sync.go index d2b36b7..be3fc8c 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -21,37 +21,11 @@ maxBackoffTimeout = time.Minute ) -/* - * structs for DatascopeCache - */ - -const ( - readCache int = iota - updateCache - removeCache - clearAndInit -) - -type cacheOperationRequest struct { - Operation int - Scope *dataDataScope - version string -} - -// maintain an in-mem cache of datascope -type DatascopeCache struct { - requestChan chan *cacheOperationRequest - readDoneChan chan []string - scopeMap map[string]*dataDataScope - version string -} - var ( block string = "45" lastSequence string polling uint32 knownTables = make(map[string]bool) - scopeCache *DatascopeCache ) /* @@ -112,7 +86,7 @@ log.Debug("polling...") /* Find the scopes associated with the config id */ - scopes := findScopesForId(apidInfo.ClusterID) + scopes := scopeCache.readAllScope() v := url.Values{} /* Sequence added to the query if available */ @@ -295,7 +269,6 @@ } func storeBootSnapshot(snapshot common.Snapshot) { - log.Debug("storeBootSnapshot", snapshot.SnapshotInfo) // note that for boot snapshot case, we don't touch databases. We only update in-mem cache // This aims to deal with duplicate snapshot version#, see XAPID-869 for details scopeCache.clearAndInitCache(snapshot.SnapshotInfo) @@ -317,7 +290,6 @@ func downloadDataSnapshot() { log.Debug("download Snapshot for data scopes") - //var scopes = findScopesForId(apidInfo.ClusterID) scopes := scopeCache.readAllScope() scopes = append(scopes, apidInfo.ClusterID) @@ -326,7 +298,6 @@ } func storeDataSnapshot(snapshot common.Snapshot) { - log.Debug("storeDataSnapshot", snapshot.SnapshotInfo) knownTables = extractTablesFromSnapshot(snapshot) db, err := data.DBVersion(snapshot.SnapshotInfo) @@ -349,8 +320,6 @@ } - - func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) { tables = make(map[string]bool) @@ -545,58 +514,6 @@ req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) } -func (cache *DatascopeCache) datascopeCacheManager() { - for request := range cache.requestChan { - switch request.Operation { - case readCache: - log.Debug("datascopeCacheManager: readCache") - scopes := make([]string, 0, len(cache.scopeMap)) - for _, ds := range cache.scopeMap { - scopes = append(scopes, ds.Scope) - } - cache.readDoneChan <- scopes - case updateCache: - log.Debug("datascopeCacheManager: updateCache") - cache.scopeMap[request.Scope.ID] = request.Scope - case removeCache: - log.Debug("datascopeCacheManager: removeCache") - delete(cache.scopeMap, request.Scope.ID) - case clearAndInit: - log.Debug("datascopeCacheManager: clearAndInit") - if cache.version != request.version { - cache.scopeMap = make(map[string]*dataDataScope) - cache.version = request.version - } - } - } - - //chan closed - cache.scopeMap = nil - close(cache.requestChan) -} - -func (cache *DatascopeCache) readAllScope() []string { - cache.requestChan <- &cacheOperationRequest{readCache, nil, ""} - scopes := <-cache.readDoneChan - return scopes -} - -func (cache *DatascopeCache) removeCache(scope *dataDataScope) { - cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""} -} - -func (cache *DatascopeCache) updateCache(scope *dataDataScope) { - cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""} -} - -func (cache *DatascopeCache) clearAndInitCache(version string) { - cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version} -} - -func (cache *DatascopeCache) closeCache() { - close(cache.requestChan) -} - type changeServerError struct { Code string `json:"code"` }
diff --git a/apigee_sync_datascope_cache.go b/apigee_sync_datascope_cache.go new file mode 100644 index 0000000..f6549ff --- /dev/null +++ b/apigee_sync_datascope_cache.go
@@ -0,0 +1,93 @@ +package apidApigeeSync + +const ( + readCache int = iota + updateCache + removeCache + clearAndInit +) + +/* + * structs for DatascopeCache + */ + +type cacheOperationRequest struct { + Operation int + Scope *dataDataScope + version string +} + +// maintain an in-mem cache of datascope +type DatascopeCache struct { + requestChan chan *cacheOperationRequest + readDoneChan chan []string + scopeMap map[string]*dataDataScope + version string +} + +var scopeCache *DatascopeCache + +func (cache *DatascopeCache) datascopeCacheManager() { + for request := range cache.requestChan { + switch request.Operation { + case readCache: + log.Debug("datascopeCacheManager: readCache") + scopes := make([]string, 0, len(cache.scopeMap)) + for _, ds := range cache.scopeMap { + scopes = append(scopes, ds.Scope) + } + cache.readDoneChan <- scopes + case updateCache: + log.Debug("datascopeCacheManager: updateCache") + cache.scopeMap[request.Scope.ID] = request.Scope + case removeCache: + log.Debug("datascopeCacheManager: removeCache") + delete(cache.scopeMap, request.Scope.ID) + case clearAndInit: + log.Debug("datascopeCacheManager: clearAndInit") + if cache.version != request.version { + cache.scopeMap = make(map[string]*dataDataScope) + cache.version = request.version + } + } + } + + //chan closed + cache.scopeMap = nil + close(cache.requestChan) +} + +/* + * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID) + */ + +func (cache *DatascopeCache) readAllScope() []string { + cache.requestChan <- &cacheOperationRequest{readCache, nil, ""} + scopes := <-cache.readDoneChan + // eliminate duplicates + tmpMap := make(map[string]bool) + for _, scope := range scopes { + tmpMap[scope] = true + } + scopes = make([]string, 0) + for scope := range tmpMap { + scopes = append(scopes, scope) + } + return scopes +} + +func (cache *DatascopeCache) removeCache(scope *dataDataScope) { + cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""} +} + +func (cache *DatascopeCache) updateCache(scope *dataDataScope) { + cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""} +} + +func (cache *DatascopeCache) clearAndInitCache(version string) { + cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version} +} + +func (cache *DatascopeCache) closeCache() { + close(cache.requestChan) +}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 07ffe18..004306d 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -5,6 +5,9 @@ "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "math/rand" + "strconv" + "time" ) var _ = Describe("listener", func() { @@ -80,8 +83,9 @@ Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) }) + /* - * XAPID-869 + * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap */ It("Should be able to handle duplicate snapshot during bootstrap", func() { scopes := []string{apidInfo.ClusterID} @@ -90,4 +94,87 @@ storeDataSnapshot(snapshot) }) + /* + * in-mem cache test + */ + It("Test In-mem cache", func() { + testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} + go testCache.datascopeCacheManager() + testCache.clearAndInitCache("test-version") + countChan := make(chan int) + base := 10 + rand.Seed(time.Now().Unix()) + num := base + rand.Intn(base) + scopeMap := make(map[string]bool) + // async update + for i := 0; i < num; i++ { + id := strconv.Itoa(i) + scopeStr := strconv.Itoa(i % base) + scope := &dataDataScope{ID: id, Scope: scopeStr} + scopeMap[scope.Scope] = true + go func(scope *dataDataScope) { + testCache.updateCache(scope) + countChan <- 1 + }(scope) + } + + // wait until update done + for i := 0; i < num; i++ { + <-countChan + } + + // verify update + retrievedScopes := testCache.readAllScope() + Expect(len(scopeMap)).To(Equal(len(retrievedScopes))) + for _, s := range retrievedScopes { + // verify each retrieved scope is valid + Expect(scopeMap[s]).To(BeTrue()) + // no duplicate scopes + scopeMap[s] = true + } + + // remove all the datascopes with odd scope + count := 0 + for i := 0; i < num; i++ { + if (i%base)%2 == 1 { + count += 1 + id := strconv.Itoa(i) + scopeStr := strconv.Itoa(i % base) + scope := &dataDataScope{ID: id, Scope: scopeStr} + go func(scope *dataDataScope) { + testCache.removeCache(scope) + countChan <- 1 + }(scope) + } + } + + for i := 0; i < count; i++ { + <-countChan + } + + // all retrieved scopes should be even + retrievedScopes = testCache.readAllScope() + for _, s := range retrievedScopes { + scopeNum, _ := strconv.Atoi(s) + Expect(scopeNum % 2).To(BeZero()) + } + + // async remove all datascopes + for i := 0; i < num; i++ { + id := strconv.Itoa(i) + scopeStr := strconv.Itoa(i % base) + scope := &dataDataScope{ID: id, Scope: scopeStr} + go func(scope *dataDataScope) { + testCache.removeCache(scope) + countChan <- 1 + }(scope) + } + + for i := 0; i < num; i++ { + <-countChan + } + retrievedScopes = testCache.readAllScope() + Expect(len(retrievedScopes)).To(Equal(0)) + }) + })