reformat code for unit tests, add a new test case, coverage increased to 75%
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index c00de6e..dd6cba4 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -55,6 +55,7 @@
log = apid.Log()
_initPlugin(apid.AllServices())
+ createManagers()
close(done)
}, 3)
diff --git a/apigee_sync.go b/apigee_sync.go
index 391355b..2f99b9a 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -27,17 +27,17 @@
snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
- changeManager.pollChangeWithBackoff()
+ apidChangeManager.pollChangeWithBackoff()
})
log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
return
}
- snapManager.downloadBootSnapshot()
- snapManager.downloadDataSnapshot()
+ apidSnapshotManager.downloadBootSnapshot()
+ apidSnapshotManager.downloadDataSnapshot()
- changeManager.pollChangeWithBackoff()
+ apidChangeManager.pollChangeWithBackoff()
}
@@ -88,7 +88,7 @@
}
func addHeaders(req *http.Request) {
- req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+ req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken())
req.Header.Set("apid_instance_id", apidInfo.InstanceID)
req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 463b9e4..22823a4 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -111,7 +111,7 @@
}
} else if cl, ok := event.(*common.ChangeList); ok {
- closeDone = changeManager.close()
+ closeDone = apidChangeManager.close()
// ensure that snapshot switched DB versions
Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
@@ -180,7 +180,7 @@
if s, ok := event.(*common.Snapshot); ok {
// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
- closeDone = changeManager.close()
+ closeDone = apidChangeManager.close()
go func() {
// when close done, all handlers for the first snapshot have been executed
<-closeDone
@@ -289,18 +289,19 @@
*/
It("Should be able to handle duplicate snapshot during bootstrap", func() {
initializeContext()
- tokenManager = createTokenManager()
- snapManager = createSnapShotManager()
+ apidTokenManager = createSimpleTokenManager()
+ apidTokenManager.start()
+ apidSnapshotManager = createSnapShotManager()
events.Listen(ApigeeSyncEventSelector, &handler{})
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
- snapManager.downloadSnapshot(scopes, snapshot)
- snapManager.storeBootSnapshot(snapshot)
- snapManager.storeDataSnapshot(snapshot)
+ apidSnapshotManager.downloadSnapshot(scopes, snapshot)
+ apidSnapshotManager.storeBootSnapshot(snapshot)
+ apidSnapshotManager.storeDataSnapshot(snapshot)
restoreContext()
- <-snapManager.close()
- tokenManager.close()
+ <-apidSnapshotManager.close()
+ apidTokenManager.close()
}, 3)
It("Reuse http.Client connection for multiple concurrent requests", func() {
diff --git a/change_test.go b/change_test.go
new file mode 100644
index 0000000..d951ff4
--- /dev/null
+++ b/change_test.go
@@ -0,0 +1,158 @@
+
+package apidApigeeSync
+
+
+
+
+
+import (
+ "github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
+ . "github.com/onsi/ginkgo"
+ "net/http/httptest"
+ "net/url"
+ "errors"
+ "os"
+)
+
+
+var _ = Describe("Change Agent", func() {
+
+ Context("Change Agent", func() {
+ handler := handler{}
+
+ var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
+ initDb(sqlfile, "./mockdb.sqlite3")
+ file, err := os.Open("./mockdb.sqlite3")
+ if err != nil {
+ Fail("Failed to open mock db for test")
+ }
+
+ s := common.Snapshot{}
+ err = processSnapshotServerFileResponse(dbId, file, &s)
+ if err != nil {
+ Fail("Error processing test snapshots")
+ }
+ return s
+ }
+
+ BeforeEach(func() {
+ event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
+
+ handler.Handle(&event)
+ })
+
+ var initializeContext = func() {
+ testRouter = apid.API().Router()
+ testServer = httptest.NewServer(testRouter)
+
+ // set up mock server
+ mockParms := MockParms{
+ ReliableAPI: true,
+ ClusterID: config.GetString(configApidClusterId),
+ TokenKey: config.GetString(configConsumerKey),
+ TokenSecret: config.GetString(configConsumerSecret),
+ Scope: "ert452",
+ Organization: "att",
+ Environment: "prod",
+ }
+ testMock = Mock(mockParms, testRouter)
+
+ config.Set(configProxyServerBaseURI, testServer.URL)
+ config.Set(configSnapServerBaseURI, testServer.URL)
+ config.Set(configChangeServerBaseURI, testServer.URL)
+ }
+
+ var restoreContext = func() {
+
+ testServer.Close()
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
+
+ }
+
+ It("test change server agent", func() {
+ log.Debug("test change server agent")
+ testTokenManager := &dummyTokenManager{make(chan bool)}
+ apidTokenManager = testTokenManager
+ apidTokenManager.start()
+ apidSnapshotManager = &dummySnapshotManager{}
+ initializeContext()
+ testMock.forceAuthFail()
+ wipeDBAferTest = true
+ apidChangeManager.pollChangeWithBackoff()
+ <- testTokenManager.invalidateChan
+ log.Debug("closing")
+ <- apidChangeManager.close()
+ restoreContext()
+ }, 5)
+
+
+ })
+})
+
+
+type dummyTokenManager struct {
+ invalidateChan chan bool
+
+}
+
+func (t * dummyTokenManager) getBearerToken() string {
+ return ""
+}
+
+func (t * dummyTokenManager) invalidateToken() error {
+ log.Debug("invalidateToken called")
+ testMock.passAuthCheck()
+ t.invalidateChan <- true
+ return errors.New("invalidate called")
+}
+
+func (t * dummyTokenManager) getToken() *oauthToken {
+ return nil
+}
+
+func (t * dummyTokenManager) close() {
+ return
+}
+
+func (t * dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
+ return func(chan bool) error{
+ return nil
+ }
+}
+
+func (* dummyTokenManager) start() {
+
+}
+
+type dummySnapshotManager struct {
+
+}
+
+func (* dummySnapshotManager) close() <-chan bool {
+ closeChan := make(chan bool)
+ close(closeChan)
+ return closeChan
+}
+
+func (* dummySnapshotManager) downloadBootSnapshot() {
+
+}
+
+func (* dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+
+}
+
+func (* dummySnapshotManager) downloadDataSnapshot(){
+
+}
+
+func (* dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
+
+}
+
+func (* dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error{
+ return nil
+}
\ No newline at end of file
diff --git a/changes.go b/changes.go
index a3a39b5..3a53a79 100644
--- a/changes.go
+++ b/changes.go
@@ -54,8 +54,8 @@
log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
go func() {
c.quitChan <- true
- tokenManager.close()
- <-snapManager.close()
+ apidTokenManager.close()
+ <-apidSnapshotManager.close()
log.Debug("change manager closed")
finishChan <- false
}()
@@ -65,8 +65,8 @@
log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
go func() {
c.quitChan <- true
- tokenManager.close()
- <-snapManager.close()
+ apidTokenManager.close()
+ <-apidSnapshotManager.close()
log.Debug("change manager closed")
finishChan <- true
}()
@@ -183,8 +183,8 @@
log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
switch r.StatusCode {
case http.StatusUnauthorized:
- tokenManager.invalidateToken()
- return nil
+ err = apidTokenManager.invalidateToken()
+ return err
case http.StatusNotModified:
return nil
@@ -271,7 +271,7 @@
}
if c, ok := err.(changeServerError); ok {
log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
- snapManager.downloadDataSnapshot()
+ apidSnapshotManager.downloadDataSnapshot()
} else {
log.Debugf("Error connecting to changeserver: %v", err)
}
diff --git a/init.go b/init.go
index 437ce5c..68d6375 100644
--- a/init.go
+++ b/init.go
@@ -36,9 +36,9 @@
events apid.EventsService
apidInfo apidInstanceInfo
newInstanceID bool
- tokenManager *tokenMan
- changeManager *pollChangeManager
- snapManager *snapShotManager
+ apidTokenManager tokenManager
+ apidChangeManager changeManager
+ apidSnapshotManager snapShotManager
httpclient *http.Client
/* Set during post plugin initialization
@@ -83,16 +83,11 @@
Transport: tr,
Timeout: httpTimeout,
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
- req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+ req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken())
return nil
},
}
- //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
- //also useful for testing
- snapManager = createSnapShotManager()
- changeManager = createChangeManager()
-
// set up default database
db, err := dataService.DB()
if err != nil {
@@ -117,6 +112,14 @@
return nil
}
+
+func createManagers() {
+ apidSnapshotManager = createSnapShotManager()
+ apidChangeManager = createChangeManager()
+ apidTokenManager = createSimpleTokenManager()
+}
+
+
func checkForRequiredValues() error {
// check for required values
for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
@@ -137,7 +140,7 @@
log = logger
}
-/* Idempotent state initialization */
+/* initialization */
func _initPlugin(services apid.Services) error {
SetLogger(services.Log().ForModule("apigeeSync"))
log.Debug("start init")
@@ -165,6 +168,8 @@
return pluginData, err
}
+ createManagers()
+
/* This callback function will get called once all the plugins are
* initialized (not just this plugin). This is needed because,
* downloadSnapshots/changes etc have to begin to be processed only
@@ -208,8 +213,8 @@
log.Debug("start post plugin init")
- tokenManager = createTokenManager()
+ apidTokenManager.start()
go bootstrap()
events.Listen(ApigeeSyncEventSelector, &handler{})
diff --git a/managerInterfaces.go b/managerInterfaces.go
new file mode 100644
index 0000000..55ee0df
--- /dev/null
+++ b/managerInterfaces.go
@@ -0,0 +1,29 @@
+package apidApigeeSync
+
+import (
+ "net/url"
+ "github.com/apigee-labs/transicator/common"
+)
+
+type tokenManager interface {
+ getBearerToken() string
+ invalidateToken() error
+ getToken() *oauthToken
+ close()
+ getRetrieveNewTokenClosure(*url.URL) func(chan bool) error
+ start()
+}
+
+type snapShotManager interface {
+ close() <-chan bool
+ downloadBootSnapshot()
+ storeBootSnapshot(snapshot *common.Snapshot)
+ downloadDataSnapshot()
+ storeDataSnapshot(snapshot *common.Snapshot)
+ downloadSnapshot(scopes []string, snapshot *common.Snapshot) error
+}
+
+type changeManager interface {
+ close() <-chan bool
+ pollChangeWithBackoff()
+}
\ No newline at end of file
diff --git a/mock_server.go b/mock_server.go
index dacdd58..328f6f1 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -79,6 +79,19 @@
minDeploymentID *int64
maxDeploymentID *int64
newSnap *int32
+ authFail *int32
+}
+
+func (m *MockServer) forceAuthFail() {
+ atomic.StoreInt32(m.authFail, 1)
+}
+
+func (m *MockServer) normalAuthCheck() {
+ atomic.StoreInt32(m.authFail, 0)
+}
+
+func (m *MockServer) passAuthCheck() {
+ atomic.StoreInt32(m.authFail, 2)
}
func (m *MockServer) forceNewSnapshot() {
@@ -146,6 +159,8 @@
*m.minDeploymentID = 1
m.maxDeploymentID = new(int64)
m.newSnap = new(int32)
+ m.authFail = new(int32)
+ *m.authFail = 0
initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
@@ -303,6 +318,12 @@
// enforces handler auth
func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
+
+ if atomic.LoadInt32(m.authFail) > 0 {
+ w.WriteHeader(http.StatusUnauthorized)
+ w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. ")))
+ return
+ }
auth := req.Header.Get("Authorization")
expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
diff --git a/snapshot.go b/snapshot.go
index 2389d5f..f4cb8bd 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -15,7 +15,7 @@
"time"
)
-type snapShotManager struct {
+type simpleSnapShotManager struct {
// to send quit signal to the downloading thread
quitChan chan bool
// to mark the graceful close of snapshotManager
@@ -26,10 +26,10 @@
isDownloading *int32
}
-func createSnapShotManager() *snapShotManager {
+func createSnapShotManager() *simpleSnapShotManager {
isClosedInt := int32(0)
isDownloadingInt := int32(0)
- return &snapShotManager{
+ return &simpleSnapShotManager{
quitChan: make(chan bool, 1),
finishChan: make(chan bool, 1),
isClosed: &isClosedInt,
@@ -43,7 +43,7 @@
* use <- close() for blocking close
* should only be called by pollChangeManager, because pollChangeManager is dependent on it
*/
-func (s *snapShotManager) close() <-chan bool {
+func (s *simpleSnapShotManager) close() <-chan bool {
//has been closed before
if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
log.Error("snapShotManager: close() called on a closed snapShotManager!")
@@ -63,7 +63,7 @@
}
// retrieve boot information: apid_config and apid_config_scope
-func (s *snapShotManager) downloadBootSnapshot() {
+func (s *simpleSnapShotManager) downloadBootSnapshot() {
if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
}
@@ -99,12 +99,12 @@
s.storeBootSnapshot(snapshot)
}
-func (s *snapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
processSnapshot(snapshot)
}
// use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func (s *snapShotManager) downloadDataSnapshot() {
+func (s *simpleSnapShotManager) downloadDataSnapshot() {
if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
}
@@ -132,7 +132,7 @@
s.storeDataSnapshot(snapshot)
}
-func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
+func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
knownTables = extractTablesFromSnapshot(snapshot)
_, err := dataService.DBVersion(snapshot.SnapshotInfo)
@@ -232,7 +232,7 @@
// a blocking method
// will keep retrying with backoff until success
-func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+func (s *simpleSnapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
// if closed
if atomic.LoadInt32(s.isClosed) == int32(1) {
log.Warn("Trying to download snapshot with a closed snapShotManager")
diff --git a/token.go b/token.go
index 424c8d4..f9d9d3f 100644
--- a/token.go
+++ b/token.go
@@ -9,6 +9,7 @@
"path"
"sync/atomic"
"time"
+ "errors"
)
var (
@@ -24,10 +25,10 @@
man.close()
*/
-func createTokenManager() *tokenMan {
+func createSimpleTokenManager() *simpleTokenManager {
isClosedInt := int32(0)
- t := &tokenMan{
+ t := &simpleTokenManager{
quitPollingForToken: make(chan bool, 1),
closed: make(chan bool),
getTokenChan: make(chan bool),
@@ -36,14 +37,10 @@
invalidateDone: make(chan bool),
isClosed: &isClosedInt,
}
-
- t.retrieveNewToken()
- t.refreshTimer = time.After(t.token.refreshIn())
- go t.maintainToken()
return t
}
-type tokenMan struct {
+type simpleTokenManager struct {
token *oauthToken
isClosed *int32
quitPollingForToken chan bool
@@ -54,12 +51,17 @@
returnTokenChan chan *oauthToken
invalidateDone chan bool
}
+func (t *simpleTokenManager) start() {
+ t.retrieveNewToken()
+ t.refreshTimer = time.After(t.token.refreshIn())
+ go t.maintainToken()
+}
-func (t *tokenMan) getBearerToken() string {
+func (t *simpleTokenManager) getBearerToken() string {
return t.getToken().AccessToken
}
-func (t *tokenMan) maintainToken() {
+func (t *simpleTokenManager) maintainToken() {
for {
select {
case <-t.closed:
@@ -80,18 +82,19 @@
}
// will block until valid
-func (t *tokenMan) invalidateToken() {
+func (t *simpleTokenManager) invalidateToken() error {
//has been closed
if atomic.LoadInt32(t.isClosed) == int32(1) {
log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
- return
+ return errors.New("invalidateToken() called on closed tokenManager")
}
log.Debug("invalidating token")
t.invalidateTokenChan <- true
<-t.invalidateDone
+ return nil
}
-func (t *tokenMan) getToken() *oauthToken {
+func (t *simpleTokenManager) getToken() *oauthToken {
//has been closed
if atomic.LoadInt32(t.isClosed) == int32(1) {
log.Debug("TokenManager: getToken() called on closed tokenManager")
@@ -105,7 +108,7 @@
* blocking close() of tokenMan
*/
-func (t *tokenMan) close() {
+func (t *simpleTokenManager) close() {
//has been closed
if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
log.Panic("TokenManager: close() has been called before!")
@@ -120,7 +123,7 @@
}
// don't call externally. will block until success.
-func (t *tokenMan) retrieveNewToken() {
+func (t *simpleTokenManager) retrieveNewToken() {
log.Debug("Getting OAuth token...")
uriString := config.GetString(configProxyServerBaseURI)
@@ -133,7 +136,7 @@
pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) })
}
-func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
return func(_ chan bool) error {
form := url.Values{}
form.Set("grant_type", "client_credentials")
diff --git a/token_test.go b/token_test.go
index c8ec7ba..22ff425 100644
--- a/token_test.go
+++ b/token_test.go
@@ -80,7 +80,8 @@
w.Write(body)
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
@@ -108,7 +109,8 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
@@ -147,8 +149,8 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
-
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
testedTokenManager.getToken()
<-finished
@@ -188,8 +190,8 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
-
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
testedTokenManager.getToken()
testedTokenManager.invalidateToken()
testedTokenManager.getToken()