Rewrite poll change manager, rewrite test cases, and fix multiple race conditions in tests.
diff --git a/apigee_sync.go b/apigee_sync.go
index 99275f3..fcf47d1 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -26,7 +26,7 @@
snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
- go pollWithBackoff(quitPollingChangeServer, pollChangeAgent, handleChangeServerError)
+ changeManager.pollChangeWithBackoff()
})
log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
@@ -36,7 +36,7 @@
downloadBootSnapshot(nil)
downloadDataSnapshot(quitPollingSnapshotServer)
- go pollWithBackoff(quitPollingChangeServer, pollChangeAgent, handleChangeServerError)
+ changeManager.pollChangeWithBackoff()
}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 7aa1a81..579a5c3 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -6,7 +6,7 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"net/http/httptest"
- "time"
+ //"time"
)
var _ = Describe("Sync", func() {
@@ -36,7 +36,6 @@
var restoreContext = func() {
- tokenManager.close()
testServer.Close()
config.Set(configProxyServerBaseURI, dummyConfigValue)
@@ -47,7 +46,7 @@
It("should succesfully bootstrap from clean slate", func(done Done) {
log.Info("Starting sync tests...")
-
+ var closeDone <-chan bool
initializeContext()
// do not wipe DB after. Lets use it
wipeDBAferTest = false
@@ -60,7 +59,6 @@
}
apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
if s, ok := event.(*common.Snapshot); ok {
Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
@@ -105,7 +103,7 @@
}
} else if cl, ok := event.(*common.ChangeList); ok {
- go func() { quitPollingChangeServer <- true }()
+ closeDone = changeManager.close()
// ensure that snapshot switched DB versions
Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
@@ -131,22 +129,25 @@
Expect(tables).To(ContainElement("kms.api_product"))
Expect(tables).To(ContainElement("kms.app"))
- events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) {
+ go func() {
+ // when close is done, all handlers for the first changeList have been executed
+ <-closeDone
defer GinkgoRecover()
-
// allow other handler to execute to insert last_sequence
- time.Sleep(50 * time.Millisecond)
var seq string
- err = getDB().
+ //for seq = ""; seq == ""; {
+ // time.Sleep(50 * time.Millisecond)
+ err := getDB().
QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
Scan(&seq)
-
Expect(err).NotTo(HaveOccurred())
+ //}
Expect(seq).To(Equal(cl.LastSequence))
restoreContext()
close(done)
- })
+ }()
+
}
})
pie := apid.PluginsInitializedEvent{
@@ -158,27 +159,35 @@
It("should bootstrap from local DB if present", func(done Done) {
+ var closeDone <-chan bool
+
initializeContext()
expectedTables := common.ChangeList{
Changes: []common.Change{common.Change{Table: "kms.company"},
common.Change{Table: "edgex.apid_cluster"},
common.Change{Table: "edgex.data_scope"}},
}
- thisQuitPollingChangeServer := quitPollingChangeServer
Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
if s, ok := event.(*common.Snapshot); ok {
- go func() { thisQuitPollingChangeServer <- true }()
- //verify that the knownTables array has been properly populated from existing DB
- Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
+ // In this test, changeManager.pollChangeWithBackoff() has not yet been launched when changeManager is closed,
+ // because the changeManager.pollChangeWithBackoff() call in bootstrap() happens after this handler runs
+ closeDone = changeManager.close()
+ go func() {
+ // when close is done, all handlers for the first snapshot have been executed
+ <-closeDone
+ //verify that the knownTables array has been properly populated from existing DB
+ Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
- Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
- Expect(s.Tables).To(BeNil())
+ Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
+ Expect(s.Tables).To(BeNil())
- restoreContext()
- close(done)
+ restoreContext()
+ close(done)
+ }()
+
}
})
pie := apid.PluginsInitializedEvent{
diff --git a/changes.go b/changes.go
index 59caa34..f182311 100644
--- a/changes.go
+++ b/changes.go
@@ -9,16 +9,93 @@
"time"
"github.com/apigee-labs/transicator/common"
+ "sync/atomic"
)
var lastSequence string
var block string = "45"
+type pollChangeManager struct {
+ // 0 for not closed, 1 for closed
+ isClosed *int32
+ // 0 for pollChangeWithBackoff() not launched, 1 for launched
+ isLaunched *int32
+ quitChan chan bool
+}
+
+func createChangeManager() *pollChangeManager {
+ isClosedInt := int32(0)
+ isLaunchedInt := int32(0)
+ return &pollChangeManager{
+ isClosed: &isClosedInt,
+ quitChan: make(chan bool),
+ isLaunched: &isLaunchedInt,
+ }
+}
+
+/*
+ * thread-safe close of pollChangeManager
+ * It marks status as closed immediately, quits backoff polling agent, and closes tokenManager
+ * use <- close() for blocking close
+ */
+func (c *pollChangeManager) close() <-chan bool {
+ finishChan := make(chan bool, 1)
+ //has been closed
+ if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
+ log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
+ go func() {
+ finishChan <- false
+ log.Debug("change manager closed")
+ }()
+ return finishChan
+ }
+ // not launched
+ if atomic.LoadInt32(c.isLaunched) == int32(0) {
+ log.Error("pollChangeManager: close() called before pollChangeWithBackoff() was launched! closing tokenManager!")
+ go func() {
+ tokenManager.close()
+ finishChan <- false
+ log.Debug("change manager closed")
+ }()
+ return finishChan
+ }
+ // launched
+ log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
+ go func() {
+ c.quitChan <- true
+ tokenManager.close()
+ finishChan <- true
+ log.Debug("change manager closed")
+ }()
+ return finishChan
+}
+
+/*
+ * thread-safe pollChangeWithBackoff(), guaranteed: only one polling thread
+ */
+
+func (c *pollChangeManager) pollChangeWithBackoff() {
+ // closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ log.Error("pollChangeManager: pollChangeWithBackoff() called after closed")
+ return
+ }
+ // has been launched before
+ if atomic.SwapInt32(c.isLaunched, 1) == int32(1) {
+ log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before")
+ return
+ }
+
+ go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
+ log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff")
+
+}
+
/*
* Long polls the change agent with a 45 second block. Parses the response from
* change agent and raises an event. Called by pollWithBackoff().
*/
-func pollChangeAgent(quit chan bool) error {
+func (c *pollChangeManager) pollChangeAgent(dummyQuit chan bool) error {
changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
if err != nil {
@@ -35,12 +112,16 @@
for {
select {
- case <-quit:
- log.Info("Recevied quit signal to stop polling change server")
+ case <-c.quitChan:
+ log.Info("pollChangeAgent: Received quit signal to stop polling change server, close token manager")
return quitSignalError{}
default:
- err := getChanges(changesUri)
+ err := c.getChanges(changesUri)
if err != nil {
+ if _, ok := err.(quitSignalError); ok {
+ log.Debug("pollChangeAgent: consuming the quit signal")
+ <-c.quitChan
+ }
return err
}
}
@@ -49,7 +130,11 @@
//TODO refactor this method more, split it up
/* Make a single request to the changeserver to get a changelist */
-func getChanges(changesUri *url.URL) error {
+func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
+ // if closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ return quitSignalError{}
+ }
log.Debug("polling...")
/* Find the scopes associated with the config id */
@@ -83,10 +168,20 @@
r, err := client.Do(req)
if err != nil {
log.Errorf("change agent comm error: %s", err)
+ // if closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ return quitSignalError{}
+ }
return err
}
defer r.Body.Close()
+ // has been closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ log.Debugf("getChanges: changeManager has been closed")
+ return quitSignalError{}
+ }
+
if r.StatusCode != http.StatusOK {
log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
switch r.StatusCode {
@@ -157,11 +252,15 @@
return changesHaveNewTables(knownTables, changes.Changes)
}
-func handleChangeServerError(err error) {
-
+func (c *pollChangeManager) handleChangeServerError(err error) {
+ // has been closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ log.Debugf("handleChangeServerError: changeManager has been closed")
+ return
+ }
if _, ok := err.(changeServerError); ok {
log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
- downloadDataSnapshot(nil)
+ downloadDataSnapshot(c.quitChan)
} else {
log.Debugf("Error connecting to changeserver: %v", err)
}
diff --git a/init.go b/init.go
index c9ad44f..e1b31c5 100644
--- a/init.go
+++ b/init.go
@@ -37,8 +37,8 @@
apidInfo apidInstanceInfo
newInstanceID bool
tokenManager *tokenMan
+ changeManager *pollChangeManager
quitPollingSnapshotServer chan bool
- quitPollingChangeServer chan bool
/* Set during post plugin initialization
* set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -77,7 +77,7 @@
//TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
//also useful for testing
quitPollingSnapshotServer = make(chan bool)
- quitPollingChangeServer = make(chan bool)
+ changeManager = createChangeManager()
// set up default database
db, err := dataService.DB()
diff --git a/token.go b/token.go
index bc6f1df..a6c118e 100644
--- a/token.go
+++ b/token.go
@@ -7,6 +7,7 @@
"net/http"
"net/url"
"path"
+ "sync/atomic"
"time"
)
@@ -24,6 +25,7 @@
*/
func createTokenManager() *tokenMan {
+ isClosedInt := int32(0)
t := &tokenMan{
quitPollingForToken: make(chan bool, 1),
@@ -32,6 +34,7 @@
invalidateTokenChan: make(chan bool),
returnTokenChan: make(chan *oauthToken),
invalidateDone: make(chan bool),
+ isClosed: &isClosedInt,
}
t.retrieveNewToken()
@@ -42,6 +45,7 @@
type tokenMan struct {
token *oauthToken
+ isClosed *int32
quitPollingForToken chan bool
closed chan bool
getTokenChan chan bool
@@ -77,24 +81,42 @@
// will block until valid
func (t *tokenMan) invalidateToken() {
+ //has been closed
+ if atomic.LoadInt32(t.isClosed) == int32(1) {
+ log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
+ return
+ }
log.Debug("invalidating token")
t.invalidateTokenChan <- true
<-t.invalidateDone
}
-
func (t *tokenMan) getToken() *oauthToken {
+ //has been closed
+ if atomic.LoadInt32(t.isClosed) == int32(1) {
+ log.Debug("TokenManager: getToken() called on closed tokenManager")
+ return nil
+ }
t.getTokenChan <- true
return <-t.returnTokenChan
}
+/*
+ * blocking close() of tokenMan
+ */
+
func (t *tokenMan) close() {
+ //has been closed
+ if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
+ log.Panic("TokenManager: close() has been called before!")
+ return
+ }
log.Debug("close token manager")
t.quitPollingForToken <- true
// sending instead of closing, to make sure it enters the t.doRefresh branch
- log.Debug("token manager closed")
t.closed <- true
close(t.closed)
+ log.Debug("token manager closed")
}
// don't call externally. will block until success.
diff --git a/token_test.go b/token_test.go
index 2cff649..49d1eb6 100644
--- a/token_test.go
+++ b/token_test.go
@@ -1,5 +1,8 @@
package apidApigeeSync
+/*
+ * Unit test of token manager
+ */
import (
"time"
@@ -77,16 +80,16 @@
w.Write(body)
}))
config.Set(configProxyServerBaseURI, ts.URL)
- tokenManager = createTokenManager()
- token := tokenManager.getToken()
+ testedTokenManager := createTokenManager()
+ token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
Expect(token.ExpiresIn > 0).To(BeTrue())
Expect(token.ExpiresAt).To(BeTemporally(">", time.Now()))
- bToken := tokenManager.getBearerToken()
+ bToken := testedTokenManager.getBearerToken()
Expect(bToken).To(Equal(token.AccessToken))
- tokenManager.close()
+ testedTokenManager.close()
ts.Close()
})
@@ -105,16 +108,16 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- tokenManager = createTokenManager()
- token := tokenManager.getToken()
+ testedTokenManager := createTokenManager()
+ token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
- tokenManager.invalidateToken()
+ testedTokenManager.invalidateToken()
- token2 := tokenManager.getToken()
+ token2 := testedTokenManager.getToken()
Expect(token).ToNot(Equal(token2))
Expect(token.AccessToken).ToNot(Equal(token2.AccessToken))
- tokenManager.close()
+ testedTokenManager.close()
ts.Close()
})
@@ -144,13 +147,13 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- tokenManager = createTokenManager()
+ testedTokenManager := createTokenManager()
- tokenManager.getToken()
+ testedTokenManager.getToken()
<-finished
- tokenManager.close()
+ testedTokenManager.close()
ts.Close()
close(done)
@@ -185,13 +188,13 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- tokenManager = createTokenManager()
+ testedTokenManager := createTokenManager()
- tokenManager.getToken()
- tokenManager.invalidateToken()
- tokenManager.getToken()
+ testedTokenManager.getToken()
+ testedTokenManager.invalidateToken()
+ testedTokenManager.getToken()
<-finished
- tokenManager.close()
+ testedTokenManager.close()
ts.Close()
close(done)
})