merging in master
diff --git a/apigee_sync.go b/apigee_sync.go index fcf47d1..612e891 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -93,7 +93,7 @@ } func addHeaders(req *http.Request) { - req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken()) + req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken()) req.Header.Set("apid_instance_id", apidInfo.InstanceID) req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) @@ -119,4 +119,4 @@ func (a changeServerError) Error() string { return a.Code -} +} \ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 579a5c3..0bf56bd 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -253,5 +253,35 @@ }) testMock.forceNewSnapshot() }) + + + It("Verify the Sequence Number Logic works as expected", func() { + Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1)) + Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1)) + Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0)) + Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1)) + Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) + Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) + }) + + /* + * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap + */ + It("Should be able to handle duplicate snapshot during bootstrap", func() { + initializeContext() + + pie := apid.PluginsInitializedEvent{ + Description: "plugins initialized", + } + pie.Plugins = append(pie.Plugins, pluginData) + postInitPlugins(pie) + + scopes := []string{apidInfo.ClusterID} + snapshot := &common.Snapshot{} + downloadSnapshot(scopes, snapshot, nil) + storeBootSnapshot(snapshot) + storeDataSnapshot(snapshot) + restoreContext() + }) }) })
diff --git a/changes.go b/changes.go index f182311..d8176c2 100644 --- a/changes.go +++ b/changes.go
@@ -138,7 +138,7 @@ log.Debug("polling...") /* Find the scopes associated with the config id */ - scopes := findScopesForId(apidInfo.ClusterID) + scopes := scopeCache.readAllScope() v := url.Values{} /* Sequence added to the query if available */ @@ -148,9 +148,9 @@ v.Add("block", block) /* - * Include all the scopes associated with the config Id - * The Config Id is included as well, as it acts as the - * Bootstrap scope + * Include all the scopes associated with the config Id + * The Config Id is included as well, as it acts as the + * Bootstrap scope */ for _, scope := range scopes { v.Add("scope", scope) @@ -161,6 +161,7 @@ uri := changesUri.String() log.Debugf("Fetching changes: %s", uri) + /* If error, break the loop, and retry after interval */ client := &http.Client{Timeout: httpTimeout} // must be greater than block value req, err := http.NewRequest("GET", uri, nil) addHeaders(req) @@ -221,6 +222,17 @@ return err } + /* + * If the lastSequence is already newer or the same than what we got via + * resp.LastSequence, Ignore it. + */ + if lastSequence != "" && + getChangeStatus(lastSequence, resp.LastSequence) != 1 { + return changeServerError{ + Code: "Ignore change, already have newer changes", + } + } + if changesRequireDDLSync(resp) { return changeServerError{ Code: "DDL changes detected; must get new snapshot", @@ -238,13 +250,8 @@ log.Debugf("No Changes detected for Scopes: %s", scopes) } - if lastSequence != resp.LastSequence { - lastSequence = resp.LastSequence - err := updateLastSequence(resp.LastSequence) - if err != nil { - log.Panic("Unable to update Sequence in DB") - } - } + updateSequence(resp.LastSequence) + return nil } @@ -285,3 +292,28 @@ return false } + +/* + * seqCurr.Compare() will return 1, if its newer than seqPrev, + * else will return 0, if same, or -1 if older. + */ +func getChangeStatus(lastSeq string, currSeq string) int { + seqPrev, err := common.ParseSequence(lastSeq) + if err != nil { + log.Panic("Unable to parse previous sequence string") + } + seqCurr, err := common.ParseSequence(currSeq) + if err != nil { + log.Panic("Unable to parse current sequence string") + } + return seqCurr.Compare(seqPrev) +} + +func updateSequence(seq string) { + lastSequence = seq + err := updateLastSequence(seq) + if err != nil { + log.Panic("Unable to update Sequence in DB") + } + +}
diff --git a/datascope_cache.go b/datascope_cache.go new file mode 100644 index 0000000..3d19710 --- /dev/null +++ b/datascope_cache.go
@@ -0,0 +1,93 @@ +package apidApigeeSync + +const ( + readCache int = iota + updateCache + removeCache + clearAndInit +) + +/* + * structs for DatascopeCache + */ + +type cacheOperationRequest struct { + Operation int + Scope *dataDataScope + version string +} + +// maintain an in-mem cache of datascope +type DatascopeCache struct { + requestChan chan *cacheOperationRequest + readDoneChan chan []string + scopeMap map[string]*dataDataScope + version string +} + +var scopeCache *DatascopeCache + +func (cache *DatascopeCache) datascopeCacheManager() { + for request := range cache.requestChan { + switch request.Operation { + case readCache: + log.Debug("datascopeCacheManager: readCache") + scopes := make([]string, 0, len(cache.scopeMap)) + for _, ds := range cache.scopeMap { + scopes = append(scopes, ds.Scope) + } + cache.readDoneChan <- scopes + case updateCache: + log.Debug("datascopeCacheManager: updateCache") + cache.scopeMap[request.Scope.ID] = request.Scope + case removeCache: + log.Debug("datascopeCacheManager: removeCache") + delete(cache.scopeMap, request.Scope.ID) + case clearAndInit: + log.Debug("datascopeCacheManager: clearAndInit") + if cache.version != request.version { + cache.scopeMap = make(map[string]*dataDataScope) + cache.version = request.version + } + } + } + + //chan closed + cache.scopeMap = nil + close(cache.readDoneChan) +} + +/* + * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID) + */ + +func (cache *DatascopeCache) readAllScope() []string { + cache.requestChan <- &cacheOperationRequest{readCache, nil, ""} + scopes := <-cache.readDoneChan + // eliminate duplicates + tmpMap := make(map[string]bool) + for _, scope := range scopes { + tmpMap[scope] = true + } + scopes = make([]string, 0) + for scope := range tmpMap { + scopes = append(scopes, scope) + } + return scopes +} + +func (cache *DatascopeCache) removeCache(scope *dataDataScope) { + cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""} +} + +func (cache *DatascopeCache) updateCache(scope *dataDataScope) { + cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""} +} + +func (cache *DatascopeCache) clearAndInitCache(version string) { + cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version} +} + +func (cache *DatascopeCache) closeCache() { + close(cache.requestChan) +}
diff --git a/datascope_cache_test.go b/datascope_cache_test.go new file mode 100644 index 0000000..bc67f53 --- /dev/null +++ b/datascope_cache_test.go
@@ -0,0 +1,97 @@ +package apidApigeeSync + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "math/rand" + "strconv" + "time" +) + +var _ = Describe("datascope cache", func() { + /* + * in-mem cache test + */ + It("Test In-mem cache", func() { + testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} + go testCache.datascopeCacheManager() + testCache.clearAndInitCache("test-version") + countChan := make(chan int) + base := 10 + rand.Seed(time.Now().Unix()) + num := base + rand.Intn(base) + scopeMap := make(map[string]bool) + // async update + for i := 0; i < num; i++ { + id := strconv.Itoa(i) + scopeStr := strconv.Itoa(i % base) + scope := &dataDataScope{ID: id, Scope: scopeStr} + scopeMap[scope.Scope] = true + go func(scope *dataDataScope) { + testCache.updateCache(scope) + countChan <- 1 + }(scope) + } + + // wait until update done + for i := 0; i < num; i++ { + <-countChan + } + + // verify update + retrievedScopes := testCache.readAllScope() + Expect(len(scopeMap)).To(Equal(len(retrievedScopes))) + for _, s := range retrievedScopes { + // verify each retrieved scope is valid + Expect(scopeMap[s]).To(BeTrue()) + // no duplicate scopes + scopeMap[s] = true + } + + // remove all the datascopes with odd scope + count := 0 + for i := 0; i < num; i++ { + if (i%base)%2 == 1 { + count += 1 + id := strconv.Itoa(i) + scopeStr := strconv.Itoa(i % base) + scope := &dataDataScope{ID: id, Scope: scopeStr} + go func(scope *dataDataScope) { + testCache.removeCache(scope) + countChan <- 1 + }(scope) + } + } + + for i := 0; i < count; i++ { + <-countChan + } + + // all retrieved scopes should be even + retrievedScopes = testCache.readAllScope() + for _, s := range retrievedScopes { + scopeNum, _ := strconv.Atoi(s) + Expect(scopeNum % 2).To(BeZero()) + } + + // async remove all datascopes + for i := 0; i < num; i++ { + id := strconv.Itoa(i) + scopeStr := strconv.Itoa(i % base) + scope := &dataDataScope{ID: id, Scope: scopeStr} + go func(scope *dataDataScope) { + testCache.removeCache(scope) + countChan <- 1 + }(scope) + } + + for i := 0; i < num; i++ { + <-countChan + } + retrievedScopes = testCache.readAllScope() + Expect(len(retrievedScopes)).To(Equal(0)) + + testCache.closeCache() + }) + +})
diff --git a/init.go b/init.go index e1b31c5..98b4770 100644 --- a/init.go +++ b/init.go
@@ -100,6 +100,8 @@ } config.Set(configApidInstanceID, apidInfo.InstanceID) + scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} + go scopeCache.datascopeCacheManager() return nil }
diff --git a/listener.go b/listener.go index ba6c02c..0090326 100644 --- a/listener.go +++ b/listener.go
@@ -65,6 +65,9 @@ log.Panicf("Unable to initialize database: %v", err) } + // clear cache + scopeCache.clearAndInitCache(snapshot.SnapshotInfo) + tx, err := db.Begin() if err != nil { log.Panicf("Error starting transaction: %v", err) @@ -93,6 +96,10 @@ if err != nil { log.Panicf("Snapshot update failed: %v", err) } + // cache scopes for this cluster + if ds.ClusterID == apidInfo.ClusterID { + scopeCache.updateCache(&ds) + } } } } @@ -128,9 +135,19 @@ case common.Insert: ds := makeDataScopeFromRow(change.NewRow) err = insertDataScope(ds, tx) + + // cache scopes for this cluster + if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) { + scopeCache.updateCache(&ds) + } case common.Delete: ds := makeDataScopeFromRow(change.OldRow) - deleteDataScope(ds, tx) + err = deleteDataScope(ds, tx) + + // cache scopes for this cluster + if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) { + scopeCache.removeCache(&ds) + } default: // common.Update is not allowed log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
diff --git a/snapshot.go b/snapshot.go index 1731cfb..aa5eada 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -22,19 +22,39 @@ scopes := []string{apidInfo.ClusterID} snapshot := &common.Snapshot{} downloadSnapshot(scopes, snapshot, quitPolling) + storeBootSnapshot(snapshot) +} + +func storeBootSnapshot(snapshot *common.Snapshot) { + // note that for boot snapshot case, we don't touch databases. We only update in-mem cache + // This aims to deal with duplicate snapshot version#, see XAPID-869 for details + scopeCache.clearAndInitCache(snapshot.SnapshotInfo) + for _, table := range snapshot.Tables { + if table.Name == LISTENER_TABLE_DATA_SCOPE { + for _, row := range table.Rows { + ds := makeDataScopeFromRow(row) + // cache scopes for this cluster + if ds.ClusterID == apidInfo.ClusterID { + scopeCache.updateCache(&ds) + } + } + } + } // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot - processSnapshot(snapshot) } // use the scope IDs from the boot snapshot to get all the data associated with the scopes func downloadDataSnapshot(quitPolling chan bool) { log.Debug("download Snapshot for data scopes") - var scopes = findScopesForId(apidInfo.ClusterID) + scopes := scopeCache.readAllScope() scopes = append(scopes, apidInfo.ClusterID) snapshot := &common.Snapshot{} downloadSnapshot(scopes, snapshot, quitPolling) + storeDataSnapshot(snapshot) +} +func storeDataSnapshot(snapshot *common.Snapshot) { knownTables = extractTablesFromSnapshot(snapshot) db, err := dataService.DBVersion(snapshot.SnapshotInfo) @@ -50,6 +70,7 @@ log.Panic("Timeout. Plugins failed to respond to snapshot.") case <-events.Emit(ApigeeSyncEventSelector, snapshot): } + } func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) { @@ -113,7 +134,7 @@ } // will keep retrying with backoff until success -func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) error { +func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) { log.Debug("downloadSnapshot") @@ -142,8 +163,6 @@ attemptDownload := getAttemptDownloadClosure(client, snapshot, uri) pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError) - return nil - } func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {