add new test cases, poll change agent bug fixed
diff --git a/apigee_sync.go b/apigee_sync.go index 2f99b9a..6fc1389 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -88,7 +88,7 @@ } func addHeaders(req *http.Request) { - req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken()) + req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken()) req.Header.Set("apid_instance_id", apidInfo.InstanceID) req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) @@ -104,6 +104,9 @@ type expected200Error struct { } +type authFailError struct { +} + func (an expected200Error) Error() string { return "Did not recieve OK response" } @@ -115,3 +118,7 @@ func (a changeServerError) Error() string { return a.Code } + +func (a authFailError) Error() string { + return "Authorization failed" +}
diff --git a/change_test.go b/change_test.go index d951ff4..7a69995 100644 --- a/change_test.go +++ b/change_test.go
@@ -1,29 +1,23 @@ - package apidApigeeSync - - - - import ( "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" "net/http/httptest" "net/url" - "errors" "os" + "time" ) - var _ = Describe("Change Agent", func() { - Context("Change Agent", func() { + Context("Change Agent Unit Tests", func() { handler := handler{} var createTestDb = func(sqlfile string, dbId string) common.Snapshot { - initDb(sqlfile, "./mockdb.sqlite3") - file, err := os.Open("./mockdb.sqlite3") + initDb(sqlfile, "./mockdb_change.sqlite3") + file, err := os.Open("./mockdb_change.sqlite3") if err != nil { Fail("Failed to open mock db for test") } @@ -37,9 +31,9 @@ } BeforeEach(func() { - event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid") - + event := createTestDb("./sql/init_mock_db.sql", "test_change") handler.Handle(&event) + knownTables = extractTablesFromDB(getDB()) }) var initializeContext = func() { @@ -61,6 +55,7 @@ config.Set(configProxyServerBaseURI, testServer.URL) config.Set(configSnapServerBaseURI, testServer.URL) config.Set(configChangeServerBaseURI, testServer.URL) + config.Set(configPollInterval, 1*time.Millisecond) } var restoreContext = func() { @@ -69,11 +64,11 @@ config.Set(configProxyServerBaseURI, dummyConfigValue) config.Set(configSnapServerBaseURI, dummyConfigValue) config.Set(configChangeServerBaseURI, dummyConfigValue) - + config.Set(configPollInterval, 10*time.Millisecond) } - It("test change server agent", func() { - log.Debug("test change server agent") + It("test change agent with authorization failure", func() { + log.Debug("test change agent with authorization failure") testTokenManager := &dummyTokenManager{make(chan bool)} apidTokenManager = testTokenManager apidTokenManager.start() @@ -82,77 +77,126 @@ testMock.forceAuthFail() wipeDBAferTest = true apidChangeManager.pollChangeWithBackoff() - <- testTokenManager.invalidateChan + // auth check fails + <-testTokenManager.invalidateChan log.Debug("closing") - <- apidChangeManager.close() + <-apidChangeManager.close() restoreContext() - }, 5) + }) + It("test change agent with too old snapshot", func() { + log.Debug("test change agent with too old snapshot") + testTokenManager := &dummyTokenManager{make(chan bool)} + apidTokenManager = testTokenManager + apidTokenManager.start() + testSnapshotManager := &dummySnapshotManager{make(chan bool)} + apidSnapshotManager = testSnapshotManager + initializeContext() + + testMock.passAuthCheck() + testMock.forceNewSnapshot() + wipeDBAferTest = true + apidChangeManager.pollChangeWithBackoff() + <-testSnapshotManager.downloadCalledChan + log.Debug("closing") + <-apidChangeManager.close() + restoreContext() + }) + + It("change agent should retry with authorization failure", func(done Done) { + log.Debug("change agent should retry with authorization failure") + testTokenManager := &dummyTokenManager{make(chan bool)} + apidTokenManager = testTokenManager + apidTokenManager.start() + apidSnapshotManager = &dummySnapshotManager{} + initializeContext() + testMock.forceAuthFail() + testMock.forceNoSnapshot() + wipeDBAferTest = true + + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + + if _, ok := event.(*common.ChangeList); ok { + closeDone := apidChangeManager.close() + log.Debug("closing") + go func() { + // when close done, all handlers for the first snapshot have been executed + <-closeDone + restoreContext() + close(done) + }() + + } + }) + + apidChangeManager.pollChangeWithBackoff() + // auth check fails + <-testTokenManager.invalidateChan + }) }) }) - type dummyTokenManager struct { invalidateChan chan bool - } -func (t * dummyTokenManager) getBearerToken() string { +func (t *dummyTokenManager) getBearerToken() string { return "" } -func (t * dummyTokenManager) invalidateToken() error { +func (t *dummyTokenManager) invalidateToken() error { log.Debug("invalidateToken called") testMock.passAuthCheck() t.invalidateChan <- true - return errors.New("invalidate called") -} - -func (t * dummyTokenManager) getToken() *oauthToken { return nil } -func (t * dummyTokenManager) close() { +func (t *dummyTokenManager) getToken() *oauthToken { + return nil +} + +func (t *dummyTokenManager) close() { return } -func (t * dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error { - return func(chan bool) error{ +func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error { + return func(chan bool) error { return nil } } -func (* dummyTokenManager) start() { +func (t *dummyTokenManager) start() { } type dummySnapshotManager struct { - + downloadCalledChan chan bool } -func (* dummySnapshotManager) close() <-chan bool { +func (s *dummySnapshotManager) close() <-chan bool { closeChan := make(chan bool) close(closeChan) return closeChan } -func (* dummySnapshotManager) downloadBootSnapshot() { +func (s *dummySnapshotManager) downloadBootSnapshot() { } -func (* dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) { +func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) { } -func (* dummySnapshotManager) downloadDataSnapshot(){ +func (s *dummySnapshotManager) downloadDataSnapshot() { + log.Debug("dummySnapshotManager.downloadDataSnapshot() called") + s.downloadCalledChan <- true +} + +func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) { } -func (* dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) { - -} - -func (* dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error{ +func (s *dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error { return nil -} \ No newline at end of file +}
diff --git a/changes.go b/changes.go index 3a53a79..9e8d170 100644 --- a/changes.go +++ b/changes.go
@@ -184,7 +184,10 @@ switch r.StatusCode { case http.StatusUnauthorized: err = apidTokenManager.invalidateToken() - return err + if err != nil { + return err + } + return authFailError{} case http.StatusNotModified: return nil @@ -206,7 +209,7 @@ log.Debug("Received SNAPSHOT_TOO_OLD message from change server.") err = apiErr } - return nil + return err } return nil }
diff --git a/init.go b/init.go index 68d6375..d9b767e 100644 --- a/init.go +++ b/init.go
@@ -30,16 +30,16 @@ var ( /* All set during plugin initialization */ - log apid.LogService - config apid.ConfigService - dataService apid.DataService - events apid.EventsService - apidInfo apidInstanceInfo - newInstanceID bool - apidTokenManager tokenManager - apidChangeManager changeManager + log apid.LogService + config apid.ConfigService + dataService apid.DataService + events apid.EventsService + apidInfo apidInstanceInfo + newInstanceID bool + apidTokenManager tokenManager + apidChangeManager changeManager apidSnapshotManager snapShotManager - httpclient *http.Client + httpclient *http.Client /* Set during post plugin initialization * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called @@ -83,7 +83,7 @@ Transport: tr, Timeout: httpTimeout, CheckRedirect: func(req *http.Request, _ []*http.Request) error { - req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken()) + req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken()) return nil }, } @@ -112,14 +112,12 @@ return nil } - func createManagers() { apidSnapshotManager = createSnapShotManager() apidChangeManager = createChangeManager() apidTokenManager = createSimpleTokenManager() } - func checkForRequiredValues() error { // check for required values for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, @@ -213,7 +211,6 @@ log.Debug("start post plugin init") - apidTokenManager.start() go bootstrap()
diff --git a/listener.go b/listener.go index a1360d4..aa3cfd2 100644 --- a/listener.go +++ b/listener.go
@@ -117,20 +117,3 @@ return ok } - -func makeDataScopeFromRow(row common.Row) dataDataScope { - - ds := dataDataScope{} - - row.Get("id", &ds.ID) - row.Get("apid_cluster_id", &ds.ClusterID) - row.Get("scope", &ds.Scope) - row.Get("org", &ds.Org) - row.Get("env", &ds.Env) - row.Get("created", &ds.Created) - row.Get("created_by", &ds.CreatedBy) - row.Get("updated", &ds.Updated) - row.Get("updated_by", &ds.UpdatedBy) - - return ds -}
diff --git a/managerInterfaces.go b/managerInterfaces.go index 55ee0df..20bbf6f 100644 --- a/managerInterfaces.go +++ b/managerInterfaces.go
@@ -1,8 +1,8 @@ package apidApigeeSync import ( - "net/url" "github.com/apigee-labs/transicator/common" + "net/url" ) type tokenManager interface { @@ -26,4 +26,4 @@ type changeManager interface { close() <-chan bool pollChangeWithBackoff() -} \ No newline at end of file +}
diff --git a/mock_server.go b/mock_server.go index 328f6f1..8349131 100644 --- a/mock_server.go +++ b/mock_server.go
@@ -95,7 +95,11 @@ } func (m *MockServer) forceNewSnapshot() { - atomic.SwapInt32(m.newSnap, 1) + atomic.StoreInt32(m.newSnap, 1) +} + +func (m *MockServer) forceNoSnapshot() { + atomic.StoreInt32(m.newSnap, 0) } func (m *MockServer) lastSequenceID() string { @@ -268,6 +272,7 @@ val := atomic.SwapInt32(m.newSnap, 0) if val > 0 { + log.Debug("MockServer: force new snapshot") w.WriteHeader(http.StatusBadRequest) apiErr := changeServerError{ Code: "SNAPSHOT_TOO_OLD", @@ -278,6 +283,8 @@ return } + log.Debug("mock server sending change list") + q := req.URL.Query() scopes := q["scope"] @@ -319,20 +326,28 @@ func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { - if atomic.LoadInt32(m.authFail) > 0 { + // force failing auth check + if atomic.LoadInt32(m.authFail) == 1 { w.WriteHeader(http.StatusUnauthorized) w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. "))) return } - auth := req.Header.Get("Authorization") + // force passing auth check + if atomic.LoadInt32(m.authFail) == 2 { + target(w, req) + return + } + + // check auth header + auth := req.Header.Get("Authorization") expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken) if auth != expectedAuth { - w.WriteHeader(http.StatusBadRequest) + w.WriteHeader(http.StatusUnauthorized) w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth))) - } else { - target(w, req) + return } + target(w, req) } }
diff --git a/token.go b/token.go index f9d9d3f..56d6676 100644 --- a/token.go +++ b/token.go
@@ -3,13 +3,13 @@ import ( "bytes" "encoding/json" + "errors" "io/ioutil" "net/http" "net/url" "path" "sync/atomic" "time" - "errors" ) var ( @@ -51,6 +51,7 @@ returnTokenChan chan *oauthToken invalidateDone chan bool } + func (t *simpleTokenManager) start() { t.retrieveNewToken() t.refreshTimer = time.After(t.token.refreshIn())