Fix panic caused by duplicate snapshot versions, and add an in-memory cache of the datascopes for the local cluster (XAPID-869)
diff --git a/apigee_sync.go b/apigee_sync.go index b63255c..2ed1546 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -21,11 +21,37 @@ maxBackoffTimeout = time.Minute ) +/* + * structs for DatascopeCache + */ + +const ( + readCache int = iota + updateCache + removeCache + clearAndInit +) + +type cacheOperationRequest struct { + Operation int + Scope *dataDataScope + version string +} + +// maintain an in-mem cache of datascope +type DatascopeCache struct { + requestChan chan *cacheOperationRequest + readDoneChan chan []string + scopeMap map[string]*dataDataScope + version string +} + var ( block string = "45" lastSequence string polling uint32 knownTables = make(map[string]bool) + scopeCache *DatascopeCache ) /* @@ -265,15 +291,30 @@ scopes := []string{apidInfo.ClusterID} snapshot := downloadSnapshot(scopes) + // note that for boot snapshot case, we don't touch databases. We only update in-mem cache + // This aims to deal with duplicate snapshot version#, see XAPID-869 for details + scopeCache.clearAndInitCache(snapshot.SnapshotInfo) + for _, table := range snapshot.Tables { + if table.Name == LISTENER_TABLE_DATA_SCOPE { + for _, row := range table.Rows { + ds := makeDataScopeFromRow(row) + // cache scopes for this cluster + if ds.ClusterID == apidInfo.ClusterID { + scopeCache.updateCache(&ds) + } + } + } + } // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot - processSnapshot(&snapshot) } // use the scope IDs from the boot snapshot to get all the data associated with the scopes func downloadDataSnapshot() { log.Debug("download Snapshot for data scopes") - var scopes = findScopesForId(apidInfo.ClusterID) + //var scopes = findScopesForId(apidInfo.ClusterID) + scopes := scopeCache.readAllScope() + scopes = append(scopes, apidInfo.ClusterID) resp := downloadSnapshot(scopes) @@ -391,6 +432,7 @@ } knownTables = extractTablesFromDB(db) + scopeCache.clearAndInitCache(snapshot) // allow plugins (including this one) to start immediately on existing database // Note: this MUST have no tables as that is used as an indicator @@ -491,6 
+533,58 @@ req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) } +func (cache *DatascopeCache) datascopeCacheManager() { + for request := range cache.requestChan { + switch request.Operation { + case readCache: + log.Debug("datascopeCacheManager: readCache") + scopes := make([]string, 0, len(cache.scopeMap)) + for _, ds := range cache.scopeMap { + scopes = append(scopes, ds.Scope) + } + cache.readDoneChan <- scopes + case updateCache: + log.Debug("datascopeCacheManager: updateCache") + cache.scopeMap[request.Scope.ID] = request.Scope + case removeCache: + log.Debug("datascopeCacheManager: removeCache") + delete(cache.scopeMap, request.Scope.ID) + case clearAndInit: + log.Debug("datascopeCacheManager: clearAndInit") + if cache.version != request.version { + cache.scopeMap = make(map[string]*dataDataScope) + cache.version = request.version + } + } + } + + //chan closed + cache.scopeMap = nil + close(cache.requestChan) +} + +func (cache *DatascopeCache) readAllScope() []string { + cache.requestChan <- &cacheOperationRequest{readCache, nil, ""} + scopes := <-cache.readDoneChan + return scopes +} + +func (cache *DatascopeCache) removeCache(scope *dataDataScope) { + cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""} +} + +func (cache *DatascopeCache) updateCache(scope *dataDataScope) { + cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""} +} + +func (cache *DatascopeCache) clearAndInitCache(version string) { + cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version} +} + +func (cache *DatascopeCache) closeCache() { + close(cache.requestChan) +} + type changeServerError struct { Code string `json:"code"` }
diff --git a/init.go b/init.go index 5e2b840..d40a9e7 100644 --- a/init.go +++ b/init.go
@@ -78,6 +78,10 @@ data = services.Data() events = services.Events() + scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} + + go scopeCache.datascopeCacheManager() + /* This callback function will get called, once all the plugins are * initialized (not just this plugin). This is needed because, * downloadSnapshots/changes etc have to begin to be processed only
diff --git a/listener.go b/listener.go index 59b358a..3828aba 100644 --- a/listener.go +++ b/listener.go
@@ -42,6 +42,9 @@ log.Panicf("Unable to initialize database: %v", err) } + // clear cache + scopeCache.clearAndInitCache(snapshot.SnapshotInfo) + tx, err := db.Begin() if err != nil { log.Panicf("Error starting transaction: %v", err) @@ -70,6 +73,10 @@ if err != nil { log.Panicf("Snapshot update failed: %v", err) } + // cache scopes for this cluster + if ds.ClusterID == apidInfo.ClusterID { + scopeCache.updateCache(&ds) + } } } } @@ -114,9 +121,19 @@ case common.Insert: ds := makeDataScopeFromRow(change.NewRow) err = insertDataScope(ds, tx) + + // cache scopes for this cluster + if ds.ClusterID == apidInfo.ClusterID { + scopeCache.updateCache(&ds) + } case common.Delete: ds := makeDataScopeFromRow(change.OldRow) deleteDataScope(ds, tx) + + // cache scopes for this cluster + if ds.ClusterID == apidInfo.ClusterID { + scopeCache.removeCache(&ds) + } default: // common.Update is not allowed log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)