add graceful close() to snapshot downloading manager
diff --git a/apigee_sync.go b/apigee_sync.go index fcf47d1..8e7cafd 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -33,8 +33,8 @@ return } - downloadBootSnapshot(nil) - downloadDataSnapshot(quitPollingSnapshotServer) + snapManager.downloadBootSnapshot() + snapManager.downloadDataSnapshot() changeManager.pollChangeWithBackoff()
diff --git a/changes.go b/changes.go index f182311..a88fbdd 100644 --- a/changes.go +++ b/changes.go
@@ -54,6 +54,7 @@ log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! close tokenManager!") go func() { tokenManager.close() + <-snapManager.close() finishChan <- false log.Debug("change manager closed") }() @@ -64,6 +65,7 @@ go func() { c.quitChan <- true tokenManager.close() + <-snapManager.close() finishChan <- true log.Debug("change manager closed") }() @@ -260,7 +262,7 @@ } if _, ok := err.(changeServerError); ok { log.Info("Detected DDL changes, going to fetch a new snapshot to sync...") - downloadDataSnapshot(c.quitChan) + snapManager.downloadDataSnapshot() } else { log.Debugf("Error connecting to changeserver: %v", err) }
diff --git a/init.go b/init.go index e1b31c5..50ed49d 100644 --- a/init.go +++ b/init.go
@@ -30,15 +30,15 @@ var ( /* All set during plugin initialization */ - log apid.LogService - config apid.ConfigService - dataService apid.DataService - events apid.EventsService - apidInfo apidInstanceInfo - newInstanceID bool - tokenManager *tokenMan - changeManager *pollChangeManager - quitPollingSnapshotServer chan bool + log apid.LogService + config apid.ConfigService + dataService apid.DataService + events apid.EventsService + apidInfo apidInstanceInfo + newInstanceID bool + tokenManager *tokenMan + changeManager *pollChangeManager + snapManager *snapShotManager /* Set during post plugin initialization * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called @@ -76,7 +76,7 @@ events = services.Events() //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines //also useful for testing - quitPollingSnapshotServer = make(chan bool) + snapManager = createSnapShotManager() changeManager = createChangeManager() // set up default database
diff --git a/snapshot.go b/snapshot.go index 1731cfb..055d896 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -12,28 +12,121 @@ "io/ioutil" "net/url" "path" + "sync/atomic" "time" ) +type snapShotManager struct { + // to send quit signal to the downloading thread + quitChan chan bool + // to mark the graceful close of snapshotManager + finishChan chan bool + // 0 for not closed, 1 for closed + isClosed *int32 + // make sure close() returns immediately if there's no downloading/processing snapshot + isDownloading *int32 +} + +func createSnapShotManager() *snapShotManager { + isClosedInt := int32(0) + isDownloadingInt := int32(0) + return &snapShotManager{ + quitChan: make(chan bool, 1), + finishChan: make(chan bool, 1), + isClosed: &isClosedInt, + isDownloading: &isDownloadingInt, + } +} + +/* + * thread-safe close of snapShotManager + * It marks status as closed immediately, and quits backoff downloading + * use <- close() for blocking close + * should only be called by pollChangeManager, because pollChangeManager is dependent on it + */ +func (s *snapShotManager) close() <-chan bool { + //has been closed before + if atomic.SwapInt32(s.isClosed, 1) == int32(1) { + log.Error("snapShotManager: close() called on a closed snapShotManager!") + go func() { + s.finishChan <- false + log.Debug("change manager closed") + }() + return s.finishChan + } + s.quitChan <- true + // wait until no downloading + for atomic.LoadInt32(s.isDownloading) == int32(1) { + time.Sleep(time.Millisecond) + } + s.finishChan <- true + return s.finishChan +} + // retrieve boot information: apid_config and apid_config_scope -func downloadBootSnapshot(quitPolling chan bool) { +func (s *snapShotManager) downloadBootSnapshot() { + if atomic.SwapInt32(s.isDownloading, 1) == int32(1) { + log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!") + return + } + defer atomic.StoreInt32(s.isDownloading, int32(0)) + + // has been closed + if atomic.LoadInt32(s.isClosed) == int32(1) { + log.Warn("snapShotManager: downloadBootSnapshot called on closed snapShotManager") + return + } + log.Debug("download Snapshot for boot data") scopes := []string{apidInfo.ClusterID} snapshot := &common.Snapshot{} - downloadSnapshot(scopes, snapshot, quitPolling) + err := s.downloadSnapshot(scopes, snapshot) + if err != nil { + // this may happen during shutdown + if _, ok := err.(quitSignalError); ok { + log.Warn("downloadBootSnapshot failed due to shutdown: " + err.Error()) + } + return + } + + // has been closed + if atomic.LoadInt32(s.isClosed) == int32(1) { + log.Error("snapShotManager: processSnapshot called on closed snapShotManager") + return + } + // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot processSnapshot(snapshot) } // use the scope IDs from the boot snapshot to get all the data associated with the scopes -func downloadDataSnapshot(quitPolling chan bool) { +func (s *snapShotManager) downloadDataSnapshot() { + if atomic.SwapInt32(s.isDownloading, 1) == int32(1) { + log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!") + return + } + defer atomic.StoreInt32(s.isDownloading, int32(0)) + + // has been closed + if atomic.LoadInt32(s.isClosed) == int32(1) { + log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager") + return + } + log.Debug("download Snapshot for data scopes") var scopes = findScopesForId(apidInfo.ClusterID) scopes = append(scopes, apidInfo.ClusterID) snapshot := &common.Snapshot{} - downloadSnapshot(scopes, snapshot, quitPolling) + err := s.downloadSnapshot(scopes, snapshot) + if err != nil { + // this may happen during shutdown + if _, ok := err.(quitSignalError); ok { + log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error()) + } + return + } knownTables = extractTablesFromSnapshot(snapshot) @@ -41,6 +134,12 @@ if err != nil { log.Panicf("Database inaccessible: %v", err) } + + // if closed + if atomic.LoadInt32(s.isClosed) == int32(1) { + log.Warn("Trying to persistKnownTablesToDB with a closed snapShotManager") + return + } persistKnownTablesToDB(knownTables, db) log.Info("Emitting Snapshot to plugins") @@ -49,6 +148,9 @@ case <-time.After(pluginTimeout): log.Panic("Timeout. Plugins failed to respond to snapshot.") case <-events.Emit(ApigeeSyncEventSelector, snapshot): + // the new snapshot has been processed + // if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency + // In this case, will close now } } @@ -112,8 +214,14 @@ } } +// a blocking method // will keep retrying with backoff until success -func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) error { +func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error { + // if closed + if atomic.LoadInt32(s.isClosed) == int32(1) { + log.Warn("Trying to download snapshot with a closed snapShotManager") + return quitSignalError{} + } log.Debug("downloadSnapshot") @@ -141,7 +249,7 @@ //to accomadate functions which need more parameters, wrap them in closures attemptDownload := getAttemptDownloadClosure(client, snapshot, uri) - pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError) + pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError) return nil }