Revert changes for in-mem data scope cache
diff --git a/changes.go b/changes.go index d8176c2..aa8d822 100644 --- a/changes.go +++ b/changes.go
@@ -138,7 +138,7 @@ log.Debug("polling...") /* Find the scopes associated with the config id */ - scopes := scopeCache.readAllScope() + scopes := findScopesForId(apidInfo.ClusterID) v := url.Values{} /* Sequence added to the query if available */
diff --git a/datascope_cache.go b/datascope_cache.go deleted file mode 100644 index 3d19710..0000000 --- a/datascope_cache.go +++ /dev/null
@@ -1,93 +0,0 @@ -package apidApigeeSync - -const ( - readCache int = iota - updateCache - removeCache - clearAndInit -) - -/* - * structs for DatascopeCache - */ - -type cacheOperationRequest struct { - Operation int - Scope *dataDataScope - version string -} - -// maintain an in-mem cache of datascope -type DatascopeCache struct { - requestChan chan *cacheOperationRequest - readDoneChan chan []string - scopeMap map[string]*dataDataScope - version string -} - -var scopeCache *DatascopeCache - -func (cache *DatascopeCache) datascopeCacheManager() { - for request := range cache.requestChan { - switch request.Operation { - case readCache: - log.Debug("datascopeCacheManager: readCache") - scopes := make([]string, 0, len(cache.scopeMap)) - for _, ds := range cache.scopeMap { - scopes = append(scopes, ds.Scope) - } - cache.readDoneChan <- scopes - case updateCache: - log.Debug("datascopeCacheManager: updateCache") - cache.scopeMap[request.Scope.ID] = request.Scope - case removeCache: - log.Debug("datascopeCacheManager: removeCache") - delete(cache.scopeMap, request.Scope.ID) - case clearAndInit: - log.Debug("datascopeCacheManager: clearAndInit") - if cache.version != request.version { - cache.scopeMap = make(map[string]*dataDataScope) - cache.version = request.version - } - } - } - - //chan closed - cache.scopeMap = nil - close(cache.readDoneChan) -} - -/* - * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID) - */ - -func (cache *DatascopeCache) readAllScope() []string { - cache.requestChan <- &cacheOperationRequest{readCache, nil, ""} - scopes := <-cache.readDoneChan - // eliminate duplicates - tmpMap := make(map[string]bool) - for _, scope := range scopes { - tmpMap[scope] = true - } - scopes = make([]string, 0) - for scope := range tmpMap { - scopes = append(scopes, scope) - } - return scopes -} - -func (cache *DatascopeCache) removeCache(scope *dataDataScope) { - cache.requestChan <- &cacheOperationRequest{removeCache, scope, 
""} -} - -func (cache *DatascopeCache) updateCache(scope *dataDataScope) { - cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""} -} - -func (cache *DatascopeCache) clearAndInitCache(version string) { - cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version} -} - -func (cache *DatascopeCache) closeCache() { - close(cache.requestChan) -}
diff --git a/datascope_cache_test.go b/datascope_cache_test.go deleted file mode 100644 index bc67f53..0000000 --- a/datascope_cache_test.go +++ /dev/null
@@ -1,97 +0,0 @@ -package apidApigeeSync - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "math/rand" - "strconv" - "time" -) - -var _ = Describe("datascope cache", func() { - /* - * in-mem cache test - */ - It("Test In-mem cache", func() { - testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} - go testCache.datascopeCacheManager() - testCache.clearAndInitCache("test-version") - countChan := make(chan int) - base := 10 - rand.Seed(time.Now().Unix()) - num := base + rand.Intn(base) - scopeMap := make(map[string]bool) - // async update - for i := 0; i < num; i++ { - id := strconv.Itoa(i) - scopeStr := strconv.Itoa(i % base) - scope := &dataDataScope{ID: id, Scope: scopeStr} - scopeMap[scope.Scope] = true - go func(scope *dataDataScope) { - testCache.updateCache(scope) - countChan <- 1 - }(scope) - } - - // wait until update done - for i := 0; i < num; i++ { - <-countChan - } - - // verify update - retrievedScopes := testCache.readAllScope() - Expect(len(scopeMap)).To(Equal(len(retrievedScopes))) - for _, s := range retrievedScopes { - // verify each retrieved scope is valid - Expect(scopeMap[s]).To(BeTrue()) - // no duplicate scopes - scopeMap[s] = true - } - - // remove all the datascopes with odd scope - count := 0 - for i := 0; i < num; i++ { - if (i%base)%2 == 1 { - count += 1 - id := strconv.Itoa(i) - scopeStr := strconv.Itoa(i % base) - scope := &dataDataScope{ID: id, Scope: scopeStr} - go func(scope *dataDataScope) { - testCache.removeCache(scope) - countChan <- 1 - }(scope) - } - } - - for i := 0; i < count; i++ { - <-countChan - } - - // all retrieved scopes should be even - retrievedScopes = testCache.readAllScope() - for _, s := range retrievedScopes { - scopeNum, _ := strconv.Atoi(s) - Expect(scopeNum % 2).To(BeZero()) - } - - // async remove all datascopes - for i := 0; i < num; i++ { - id := strconv.Itoa(i) - scopeStr := strconv.Itoa(i % base) - scope := 
&dataDataScope{ID: id, Scope: scopeStr} - go func(scope *dataDataScope) { - testCache.removeCache(scope) - countChan <- 1 - }(scope) - } - - for i := 0; i < num; i++ { - <-countChan - } - retrievedScopes = testCache.readAllScope() - Expect(len(retrievedScopes)).To(Equal(0)) - - testCache.closeCache() - }) - -})
diff --git a/init.go b/init.go index 98b4770..e1b31c5 100644 --- a/init.go +++ b/init.go
@@ -100,8 +100,6 @@ } config.Set(configApidInstanceID, apidInfo.InstanceID) - scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} - go scopeCache.datascopeCacheManager() return nil }
diff --git a/listener.go b/listener.go index 0090326..6c4b1ef 100644 --- a/listener.go +++ b/listener.go
@@ -65,9 +65,6 @@ log.Panicf("Unable to initialize database: %v", err) } - // clear cache - scopeCache.clearAndInitCache(snapshot.SnapshotInfo) - tx, err := db.Begin() if err != nil { log.Panicf("Error starting transaction: %v", err) @@ -96,10 +93,6 @@ if err != nil { log.Panicf("Snapshot update failed: %v", err) } - // cache scopes for this cluster - if ds.ClusterID == apidInfo.ClusterID { - scopeCache.updateCache(&ds) - } } } } @@ -135,19 +128,9 @@ case common.Insert: ds := makeDataScopeFromRow(change.NewRow) err = insertDataScope(ds, tx) - - // cache scopes for this cluster - if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) { - scopeCache.updateCache(&ds) - } case common.Delete: ds := makeDataScopeFromRow(change.OldRow) err = deleteDataScope(ds, tx) - - // cache scopes for this cluster - if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) { - scopeCache.removeCache(&ds) - } default: // common.Update is not allowed log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
diff --git a/snapshot.go b/snapshot.go index aa5eada..ae667bf 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -26,28 +26,14 @@ } func storeBootSnapshot(snapshot *common.Snapshot) { - // note that for boot snapshot case, we don't touch databases. We only update in-mem cache - // This aims to deal with duplicate snapshot version#, see XAPID-869 for details - scopeCache.clearAndInitCache(snapshot.SnapshotInfo) - for _, table := range snapshot.Tables { - if table.Name == LISTENER_TABLE_DATA_SCOPE { - for _, row := range table.Rows { - ds := makeDataScopeFromRow(row) - // cache scopes for this cluster - if ds.ClusterID == apidInfo.ClusterID { - scopeCache.updateCache(&ds) - } - } - } - } - // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot + processSnapshot(snapshot) } // use the scope IDs from the boot snapshot to get all the data associated with the scopes func downloadDataSnapshot(quitPolling chan bool) { log.Debug("download Snapshot for data scopes") - scopes := scopeCache.readAllScope() + scopes := findScopesForId(apidInfo.ClusterID) scopes = append(scopes, apidInfo.ClusterID) snapshot := &common.Snapshot{} downloadSnapshot(scopes, snapshot, quitPolling)