reformat code for unit tests, add a new test case, coverage increased to 75%
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index c00de6e..dd6cba4 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -55,6 +55,7 @@ log = apid.Log() _initPlugin(apid.AllServices()) + createManagers() close(done) }, 3)
diff --git a/apigee_sync.go b/apigee_sync.go index 391355b..2f99b9a 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -27,17 +27,17 @@ snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot) events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { - changeManager.pollChangeWithBackoff() + apidChangeManager.pollChangeWithBackoff() }) log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo) return } - snapManager.downloadBootSnapshot() - snapManager.downloadDataSnapshot() + apidSnapshotManager.downloadBootSnapshot() + apidSnapshotManager.downloadDataSnapshot() - changeManager.pollChangeWithBackoff() + apidChangeManager.pollChangeWithBackoff() } @@ -88,7 +88,7 @@ } func addHeaders(req *http.Request) { - req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) + req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken()) req.Header.Set("apid_instance_id", apidInfo.InstanceID) req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 463b9e4..22823a4 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -111,7 +111,7 @@ } } else if cl, ok := event.(*common.ChangeList); ok { - closeDone = changeManager.close() + closeDone = apidChangeManager.close() // ensure that snapshot switched DB versions Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) @@ -180,7 +180,7 @@ if s, ok := event.(*common.Snapshot); ok { // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler - closeDone = changeManager.close() + closeDone = apidChangeManager.close() go func() { // when close done, all handlers for the first snapshot have been executed <-closeDone @@ -289,18 +289,19 @@ */ It("Should be able to handle duplicate snapshot during bootstrap", func() { initializeContext() - tokenManager = createTokenManager() - snapManager = createSnapShotManager() + apidTokenManager = createSimpleTokenManager() + apidTokenManager.start() + apidSnapshotManager = createSnapShotManager() events.Listen(ApigeeSyncEventSelector, &handler{}) scopes := []string{apidInfo.ClusterID} snapshot := &common.Snapshot{} - snapManager.downloadSnapshot(scopes, snapshot) - snapManager.storeBootSnapshot(snapshot) - snapManager.storeDataSnapshot(snapshot) + apidSnapshotManager.downloadSnapshot(scopes, snapshot) + apidSnapshotManager.storeBootSnapshot(snapshot) + apidSnapshotManager.storeDataSnapshot(snapshot) restoreContext() - <-snapManager.close() - tokenManager.close() + <-apidSnapshotManager.close() + apidTokenManager.close() }, 3) It("Reuse http.Client connection for multiple concurrent requests", func() {
diff --git a/change_test.go b/change_test.go new file mode 100644 index 0000000..d951ff4 --- /dev/null +++ b/change_test.go
@@ -0,0 +1,158 @@ + +package apidApigeeSync + + + + + +import ( + "github.com/30x/apid-core" + "github.com/apigee-labs/transicator/common" + . "github.com/onsi/ginkgo" + "net/http/httptest" + "net/url" + "errors" + "os" +) + + +var _ = Describe("Change Agent", func() { + + Context("Change Agent", func() { + handler := handler{} + + var createTestDb = func(sqlfile string, dbId string) common.Snapshot { + initDb(sqlfile, "./mockdb.sqlite3") + file, err := os.Open("./mockdb.sqlite3") + if err != nil { + Fail("Failed to open mock db for test") + } + + s := common.Snapshot{} + err = processSnapshotServerFileResponse(dbId, file, &s) + if err != nil { + Fail("Error processing test snapshots") + } + return s + } + + BeforeEach(func() { + event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid") + + handler.Handle(&event) + }) + + var initializeContext = func() { + testRouter = apid.API().Router() + testServer = httptest.NewServer(testRouter) + + // set up mock server + mockParms := MockParms{ + ReliableAPI: true, + ClusterID: config.GetString(configApidClusterId), + TokenKey: config.GetString(configConsumerKey), + TokenSecret: config.GetString(configConsumerSecret), + Scope: "ert452", + Organization: "att", + Environment: "prod", + } + testMock = Mock(mockParms, testRouter) + + config.Set(configProxyServerBaseURI, testServer.URL) + config.Set(configSnapServerBaseURI, testServer.URL) + config.Set(configChangeServerBaseURI, testServer.URL) + } + + var restoreContext = func() { + + testServer.Close() + config.Set(configProxyServerBaseURI, dummyConfigValue) + config.Set(configSnapServerBaseURI, dummyConfigValue) + config.Set(configChangeServerBaseURI, dummyConfigValue) + + } + + It("test change server agent", func() { + log.Debug("test change server agent") + testTokenManager := &dummyTokenManager{make(chan bool)} + apidTokenManager = testTokenManager + apidTokenManager.start() + apidSnapshotManager = &dummySnapshotManager{} + initializeContext() + testMock.forceAuthFail() + wipeDBAferTest = true + apidChangeManager.pollChangeWithBackoff() + <- testTokenManager.invalidateChan + log.Debug("closing") + <- apidChangeManager.close() + restoreContext() + }, 5) + + + }) +}) + + +type dummyTokenManager struct { + invalidateChan chan bool + +} + +func (t * dummyTokenManager) getBearerToken() string { + return "" +} + +func (t * dummyTokenManager) invalidateToken() error { + log.Debug("invalidateToken called") + testMock.passAuthCheck() + t.invalidateChan <- true + return errors.New("invalidate called") +} + +func (t * dummyTokenManager) getToken() *oauthToken { + return nil +} + +func (t * dummyTokenManager) close() { + return +} + +func (t * dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error { + return func(chan bool) error{ + return nil + } +} + +func (* dummyTokenManager) start() { + +} + +type dummySnapshotManager struct { + +} + +func (* dummySnapshotManager) close() <-chan bool { + closeChan := make(chan bool) + close(closeChan) + return closeChan +} + +func (* dummySnapshotManager) downloadBootSnapshot() { + +} + +func (* dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) { + +} + +func (* dummySnapshotManager) downloadDataSnapshot(){ + +} + +func (* dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) { + +} + +func (* dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error{ + return nil +} \ No newline at end of file
diff --git a/changes.go b/changes.go index a3a39b5..3a53a79 100644 --- a/changes.go +++ b/changes.go
@@ -54,8 +54,8 @@ log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!") go func() { c.quitChan <- true - tokenManager.close() - <-snapManager.close() + apidTokenManager.close() + <-apidSnapshotManager.close() log.Debug("change manager closed") finishChan <- false }() @@ -65,8 +65,8 @@ log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager") go func() { c.quitChan <- true - tokenManager.close() - <-snapManager.close() + apidTokenManager.close() + <-apidSnapshotManager.close() log.Debug("change manager closed") finishChan <- true }() @@ -183,8 +183,8 @@ log.Errorf("Get changes request failed with status code: %d", r.StatusCode) switch r.StatusCode { case http.StatusUnauthorized: - tokenManager.invalidateToken() - return nil + err = apidTokenManager.invalidateToken() + return err case http.StatusNotModified: return nil @@ -271,7 +271,7 @@ } if c, ok := err.(changeServerError); ok { log.Debugf("%s. Fetch a new snapshot to sync...", c.Code) - snapManager.downloadDataSnapshot() + apidSnapshotManager.downloadDataSnapshot() } else { log.Debugf("Error connecting to changeserver: %v", err) }
diff --git a/init.go b/init.go index 437ce5c..68d6375 100644 --- a/init.go +++ b/init.go
@@ -36,9 +36,9 @@ events apid.EventsService apidInfo apidInstanceInfo newInstanceID bool - tokenManager *tokenMan - changeManager *pollChangeManager - snapManager *snapShotManager + apidTokenManager tokenManager + apidChangeManager changeManager + apidSnapshotManager snapShotManager httpclient *http.Client /* Set during post plugin initialization @@ -83,16 +83,11 @@ Transport: tr, Timeout: httpTimeout, CheckRedirect: func(req *http.Request, _ []*http.Request) error { - req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) + req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken()) return nil }, } - //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines - //also useful for testing - snapManager = createSnapShotManager() - changeManager = createChangeManager() - // set up default database db, err := dataService.DB() if err != nil { @@ -117,6 +112,14 @@ return nil } + +func createManagers() { + apidSnapshotManager = createSnapShotManager() + apidChangeManager = createChangeManager() + apidTokenManager = createSimpleTokenManager() +} + + func checkForRequiredValues() error { // check for required values for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, @@ -137,7 +140,7 @@ log = logger } -/* Idempotent state initialization */ +/* initialization */ func _initPlugin(services apid.Services) error { SetLogger(services.Log().ForModule("apigeeSync")) log.Debug("start init") @@ -165,6 +168,8 @@ return pluginData, err } + createManagers() + /* This callback function will get called once all the plugins are * initialized (not just this plugin). This is needed because, * downloadSnapshots/changes etc have to begin to be processed only @@ -208,8 +213,8 @@ log.Debug("start post plugin init") - tokenManager = createTokenManager() + apidTokenManager.start() go bootstrap() events.Listen(ApigeeSyncEventSelector, &handler{})
diff --git a/managerInterfaces.go b/managerInterfaces.go new file mode 100644 index 0000000..55ee0df --- /dev/null +++ b/managerInterfaces.go
@@ -0,0 +1,29 @@ +package apidApigeeSync + +import ( + "net/url" + "github.com/apigee-labs/transicator/common" +) + +type tokenManager interface { + getBearerToken() string + invalidateToken() error + getToken() *oauthToken + close() + getRetrieveNewTokenClosure(*url.URL) func(chan bool) error + start() +} + +type snapShotManager interface { + close() <-chan bool + downloadBootSnapshot() + storeBootSnapshot(snapshot *common.Snapshot) + downloadDataSnapshot() + storeDataSnapshot(snapshot *common.Snapshot) + downloadSnapshot(scopes []string, snapshot *common.Snapshot) error +} + +type changeManager interface { + close() <-chan bool + pollChangeWithBackoff() +} \ No newline at end of file
diff --git a/mock_server.go b/mock_server.go index dacdd58..328f6f1 100644 --- a/mock_server.go +++ b/mock_server.go
@@ -79,6 +79,19 @@ minDeploymentID *int64 maxDeploymentID *int64 newSnap *int32 + authFail *int32 +} + +func (m *MockServer) forceAuthFail() { + atomic.StoreInt32(m.authFail, 1) +} + +func (m *MockServer) normalAuthCheck() { + atomic.StoreInt32(m.authFail, 0) +} + +func (m *MockServer) passAuthCheck() { + atomic.StoreInt32(m.authFail, 2) } func (m *MockServer) forceNewSnapshot() { @@ -146,6 +159,8 @@ *m.minDeploymentID = 1 m.maxDeploymentID = new(int64) m.newSnap = new(int32) + m.authFail = new(int32) + *m.authFail = 0 initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3") initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3") @@ -303,6 +318,12 @@ // enforces handler auth func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { + + if atomic.LoadInt32(m.authFail) > 0 { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. "))) + return + } auth := req.Header.Get("Authorization") expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
diff --git a/snapshot.go b/snapshot.go index 2389d5f..f4cb8bd 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -15,7 +15,7 @@ "time" ) -type snapShotManager struct { +type simpleSnapShotManager struct { // to send quit signal to the downloading thread quitChan chan bool // to mark the graceful close of snapshotManager @@ -26,10 +26,10 @@ isDownloading *int32 } -func createSnapShotManager() *snapShotManager { +func createSnapShotManager() *simpleSnapShotManager { isClosedInt := int32(0) isDownloadingInt := int32(0) - return &snapShotManager{ + return &simpleSnapShotManager{ quitChan: make(chan bool, 1), finishChan: make(chan bool, 1), isClosed: &isClosedInt, @@ -43,7 +43,7 @@ * use <- close() for blocking close * should only be called by pollChangeManager, because pollChangeManager is dependent on it */ -func (s *snapShotManager) close() <-chan bool { +func (s *simpleSnapShotManager) close() <-chan bool { //has been closed before if atomic.SwapInt32(s.isClosed, 1) == int32(1) { log.Error("snapShotManager: close() called on a closed snapShotManager!") @@ -63,7 +63,7 @@ } // retrieve boot information: apid_config and apid_config_scope -func (s *snapShotManager) downloadBootSnapshot() { +func (s *simpleSnapShotManager) downloadBootSnapshot() { if atomic.SwapInt32(s.isDownloading, 1) == int32(1) { log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!") } @@ -99,12 +99,12 @@ s.storeBootSnapshot(snapshot) } -func (s *snapShotManager) storeBootSnapshot(snapshot *common.Snapshot) { +func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) { processSnapshot(snapshot) } // use the scope IDs from the boot snapshot to get all the data associated with the scopes -func (s *snapShotManager) downloadDataSnapshot() { +func (s *simpleSnapShotManager) downloadDataSnapshot() { if atomic.SwapInt32(s.isDownloading, 1) == int32(1) { log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!") } @@ -132,7 +132,7 @@ s.storeDataSnapshot(snapshot) } -func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) { +func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) { knownTables = extractTablesFromSnapshot(snapshot) _, err := dataService.DBVersion(snapshot.SnapshotInfo) @@ -232,7 +232,7 @@ // a blocking method // will keep retrying with backoff until success -func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error { +func (s *simpleSnapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error { // if closed if atomic.LoadInt32(s.isClosed) == int32(1) { log.Warn("Trying to download snapshot with a closed snapShotManager")
diff --git a/token.go b/token.go index 424c8d4..f9d9d3f 100644 --- a/token.go +++ b/token.go
@@ -9,6 +9,7 @@ "path" "sync/atomic" "time" + "errors" ) var ( @@ -24,10 +25,10 @@ man.close() */ -func createTokenManager() *tokenMan { +func createSimpleTokenManager() *simpleTokenManager { isClosedInt := int32(0) - t := &tokenMan{ + t := &simpleTokenManager{ quitPollingForToken: make(chan bool, 1), closed: make(chan bool), getTokenChan: make(chan bool), @@ -36,14 +37,10 @@ invalidateDone: make(chan bool), isClosed: &isClosedInt, } - - t.retrieveNewToken() - t.refreshTimer = time.After(t.token.refreshIn()) - go t.maintainToken() return t } -type tokenMan struct { +type simpleTokenManager struct { token *oauthToken isClosed *int32 quitPollingForToken chan bool @@ -54,12 +51,17 @@ returnTokenChan chan *oauthToken invalidateDone chan bool } +func (t *simpleTokenManager) start() { + t.retrieveNewToken() + t.refreshTimer = time.After(t.token.refreshIn()) + go t.maintainToken() +} -func (t *tokenMan) getBearerToken() string { +func (t *simpleTokenManager) getBearerToken() string { return t.getToken().AccessToken } -func (t *tokenMan) maintainToken() { +func (t *simpleTokenManager) maintainToken() { for { select { case <-t.closed: @@ -80,18 +82,19 @@ } // will block until valid -func (t *tokenMan) invalidateToken() { +func (t *simpleTokenManager) invalidateToken() error { //has been closed if atomic.LoadInt32(t.isClosed) == int32(1) { log.Debug("TokenManager: invalidateToken() called on closed tokenManager") - return + return errors.New("invalidateToken() called on closed tokenManager") } log.Debug("invalidating token") t.invalidateTokenChan <- true <-t.invalidateDone + return nil } -func (t *tokenMan) getToken() *oauthToken { +func (t *simpleTokenManager) getToken() *oauthToken { //has been closed if atomic.LoadInt32(t.isClosed) == int32(1) { log.Debug("TokenManager: getToken() called on closed tokenManager") @@ -105,7 +108,7 @@ * blocking close() of tokenMan */ -func (t *tokenMan) close() { +func (t *simpleTokenManager) close() { //has been closed if atomic.SwapInt32(t.isClosed, 1) == int32(1) { log.Panic("TokenManager: close() has been called before!") @@ -120,7 +123,7 @@ } // don't call externally. will block until success. -func (t *tokenMan) retrieveNewToken() { +func (t *simpleTokenManager) retrieveNewToken() { log.Debug("Getting OAuth token...") uriString := config.GetString(configProxyServerBaseURI) @@ -133,7 +136,7 @@ pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) }) } -func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error { +func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error { return func(_ chan bool) error { form := url.Values{} form.Set("grant_type", "client_credentials")
diff --git a/token_test.go b/token_test.go index c8ec7ba..22ff425 100644 --- a/token_test.go +++ b/token_test.go
@@ -80,7 +80,8 @@ w.Write(body) })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createTokenManager() + testedTokenManager := createSimpleTokenManager() + testedTokenManager.start() token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) @@ -108,7 +109,8 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createTokenManager() + testedTokenManager := createSimpleTokenManager() + testedTokenManager.start() token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) @@ -147,8 +149,8 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createTokenManager() - + testedTokenManager := createSimpleTokenManager() + testedTokenManager.start() testedTokenManager.getToken() <-finished @@ -188,8 +190,8 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createTokenManager() - + testedTokenManager := createSimpleTokenManager() + testedTokenManager.start() testedTokenManager.getToken() testedTokenManager.invalidateToken() testedTokenManager.getToken()