Rewrite change-server polling, rewrite test cases, and fix multiple race conditions in tests.
diff --git a/apigee_sync.go b/apigee_sync.go index 99275f3..fcf47d1 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -26,7 +26,7 @@ snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot) events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { - go pollWithBackoff(quitPollingChangeServer, pollChangeAgent, handleChangeServerError) + changeManager.pollChangeWithBackoff() }) log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo) @@ -36,7 +36,7 @@ downloadBootSnapshot(nil) downloadDataSnapshot(quitPollingSnapshotServer) - go pollWithBackoff(quitPollingChangeServer, pollChangeAgent, handleChangeServerError) + changeManager.pollChangeWithBackoff() }
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 7aa1a81..579a5c3 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -6,7 +6,7 @@ . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "net/http/httptest" - "time" + //"time" ) var _ = Describe("Sync", func() { @@ -36,7 +36,6 @@ var restoreContext = func() { - tokenManager.close() testServer.Close() config.Set(configProxyServerBaseURI, dummyConfigValue) @@ -47,7 +46,7 @@ It("should succesfully bootstrap from clean slate", func(done Done) { log.Info("Starting sync tests...") - + var closeDone <-chan bool initializeContext() // do not wipe DB after. Lets use it wipeDBAferTest = false @@ -60,7 +59,6 @@ } apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - if s, ok := event.(*common.Snapshot); ok { Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) @@ -105,7 +103,7 @@ } } else if cl, ok := event.(*common.ChangeList); ok { - go func() { quitPollingChangeServer <- true }() + closeDone = changeManager.close() // ensure that snapshot switched DB versions Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) @@ -131,22 +129,25 @@ Expect(tables).To(ContainElement("kms.api_product")) Expect(tables).To(ContainElement("kms.app")) - events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) { + go func() { + // when close done, all handlers for the first changeList have been executed + <-closeDone defer GinkgoRecover() - // allow other handler to execute to insert last_sequence - time.Sleep(50 * time.Millisecond) var seq string - err = getDB(). + //for seq = ""; seq == ""; { + // time.Sleep(50 * time.Millisecond) + err := getDB(). QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). 
Scan(&seq) - Expect(err).NotTo(HaveOccurred()) + //} Expect(seq).To(Equal(cl.LastSequence)) restoreContext() close(done) - }) + }() + } }) pie := apid.PluginsInitializedEvent{ @@ -158,27 +159,35 @@ It("should bootstrap from local DB if present", func(done Done) { + var closeDone <-chan bool + initializeContext() expectedTables := common.ChangeList{ Changes: []common.Change{common.Change{Table: "kms.company"}, common.Change{Table: "edgex.apid_cluster"}, common.Change{Table: "edgex.data_scope"}}, } - thisQuitPollingChangeServer := quitPollingChangeServer Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { if s, ok := event.(*common.Snapshot); ok { - go func() { thisQuitPollingChangeServer <- true }() - //verify that the knownTables array has been properly populated from existing DB - Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) + // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed + // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler + closeDone = changeManager.close() + go func() { + // when close done, all handlers for the first snapshot have been executed + <-closeDone + //verify that the knownTables array has been properly populated from existing DB + Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) - Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) - Expect(s.Tables).To(BeNil()) + Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) + Expect(s.Tables).To(BeNil()) - restoreContext() - close(done) + restoreContext() + close(done) + }() + } }) pie := apid.PluginsInitializedEvent{
diff --git a/changes.go b/changes.go index 59caa34..f182311 100644 --- a/changes.go +++ b/changes.go
@@ -9,16 +9,93 @@ "time" "github.com/apigee-labs/transicator/common" + "sync/atomic" ) var lastSequence string var block string = "45" +type pollChangeManager struct { + // 0 for not closed, 1 for closed + isClosed *int32 + // 0 for pollChangeWithBackoff() not launched, 1 for launched + isLaunched *int32 + quitChan chan bool +} + +func createChangeManager() *pollChangeManager { + isClosedInt := int32(0) + isLaunchedInt := int32(0) + return &pollChangeManager{ + isClosed: &isClosedInt, + quitChan: make(chan bool), + isLaunched: &isLaunchedInt, + } +} + +/* + * thread-safe close of pollChangeManager + * It marks status as closed immediately, quits backoff polling agent, and closes tokenManager + * use <- close() for blocking close + */ +func (c *pollChangeManager) close() <-chan bool { + finishChan := make(chan bool, 1) + //has been closed + if atomic.SwapInt32(c.isClosed, 1) == int32(1) { + log.Error("pollChangeManager: close() called on a closed pollChangeManager!") + go func() { + finishChan <- false + log.Debug("change manager closed") + }() + return finishChan + } + // not launched + if atomic.LoadInt32(c.isLaunched) == int32(0) { + log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! 
close tokenManager!") + go func() { + tokenManager.close() + finishChan <- false + log.Debug("change manager closed") + }() + return finishChan + } + // launched + log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager") + go func() { + c.quitChan <- true + tokenManager.close() + finishChan <- true + log.Debug("change manager closed") + }() + return finishChan +} + +/* + * thread-safe pollChangeWithBackoff(), guaranteed: only one polling thread + */ + +func (c *pollChangeManager) pollChangeWithBackoff() { + // closed + if atomic.LoadInt32(c.isClosed) == int32(1) { + log.Error("pollChangeManager: pollChangeWithBackoff() called after closed") + return + } + // has been launched before + if atomic.SwapInt32(c.isLaunched, 1) == int32(1) { + log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before") + return + } + + go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError) + log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff") + +} + /* * Long polls the change agent with a 45 second block. Parses the response from * change agent and raises an event. Called by pollWithBackoff(). 
*/ -func pollChangeAgent(quit chan bool) error { +func (c *pollChangeManager) pollChangeAgent(dummyQuit chan bool) error { changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI)) if err != nil { @@ -35,12 +112,16 @@ for { select { - case <-quit: - log.Info("Recevied quit signal to stop polling change server") + case <-c.quitChan: + log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager") return quitSignalError{} default: - err := getChanges(changesUri) + err := c.getChanges(changesUri) if err != nil { + if _, ok := err.(quitSignalError); ok { + log.Debug("pollChangeAgent: consuming the quit signal") + <-c.quitChan + } return err } } @@ -49,7 +130,11 @@ //TODO refactor this method more, split it up /* Make a single request to the changeserver to get a changelist */ -func getChanges(changesUri *url.URL) error { +func (c *pollChangeManager) getChanges(changesUri *url.URL) error { + // if closed + if atomic.LoadInt32(c.isClosed) == int32(1) { + return quitSignalError{} + } log.Debug("polling...") /* Find the scopes associated with the config id */ @@ -83,10 +168,20 @@ r, err := client.Do(req) if err != nil { log.Errorf("change agent comm error: %s", err) + // if closed + if atomic.LoadInt32(c.isClosed) == int32(1) { + return quitSignalError{} + } return err } defer r.Body.Close() + // has been closed + if atomic.LoadInt32(c.isClosed) == int32(1) { + log.Debugf("getChanges: changeManager has been closed") + return quitSignalError{} + } + if r.StatusCode != http.StatusOK { log.Errorf("Get changes request failed with status code: %d", r.StatusCode) switch r.StatusCode { @@ -157,11 +252,15 @@ return changesHaveNewTables(knownTables, changes.Changes) } -func handleChangeServerError(err error) { - +func (c *pollChangeManager) handleChangeServerError(err error) { + // has been closed + if atomic.LoadInt32(c.isClosed) == int32(1) { + log.Debugf("handleChangeServerError: changeManager has been closed") + return + } 
if _, ok := err.(changeServerError); ok { log.Info("Detected DDL changes, going to fetch a new snapshot to sync...") - downloadDataSnapshot(nil) + downloadDataSnapshot(c.quitChan) } else { log.Debugf("Error connecting to changeserver: %v", err) }
diff --git a/init.go b/init.go index c9ad44f..e1b31c5 100644 --- a/init.go +++ b/init.go
@@ -37,8 +37,8 @@ apidInfo apidInstanceInfo newInstanceID bool tokenManager *tokenMan + changeManager *pollChangeManager quitPollingSnapshotServer chan bool - quitPollingChangeServer chan bool /* Set during post plugin initialization * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called @@ -77,7 +77,7 @@ //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines //also useful for testing quitPollingSnapshotServer = make(chan bool) - quitPollingChangeServer = make(chan bool) + changeManager = createChangeManager() // set up default database db, err := dataService.DB()
diff --git a/token.go b/token.go index bc6f1df..a6c118e 100644 --- a/token.go +++ b/token.go
@@ -7,6 +7,7 @@ "net/http" "net/url" "path" + "sync/atomic" "time" ) @@ -24,6 +25,7 @@ */ func createTokenManager() *tokenMan { + isClosedInt := int32(0) t := &tokenMan{ quitPollingForToken: make(chan bool, 1), @@ -32,6 +34,7 @@ invalidateTokenChan: make(chan bool), returnTokenChan: make(chan *oauthToken), invalidateDone: make(chan bool), + isClosed: &isClosedInt, } t.retrieveNewToken() @@ -42,6 +45,7 @@ type tokenMan struct { token *oauthToken + isClosed *int32 quitPollingForToken chan bool closed chan bool getTokenChan chan bool @@ -77,24 +81,42 @@ // will block until valid func (t *tokenMan) invalidateToken() { + //has been closed + if atomic.LoadInt32(t.isClosed) == int32(1) { + log.Debug("TokenManager: invalidateToken() called on closed tokenManager") + return + } log.Debug("invalidating token") t.invalidateTokenChan <- true <-t.invalidateDone } - func (t *tokenMan) getToken() *oauthToken { + //has been closed + if atomic.LoadInt32(t.isClosed) == int32(1) { + log.Debug("TokenManager: getToken() called on closed tokenManager") + return nil + } t.getTokenChan <- true return <-t.returnTokenChan } +/* + * blocking close() of tokenMan + */ + func (t *tokenMan) close() { + //has been closed + if atomic.SwapInt32(t.isClosed, 1) == int32(1) { + log.Panic("TokenManager: close() has been called before!") + return + } log.Debug("close token manager") t.quitPollingForToken <- true // sending instead of closing, to make sure it enters the t.doRefresh branch - log.Debug("token manager closed") t.closed <- true close(t.closed) + log.Debug("token manager closed") } // don't call externally. will block until success.
diff --git a/token_test.go b/token_test.go index 2cff649..49d1eb6 100644 --- a/token_test.go +++ b/token_test.go
@@ -1,5 +1,8 @@ package apidApigeeSync +/* + * Unit test of token manager + */ import ( "time" @@ -77,16 +80,16 @@ w.Write(body) })) config.Set(configProxyServerBaseURI, ts.URL) - tokenManager = createTokenManager() - token := tokenManager.getToken() + testedTokenManager := createTokenManager() + token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) Expect(token.ExpiresIn > 0).To(BeTrue()) Expect(token.ExpiresAt).To(BeTemporally(">", time.Now())) - bToken := tokenManager.getBearerToken() + bToken := testedTokenManager.getBearerToken() Expect(bToken).To(Equal(token.AccessToken)) - tokenManager.close() + testedTokenManager.close() ts.Close() }) @@ -105,16 +108,16 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - tokenManager = createTokenManager() - token := tokenManager.getToken() + testedTokenManager := createTokenManager() + token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) - tokenManager.invalidateToken() + testedTokenManager.invalidateToken() - token2 := tokenManager.getToken() + token2 := testedTokenManager.getToken() Expect(token).ToNot(Equal(token2)) Expect(token.AccessToken).ToNot(Equal(token2.AccessToken)) - tokenManager.close() + testedTokenManager.close() ts.Close() }) @@ -144,13 +147,13 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - tokenManager = createTokenManager() + testedTokenManager := createTokenManager() - tokenManager.getToken() + testedTokenManager.getToken() <-finished - tokenManager.close() + testedTokenManager.close() ts.Close() close(done) @@ -185,13 +188,13 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - tokenManager = createTokenManager() + testedTokenManager := createTokenManager() - tokenManager.getToken() - tokenManager.invalidateToken() - tokenManager.getToken() + testedTokenManager.getToken() + testedTokenManager.invalidateToken() + testedTokenManager.getToken() <-finished - tokenManager.close() + testedTokenManager.close() ts.Close() close(done) })