add new test cases, poll change agent bug fixed
diff --git a/apigee_sync.go b/apigee_sync.go
index 2f99b9a..6fc1389 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -88,7 +88,7 @@
}
func addHeaders(req *http.Request) {
- req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken())
+ req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
req.Header.Set("apid_instance_id", apidInfo.InstanceID)
req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -104,6 +104,9 @@
type expected200Error struct {
}
+type authFailError struct {
+}
+
func (an expected200Error) Error() string {
return "Did not recieve OK response"
}
@@ -115,3 +118,7 @@
func (a changeServerError) Error() string {
return a.Code
}
+
+func (a authFailError) Error() string {
+ return "Authorization failed"
+}
diff --git a/change_test.go b/change_test.go
index d951ff4..7a69995 100644
--- a/change_test.go
+++ b/change_test.go
@@ -1,29 +1,23 @@
-
package apidApigeeSync
-
-
-
-
import (
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
"net/http/httptest"
"net/url"
- "errors"
"os"
+ "time"
)
-
var _ = Describe("Change Agent", func() {
- Context("Change Agent", func() {
+ Context("Change Agent Unit Tests", func() {
handler := handler{}
var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
- initDb(sqlfile, "./mockdb.sqlite3")
- file, err := os.Open("./mockdb.sqlite3")
+ initDb(sqlfile, "./mockdb_change.sqlite3")
+ file, err := os.Open("./mockdb_change.sqlite3")
if err != nil {
Fail("Failed to open mock db for test")
}
@@ -37,9 +31,9 @@
}
BeforeEach(func() {
- event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
-
+ event := createTestDb("./sql/init_mock_db.sql", "test_change")
handler.Handle(&event)
+ knownTables = extractTablesFromDB(getDB())
})
var initializeContext = func() {
@@ -61,6 +55,7 @@
config.Set(configProxyServerBaseURI, testServer.URL)
config.Set(configSnapServerBaseURI, testServer.URL)
config.Set(configChangeServerBaseURI, testServer.URL)
+ config.Set(configPollInterval, 1*time.Millisecond)
}
var restoreContext = func() {
@@ -69,11 +64,11 @@
config.Set(configProxyServerBaseURI, dummyConfigValue)
config.Set(configSnapServerBaseURI, dummyConfigValue)
config.Set(configChangeServerBaseURI, dummyConfigValue)
-
+ config.Set(configPollInterval, 10*time.Millisecond)
}
- It("test change server agent", func() {
- log.Debug("test change server agent")
+ It("test change agent with authorization failure", func() {
+ log.Debug("test change agent with authorization failure")
testTokenManager := &dummyTokenManager{make(chan bool)}
apidTokenManager = testTokenManager
apidTokenManager.start()
@@ -82,77 +77,126 @@
testMock.forceAuthFail()
wipeDBAferTest = true
apidChangeManager.pollChangeWithBackoff()
- <- testTokenManager.invalidateChan
+ // auth check fails
+ <-testTokenManager.invalidateChan
log.Debug("closing")
- <- apidChangeManager.close()
+ <-apidChangeManager.close()
restoreContext()
- }, 5)
+ })
+ It("test change agent with too old snapshot", func() {
+ log.Debug("test change agent with too old snapshot")
+ testTokenManager := &dummyTokenManager{make(chan bool)}
+ apidTokenManager = testTokenManager
+ apidTokenManager.start()
+ testSnapshotManager := &dummySnapshotManager{make(chan bool)}
+ apidSnapshotManager = testSnapshotManager
+ initializeContext()
+
+ testMock.passAuthCheck()
+ testMock.forceNewSnapshot()
+ wipeDBAferTest = true
+ apidChangeManager.pollChangeWithBackoff()
+ <-testSnapshotManager.downloadCalledChan
+ log.Debug("closing")
+ <-apidChangeManager.close()
+ restoreContext()
+ })
+
+ It("change agent should retry with authorization failure", func(done Done) {
+ log.Debug("change agent should retry with authorization failure")
+ testTokenManager := &dummyTokenManager{make(chan bool)}
+ apidTokenManager = testTokenManager
+ apidTokenManager.start()
+ apidSnapshotManager = &dummySnapshotManager{}
+ initializeContext()
+ testMock.forceAuthFail()
+ testMock.forceNoSnapshot()
+ wipeDBAferTest = true
+
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+
+ if _, ok := event.(*common.ChangeList); ok {
+ closeDone := apidChangeManager.close()
+ log.Debug("closing")
+ go func() {
+ // when close done, all handlers for the first snapshot have been executed
+ <-closeDone
+ restoreContext()
+ close(done)
+ }()
+
+ }
+ })
+
+ apidChangeManager.pollChangeWithBackoff()
+ // auth check fails
+ <-testTokenManager.invalidateChan
+ })
})
})
-
type dummyTokenManager struct {
invalidateChan chan bool
-
}
-func (t * dummyTokenManager) getBearerToken() string {
+func (t *dummyTokenManager) getBearerToken() string {
return ""
}
-func (t * dummyTokenManager) invalidateToken() error {
+func (t *dummyTokenManager) invalidateToken() error {
log.Debug("invalidateToken called")
testMock.passAuthCheck()
t.invalidateChan <- true
- return errors.New("invalidate called")
-}
-
-func (t * dummyTokenManager) getToken() *oauthToken {
return nil
}
-func (t * dummyTokenManager) close() {
+func (t *dummyTokenManager) getToken() *oauthToken {
+ return nil
+}
+
+func (t *dummyTokenManager) close() {
return
}
-func (t * dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
- return func(chan bool) error{
+func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
+ return func(chan bool) error {
return nil
}
}
-func (* dummyTokenManager) start() {
+func (t *dummyTokenManager) start() {
}
type dummySnapshotManager struct {
-
+ downloadCalledChan chan bool
}
-func (* dummySnapshotManager) close() <-chan bool {
+func (s *dummySnapshotManager) close() <-chan bool {
closeChan := make(chan bool)
close(closeChan)
return closeChan
}
-func (* dummySnapshotManager) downloadBootSnapshot() {
+func (s *dummySnapshotManager) downloadBootSnapshot() {
}
-func (* dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
}
-func (* dummySnapshotManager) downloadDataSnapshot(){
+func (s *dummySnapshotManager) downloadDataSnapshot() {
+ log.Debug("dummySnapshotManager.downloadDataSnapshot() called")
+ s.downloadCalledChan <- true
+}
+
+func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
}
-func (* dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (* dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error{
+func (s *dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
return nil
-}
\ No newline at end of file
+}
diff --git a/changes.go b/changes.go
index 3a53a79..9e8d170 100644
--- a/changes.go
+++ b/changes.go
@@ -184,7 +184,10 @@
switch r.StatusCode {
case http.StatusUnauthorized:
err = apidTokenManager.invalidateToken()
- return err
+ if err != nil {
+ return err
+ }
+ return authFailError{}
case http.StatusNotModified:
return nil
@@ -206,7 +209,7 @@
log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
err = apiErr
}
- return nil
+ return err
}
return nil
}
diff --git a/init.go b/init.go
index 68d6375..d9b767e 100644
--- a/init.go
+++ b/init.go
@@ -30,16 +30,16 @@
var (
/* All set during plugin initialization */
- log apid.LogService
- config apid.ConfigService
- dataService apid.DataService
- events apid.EventsService
- apidInfo apidInstanceInfo
- newInstanceID bool
- apidTokenManager tokenManager
- apidChangeManager changeManager
+ log apid.LogService
+ config apid.ConfigService
+ dataService apid.DataService
+ events apid.EventsService
+ apidInfo apidInstanceInfo
+ newInstanceID bool
+ apidTokenManager tokenManager
+ apidChangeManager changeManager
apidSnapshotManager snapShotManager
- httpclient *http.Client
+ httpclient *http.Client
/* Set during post plugin initialization
* set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -83,7 +83,7 @@
Transport: tr,
Timeout: httpTimeout,
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
- req.Header.Set("Authorization", "Bearer "+ apidTokenManager.getBearerToken())
+ req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
return nil
},
}
@@ -112,14 +112,12 @@
return nil
}
-
func createManagers() {
apidSnapshotManager = createSnapShotManager()
apidChangeManager = createChangeManager()
apidTokenManager = createSimpleTokenManager()
}
-
func checkForRequiredValues() error {
// check for required values
for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
@@ -213,7 +211,6 @@
log.Debug("start post plugin init")
-
apidTokenManager.start()
go bootstrap()
diff --git a/listener.go b/listener.go
index a1360d4..aa3cfd2 100644
--- a/listener.go
+++ b/listener.go
@@ -117,20 +117,3 @@
return ok
}
-
-func makeDataScopeFromRow(row common.Row) dataDataScope {
-
- ds := dataDataScope{}
-
- row.Get("id", &ds.ID)
- row.Get("apid_cluster_id", &ds.ClusterID)
- row.Get("scope", &ds.Scope)
- row.Get("org", &ds.Org)
- row.Get("env", &ds.Env)
- row.Get("created", &ds.Created)
- row.Get("created_by", &ds.CreatedBy)
- row.Get("updated", &ds.Updated)
- row.Get("updated_by", &ds.UpdatedBy)
-
- return ds
-}
diff --git a/managerInterfaces.go b/managerInterfaces.go
index 55ee0df..20bbf6f 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -1,8 +1,8 @@
package apidApigeeSync
import (
- "net/url"
"github.com/apigee-labs/transicator/common"
+ "net/url"
)
type tokenManager interface {
@@ -26,4 +26,4 @@
type changeManager interface {
close() <-chan bool
pollChangeWithBackoff()
-}
\ No newline at end of file
+}
diff --git a/mock_server.go b/mock_server.go
index 328f6f1..8349131 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -95,7 +95,11 @@
}
func (m *MockServer) forceNewSnapshot() {
- atomic.SwapInt32(m.newSnap, 1)
+ atomic.StoreInt32(m.newSnap, 1)
+}
+
+func (m *MockServer) forceNoSnapshot() {
+ atomic.StoreInt32(m.newSnap, 0)
}
func (m *MockServer) lastSequenceID() string {
@@ -268,6 +272,7 @@
val := atomic.SwapInt32(m.newSnap, 0)
if val > 0 {
+ log.Debug("MockServer: force new snapshot")
w.WriteHeader(http.StatusBadRequest)
apiErr := changeServerError{
Code: "SNAPSHOT_TOO_OLD",
@@ -278,6 +283,8 @@
return
}
+ log.Debug("mock server sending change list")
+
q := req.URL.Query()
scopes := q["scope"]
@@ -319,20 +326,28 @@
func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
- if atomic.LoadInt32(m.authFail) > 0 {
+ // force failing auth check
+ if atomic.LoadInt32(m.authFail) == 1 {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. ")))
return
}
- auth := req.Header.Get("Authorization")
+ // force passing auth check
+ if atomic.LoadInt32(m.authFail) == 2 {
+ target(w, req)
+ return
+ }
+
+ // check auth header
+ auth := req.Header.Get("Authorization")
expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
if auth != expectedAuth {
- w.WriteHeader(http.StatusBadRequest)
+ w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth)))
- } else {
- target(w, req)
+ return
}
+ target(w, req)
}
}
diff --git a/token.go b/token.go
index f9d9d3f..56d6676 100644
--- a/token.go
+++ b/token.go
@@ -3,13 +3,13 @@
import (
"bytes"
"encoding/json"
+ "errors"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync/atomic"
"time"
- "errors"
)
var (
@@ -51,6 +51,7 @@
returnTokenChan chan *oauthToken
invalidateDone chan bool
}
+
func (t *simpleTokenManager) start() {
t.retrieveNewToken()
t.refreshTimer = time.After(t.token.refreshIn())