Merge branch 'master' into refactor-haoming
diff --git a/apigee_sync.go b/apigee_sync.go
index d6b3a19..5c77056 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -33,8 +33,8 @@
return
}
- downloadBootSnapshot(nil)
- downloadDataSnapshot(quitPollingSnapshotServer)
+ snapManager.downloadBootSnapshot()
+ snapManager.downloadDataSnapshot()
changeManager.pollChangeWithBackoff()
diff --git a/changes.go b/changes.go
index aa8d822..0ac39b2 100644
--- a/changes.go
+++ b/changes.go
@@ -51,9 +51,11 @@
}
// not launched
if atomic.LoadInt32(c.isLaunched) == int32(0) {
- log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! close tokenManager!")
+ log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
go func() {
+ c.quitChan <- true
tokenManager.close()
+ <-snapManager.close()
finishChan <- false
log.Debug("change manager closed")
}()
@@ -64,6 +66,7 @@
go func() {
c.quitChan <- true
tokenManager.close()
+ <-snapManager.close()
finishChan <- true
log.Debug("change manager closed")
}()
@@ -75,11 +78,6 @@
*/
func (c *pollChangeManager) pollChangeWithBackoff() {
- // closed
- if atomic.LoadInt32(c.isClosed) == int32(1) {
- log.Error("pollChangeManager: pollChangeWithBackoff() called after closed")
- return
- }
// has been launched before
if atomic.SwapInt32(c.isLaunched, 1) == int32(1) {
log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before")
@@ -267,7 +265,7 @@
}
if _, ok := err.(changeServerError); ok {
log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
- downloadDataSnapshot(c.quitChan)
+ snapManager.downloadDataSnapshot()
} else {
log.Debugf("Error connecting to changeserver: %v", err)
}
diff --git a/init.go b/init.go
index e1b31c5..50ed49d 100644
--- a/init.go
+++ b/init.go
@@ -30,15 +30,15 @@
var (
/* All set during plugin initialization */
- log apid.LogService
- config apid.ConfigService
- dataService apid.DataService
- events apid.EventsService
- apidInfo apidInstanceInfo
- newInstanceID bool
- tokenManager *tokenMan
- changeManager *pollChangeManager
- quitPollingSnapshotServer chan bool
+ log apid.LogService
+ config apid.ConfigService
+ dataService apid.DataService
+ events apid.EventsService
+ apidInfo apidInstanceInfo
+ newInstanceID bool
+ tokenManager *tokenMan
+ changeManager *pollChangeManager
+ snapManager *snapShotManager
/* Set during post plugin initialization
* set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -76,7 +76,7 @@
events = services.Events()
//TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
//also useful for testing
- quitPollingSnapshotServer = make(chan bool)
+ snapManager = createSnapShotManager()
changeManager = createChangeManager()
// set up default database
diff --git a/snapshot.go b/snapshot.go
index ae667bf..0158871 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -12,17 +12,93 @@
"io/ioutil"
"net/url"
"path"
+ "sync/atomic"
"time"
)
+type snapShotManager struct {
+ // to send quit signal to the downloading thread
+ quitChan chan bool
+ // to mark the graceful close of snapshotManager
+ finishChan chan bool
+ // 0 for not closed, 1 for closed
+ isClosed *int32
+ // make sure close() returns immediately if there's no downloading/processing snapshot
+ isDownloading *int32
+}
+
+func createSnapShotManager() *snapShotManager {
+ isClosedInt := int32(0)
+ isDownloadingInt := int32(0)
+ return &snapShotManager{
+ quitChan: make(chan bool, 1),
+ finishChan: make(chan bool, 1),
+ isClosed: &isClosedInt,
+ isDownloading: &isDownloadingInt,
+ }
+}
+
+/*
+ * thread-safe close of snapShotManager
+ * It marks status as closed immediately, and quits backoff downloading
+ * use <- close() for blocking close
+ * should only be called by pollChangeManager, because pollChangeManager is dependent on it
+ */
+func (s *snapShotManager) close() <-chan bool {
+ //has been closed before
+ if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
+ log.Error("snapShotManager: close() called on a closed snapShotManager!")
+ go func() {
+ s.finishChan <- false
+ log.Debug("snapshot manager closed")
+ }()
+ return s.finishChan
+ }
+ s.quitChan <- true
+ // wait until no downloading
+ for atomic.LoadInt32(s.isDownloading) == int32(1) {
+ time.Sleep(time.Millisecond)
+ }
+ s.finishChan <- true
+ return s.finishChan
+}
+
// retrieve boot information: apid_config and apid_config_scope
-func downloadBootSnapshot(quitPolling chan bool) {
+func (s *snapShotManager) downloadBootSnapshot() {
+ if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
+ log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
+ return
+ }
+ defer atomic.StoreInt32(s.isDownloading, int32(0))
+
+ // has been closed
+ if atomic.LoadInt32(s.isClosed) == int32(1) {
+ log.Warn("snapShotManager: downloadBootSnapshot called on closed snapShotManager")
+ return
+ }
+
log.Debug("download Snapshot for boot data")
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
- downloadSnapshot(scopes, snapshot, quitPolling)
- storeBootSnapshot(snapshot)
+
+ err := s.downloadSnapshot(scopes, snapshot)
+ if err != nil {
+ // this may happen during shutdown
+ if _, ok := err.(quitSignalError); ok {
+ log.Warn("downloadBootSnapshot failed due to shutdown: " + err.Error())
+ }
+ return
+ }
+
+ // has been closed
+ if atomic.LoadInt32(s.isClosed) == int32(1) {
+ log.Error("snapShotManager: closed during downloadBootSnapshot, skipping storeBootSnapshot")
+ return
+ }
+
+ // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
+ storeBootSnapshot(snapshot)
}
func storeBootSnapshot(snapshot *common.Snapshot) {
@@ -30,13 +106,32 @@
}
// use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func downloadDataSnapshot(quitPolling chan bool) {
+func (s *snapShotManager) downloadDataSnapshot() {
+ if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
+ log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
+ return
+ }
+ defer atomic.StoreInt32(s.isDownloading, int32(0))
+
+ // has been closed
+ if atomic.LoadInt32(s.isClosed) == int32(1) {
+ log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager")
+ return
+ }
+
log.Debug("download Snapshot for data scopes")
scopes := findScopesForId(apidInfo.ClusterID)
scopes = append(scopes, apidInfo.ClusterID)
snapshot := &common.Snapshot{}
- downloadSnapshot(scopes, snapshot, quitPolling)
+ err := s.downloadSnapshot(scopes, snapshot)
+ if err != nil {
+ // this may happen during shutdown
+ if _, ok := err.(quitSignalError); ok {
+ log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error())
+ }
+ return
+ }
storeDataSnapshot(snapshot)
}
@@ -47,6 +142,12 @@
if err != nil {
log.Panicf("Database inaccessible: %v", err)
}
+
+ // if closed
+ if atomic.LoadInt32(s.isClosed) == int32(1) {
+ log.Warn("Trying to persistKnownTablesToDB with a closed snapShotManager")
+ return
+ }
persistKnownTablesToDB(knownTables, db)
log.Info("Emitting Snapshot to plugins")
@@ -55,6 +156,8 @@
case <-time.After(pluginTimeout):
log.Panic("Timeout. Plugins failed to respond to snapshot.")
case <-events.Emit(ApigeeSyncEventSelector, snapshot):
+ // the new snapshot has been processed
+ // if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency
}
}
@@ -119,8 +222,15 @@
}
}
+// a blocking method
// will keep retrying with backoff until success
-func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) {
+// It returns quitSignalError if the snapShotManager is closed before the download starts.
+func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+ // if closed
+ if atomic.LoadInt32(s.isClosed) == int32(1) {
+ log.Warn("Trying to download snapshot with a closed snapShotManager")
+ return quitSignalError{}
+ }
log.Debug("downloadSnapshot")
@@ -147,8 +257,8 @@
//pollWithBackoff only accepts function that accept a single quit channel
//to accomadate functions which need more parameters, wrap them in closures
attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
-
- pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError)
+ pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
+ return nil
}
func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {