Merge pull request #33 from 30x/refactor
Refactor -> haoming-refactor
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index a55410e..9703cdf 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -23,6 +23,8 @@
wipeDBAferTest bool
)
+const dummyConfigValue string = "placeholder"
+
var _ = BeforeSuite(func(){
wipeDBAferTest = true
})
@@ -37,12 +39,9 @@
Expect(err).NotTo(HaveOccurred())
config.Set("local_storage_path", tmpDir)
- testRouter = apid.API().Router()
- testServer = httptest.NewServer(testRouter)
-
- config.Set(configProxyServerBaseURI, testServer.URL)
- config.Set(configSnapServerBaseURI, testServer.URL)
- config.Set(configChangeServerBaseURI, testServer.URL)
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
config.Set(configSnapshotProtocol, "json")
config.Set(configPollInterval, 10*time.Millisecond)
@@ -54,20 +53,7 @@
block = "0"
log = apid.Log()
- // set up mock server
- mockParms := MockParms{
- ReliableAPI: false,
- ClusterID: config.GetString(configApidClusterId),
- TokenKey: config.GetString(configConsumerKey),
- TokenSecret: config.GetString(configConsumerSecret),
- Scope: "ert452",
- Organization: "att",
- Environment: "prod",
- }
- testMock = Mock(mockParms, testRouter)
-
_initPlugin(apid.AllServices())
- tokenManager = nil
close(done)
})
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index cebe7cc..7f25087 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -6,214 +6,243 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"time"
+ "net/http/httptest"
)
-var _ = Describe("listener", func() {
+var _ = Describe("Sync", func() {
- It("should succesfully bootstrap from clean slate", func(done Done) {
- log.Info("Starting sync tests...")
-
- // do not wipe DB after. Lets use it
- wipeDBAferTest = false
- var lastSnapshot *common.Snapshot
+ Context("Sync", func() {
- expectedSnapshotTables := common.ChangeList{
- Changes: []common.Change{common.Change{Table: "kms.company"},
- common.Change{Table: "edgex.apid_cluster"},
- common.Change{Table: "edgex.data_scope"}},
+ // initializeContext starts an httptest server on the apid API router, builds the mock
+ // against that same router, and points the proxy, snapshot and change server base URIs
+ // at the server. Each sync test calls it first; restoreContext undoes it.
+ var initializeContext = func() {
+ testRouter = apid.API().Router()
+ testServer = httptest.NewServer(testRouter)
+
+ // set up mock server
+ mockParms := MockParms{
+ ReliableAPI: true,
+ ClusterID: config.GetString(configApidClusterId),
+ TokenKey: config.GetString(configConsumerKey),
+ TokenSecret: config.GetString(configConsumerSecret),
+ Scope: "ert452",
+ Organization: "att",
+ Environment: "prod",
+ }
+ testMock = Mock(mockParms, testRouter)
+
+ // replace the suite-level placeholder URIs with the live test server's address
+ config.Set(configProxyServerBaseURI, testServer.URL)
+ config.Set(configSnapServerBaseURI, testServer.URL)
+ config.Set(configChangeServerBaseURI, testServer.URL)
}
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+ var restoreContext = func() {
- if s, ok := event.(*common.Snapshot); ok {
+ tokenManager.close()
+ testServer.Close()
- Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
- //add apid_cluster and data_scope since those would present if this were a real scenario
- knownTables["kms.app_credential"] = true
- knownTables["kms.app_credential_apiproduct_mapper"] = true
- knownTables["kms.developer"] = true
- knownTables["kms.company_developer"] = true
- knownTables["kms.api_product"] = true
- knownTables["kms.app"] = true
+ }
- lastSnapshot = s
+ It("should succesfully bootstrap from clean slate", func(done Done) {
+ log.Info("Starting sync tests...")
- for _, t := range s.Tables {
- switch t.Name {
+ initializeContext()
+ // do not wipe the DB after this test; the local-DB bootstrap test below reuses it
+ wipeDBAferTest = false
+ var lastSnapshot *common.Snapshot
- case "edgex.apid_cluster":
- Expect(t.Rows).To(HaveLen(1))
- r := t.Rows[0]
- var id string
- r.Get("id", &id)
- Expect(id).To(Equal("bootstrap"))
+ expectedSnapshotTables := common.ChangeList{
+ Changes: []common.Change{common.Change{Table: "kms.company"},
+ common.Change{Table: "edgex.apid_cluster"},
+ common.Change{Table: "edgex.data_scope"}},
+ }
- case "edgex.data_scope":
- Expect(t.Rows).To(HaveLen(2))
- r := t.Rows[1] // get the non-cluster row
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- var id, clusterID, env, org, scope string
- r.Get("id", &id)
- r.Get("apid_cluster_id", &clusterID)
- r.Get("env", &env)
- r.Get("org", &org)
- r.Get("scope", &scope)
+ if s, ok := event.(*common.Snapshot); ok {
- Expect(id).To(Equal("ert452"))
- Expect(scope).To(Equal("ert452"))
- Expect(clusterID).To(Equal("bootstrap"))
- Expect(env).To(Equal("prod"))
- Expect(org).To(Equal("att"))
+ Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
+
+ // mark the kms tables as known, since they would already be present in a real scenario
+ knownTables["kms.app_credential"] = true
+ knownTables["kms.app_credential_apiproduct_mapper"] = true
+ knownTables["kms.developer"] = true
+ knownTables["kms.company_developer"] = true
+ knownTables["kms.api_product"] = true
+ knownTables["kms.app"] = true
+
+ lastSnapshot = s
+
+ for _, t := range s.Tables {
+ switch t.Name {
+
+ case "edgex.apid_cluster":
+ Expect(t.Rows).To(HaveLen(1))
+ r := t.Rows[0]
+ var id string
+ r.Get("id", &id)
+ Expect(id).To(Equal("bootstrap"))
+
+ case "edgex.data_scope":
+ Expect(t.Rows).To(HaveLen(2))
+ r := t.Rows[1] // get the non-cluster row
+
+ var id, clusterID, env, org, scope string
+ r.Get("id", &id)
+ r.Get("apid_cluster_id", &clusterID)
+ r.Get("env", &env)
+ r.Get("org", &org)
+ r.Get("scope", &scope)
+
+ Expect(id).To(Equal("ert452"))
+ Expect(scope).To(Equal("ert452"))
+ Expect(clusterID).To(Equal("bootstrap"))
+ Expect(env).To(Equal("prod"))
+ Expect(org).To(Equal("att"))
+ }
}
- }
- } else if cl, ok := event.(*common.ChangeList); ok {
- go func(){quitPollingChangeServer <- true}()
- // ensure that snapshot switched DB versions
- Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
- expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
- Expect(getDB() == expectedDB).Should(BeTrue())
-
- Expect(cl.Changes).To(HaveLen(6))
-
- var tables []string
- for _, c := range cl.Changes {
- tables = append(tables, c.Table)
- Expect(c.NewRow).ToNot(BeNil())
-
- var tenantID string
- c.NewRow.Get("tenant_id", &tenantID)
- Expect(tenantID).To(Equal("ert452"))
- }
-
- Expect(tables).To(ContainElement("kms.app_credential"))
- Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
- Expect(tables).To(ContainElement("kms.developer"))
- Expect(tables).To(ContainElement("kms.company_developer"))
- Expect(tables).To(ContainElement("kms.api_product"))
- Expect(tables).To(ContainElement("kms.app"))
-
- events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) {
- defer GinkgoRecover()
-
- // allow other handler to execute to insert last_sequence
- time.Sleep(50 * time.Millisecond)
- var seq string
- err = getDB().
- QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
- Scan(&seq)
-
+ } else if cl, ok := event.(*common.ChangeList); ok {
+ go func(){quitPollingChangeServer <- true}()
+ // ensure that snapshot switched DB versions
+ Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
+ expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
Expect(err).NotTo(HaveOccurred())
- Expect(seq).To(Equal(cl.LastSequence))
+ Expect(getDB() == expectedDB).Should(BeTrue())
- tokenManager.close()
+ Expect(cl.Changes).To(HaveLen(6))
+
+ var tables []string
+ for _, c := range cl.Changes {
+ tables = append(tables, c.Table)
+ Expect(c.NewRow).ToNot(BeNil())
+
+ var tenantID string
+ c.NewRow.Get("tenant_id", &tenantID)
+ Expect(tenantID).To(Equal("ert452"))
+ }
+
+ Expect(tables).To(ContainElement("kms.app_credential"))
+ Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
+ Expect(tables).To(ContainElement("kms.developer"))
+ Expect(tables).To(ContainElement("kms.company_developer"))
+ Expect(tables).To(ContainElement("kms.api_product"))
+ Expect(tables).To(ContainElement("kms.app"))
+
+ events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) {
+ defer GinkgoRecover()
+
+ // allow other handler to execute to insert last_sequence
+ time.Sleep(50 * time.Millisecond)
+ var seq string
+ err = getDB().
+ QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
+ Scan(&seq)
+
+ Expect(err).NotTo(HaveOccurred())
+ Expect(seq).To(Equal(cl.LastSequence))
+
+ restoreContext()
+ close(done)
+ })
+ }
+ })
+ pie := apid.PluginsInitializedEvent{
+ Description: "plugins initialized",
+ }
+ pie.Plugins = append(pie.Plugins, pluginData)
+ postInitPlugins(pie)
+ }, 3)
+
+ It("should bootstrap from local DB if present", func(done Done) {
+
+ initializeContext()
+ expectedTables := common.ChangeList{
+ Changes: []common.Change{common.Change{Table: "kms.company"},
+ common.Change{Table: "edgex.apid_cluster"},
+ common.Change{Table: "edgex.data_scope"}},
+ }
+
+ Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
+
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+
+ if s, ok := event.(*common.Snapshot); ok {
+ go func(){quitPollingChangeServer <- true}()
+ //verify that the knownTables array has been properly populated from existing DB
+ Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
+
+ Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
+ Expect(s.Tables).To(BeNil())
+
+ restoreContext()
close(done)
- })
+ }
+ })
+ pie := apid.PluginsInitializedEvent{
+ Description: "plugins initialized",
}
+ pie.Plugins = append(pie.Plugins, pluginData)
+ postInitPlugins(pie)
+
+ }, 3)
+
+ It("should correctly identify non-proper subsets with respect to maps", func() {
+
+ //test b proper subset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "b"}},
+ )).To(BeFalse())
+
+ //test a == b
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
+ )).To(BeFalse())
+
+ //test b superset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
+ )).To(BeTrue())
+
+ //test b not subset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "c"}},
+ )).To(BeTrue())
+
+ //test a empty
+ Expect(changesHaveNewTables(map[string]bool{},
+ []common.Change{common.Change{Table: "a"}},
+ )).To(BeTrue())
+
+ //test b empty
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{},
+ )).To(BeFalse())
+
+ //test b nil
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue())
+
+ //test a nil
+ Expect(changesHaveNewTables(nil,
+ []common.Change{common.Change{Table: "a"}},
+ )).To(BeTrue())
})
- //apid.InitializePlugins()
- pie := apid.PluginsInitializedEvent{
- Description: "plugins initialized",
- }
- pie.Plugins = append(pie.Plugins, pluginData)
- postInitPlugins(pie)
- }, 3)
- //this test has a dependency on the one above it. Ideally we would write a test db to the disk instead
- It("should bootstrap from local DB if present", func(done Done) {
+ // TODO: disabled for now -
+ // there is a precondition I haven't been able to track down that occasionally breaks this test
+ XIt("should process a new snapshot when change server requires it", func(done Done) {
+ oldSnap := apidInfo.LastSnapshot
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+ defer GinkgoRecover()
- /* postPluginInit event would have been emitted for the above test, clearing the list of registered plugins
- * In general, any additional sync tests (or any tests causing postInitPlugins to fire)
- * will need to re-register the plugin
- */
- //apid.RegisterPlugin(initPlugin)
-
- expectedTables := common.ChangeList{
- Changes: []common.Change{common.Change{Table: "kms.company"},
- common.Change{Table: "edgex.apid_cluster"},
- common.Change{Table: "edgex.data_scope"}},
- }
-
- Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
- if s, ok := event.(*common.Snapshot); ok {
- go func(){quitPollingChangeServer <- true}()
- //verify that the knownTables array has been properly populated from existing DB
- Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
-
- Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
- Expect(s.Tables).To(BeNil())
-
- tokenManager.close()
- close(done)
- }
+ if s, ok := event.(*common.Snapshot); ok {
+ Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
+ close(done)
+ }
+ })
+ testMock.forceNewSnapshot()
})
- //apid.InitializePlugins()
- pie := apid.PluginsInitializedEvent{
- Description: "plugins initialized",
- }
- pie.Plugins = append(pie.Plugins, pluginData)
- postInitPlugins(pie)
-
- }, 3)
-
- It("should correctly identify non-proper subsets with respect to maps", func() {
-
- //test b proper subset of a
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "b"}},
- )).To(BeFalse())
-
- //test a == b
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
- )).To(BeFalse())
-
- //test b superset of a
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
- )).To(BeTrue())
-
- //test b not subset of a
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "c"}},
- )).To(BeTrue())
-
- //test a empty
- Expect(changesHaveNewTables(map[string]bool{},
- []common.Change{common.Change{Table: "a"}},
- )).To(BeTrue())
-
- //test b empty
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{},
- )).To(BeFalse())
-
- //test b nil
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue())
-
- //test a nil
- Expect(changesHaveNewTables(nil,
- []common.Change{common.Change{Table: "a"}},
- )).To(BeTrue())
- })
-
- // todo: disabled for now -
- // there is precondition I haven't been able to track down that breaks this test on occasion
- XIt("should process a new snapshot when change server requires it", func(done Done) {
- oldSnap := apidInfo.LastSnapshot
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- defer GinkgoRecover()
-
- if s, ok := event.(*common.Snapshot); ok {
- Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
- close(done)
- }
- })
- testMock.forceNewSnapshot()
})
})
diff --git a/token.go b/token.go
index c9cf438..3c4bcbc 100644
--- a/token.go
+++ b/token.go
@@ -14,7 +14,6 @@
var (
refreshFloatTime = time.Minute
getTokenLock sync.Mutex
- quitPollingForToken chan bool = make(chan bool)
)
/*
@@ -29,6 +28,10 @@
func createTokenManager() *tokenMan {
t := &tokenMan{}
t.doRefresh = make(chan bool, 1)
+ // capacity 1 so close() can post its quit signal without a receiver being ready
+ t.quitPollingForToken = make(chan bool, 1)
+ // unbuffered: maintainToken's send pairs with invalidateToken's receive
+ t.tokenRefreshed = make(chan bool)
+ // unbuffered: maintainToken's exit signal pairs with the receive in close()
+ t.closed = make(chan bool)
+ // fetch the first token before starting maintenance: maintainToken calls
+ // t.token.refreshIn() as soon as it enters its select.
+ // NOTE(review): retrieveNewToken presumably blocks until it succeeds, so this
+ // constructor blocks too — confirm that is acceptable for every caller.
+ t.retrieveNewToken()
t.maintainToken()
return t
}
@@ -36,7 +39,9 @@
type tokenMan struct {
token *oauthToken
doRefresh chan bool
- continueMaintenance bool
+ // quitPollingForToken is passed to pollWithBackoff when fetching a token; close() sends true on it.
+ quitPollingForToken chan bool
+ // tokenRefreshed is sent by maintainToken after a forced refresh has run; invalidateToken waits on it.
+ tokenRefreshed chan bool
+ // closed is sent by maintainToken when it exits because doRefresh was closed; close() waits on it.
+ closed chan bool
}
func (t *tokenMan) getBearerToken() string {
@@ -45,26 +50,26 @@
func (t *tokenMan) maintainToken() {
go func() {
- t.getToken()
for {
select {
- case <- quitPollingForToken:
- log.Info("Signal to quit maintenance of token recieved")
- return
+
case _, ok := <-t.doRefresh:
if !ok {
log.Debug("closed tokenMan")
+ t.closed <- true
return
}
log.Debug("force token refresh")
+ t.retrieveNewToken()
+ t.tokenRefreshed <- true
+ continue
case <-time.After(t.token.refreshIn()):
log.Debug("auto refresh token")
- }
-
- if t.token.needsRefresh() {
getTokenLock.Lock()
t.retrieveNewToken()
getTokenLock.Unlock()
+ continue
+
}
}
}()
@@ -72,28 +77,28 @@
func (t *tokenMan) invalidateToken() {
log.Debug("invalidating token")
+ // Hold getTokenLock for the whole forced refresh so getToken cannot return the nil token.
+ // The unlock is deferred (as in getToken) so the lock is still released if the send
+ // below panics, which happens when doRefresh has already been closed by close().
+ // NOTE(review): the timed branch of maintainToken also takes getTokenLock; if it is
+ // blocked on the lock while we wait on tokenRefreshed, neither side can proceed — confirm.
+ getTokenLock.Lock()
+ defer getTokenLock.Unlock()
t.token = nil
t.doRefresh <- true
+ // block until maintainToken has run retrieveNewToken and signalled completion
+ <-t.tokenRefreshed
}
// will block until valid
+// getToken no longer validates or fetches the token itself; it only returns t.token.
+// Assumption: every refresh (invalidateToken, and the timed branch of maintainToken)
+// holds getTokenLock while the token is being replaced, so once the lock can be
+// acquired the stored token is taken to be valid.
func (t *tokenMan) getToken() *oauthToken {
getTokenLock.Lock()
defer getTokenLock.Unlock()
-
- if t.token.isValid() {
- log.Debugf("returning existing token: %v", t.token)
- return t.token
- }
-
- t.retrieveNewToken()
return t.token
}
func (t *tokenMan) close() {
log.Debug("close token manager")
- quitPollingForToken <- true
+ // quitPollingForToken is created with capacity 1, so this first send succeeds even
+ // when nothing is currently receiving from it.
+ // NOTE(review): a second call to close() would panic on the close(t.doRefresh) below.
+ t.quitPollingForToken <- true
close(t.doRefresh)
+ // wait for maintainToken to see the closed doRefresh channel and signal that it is exiting
+ <-t.closed
}
// don't call externally. will block until success.
@@ -107,7 +112,7 @@
}
uri.Path = path.Join(uri.Path, "/accesstoken")
- pollWithBackoff(quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) {log.Errorf("Error getting new token : ", err)})
+ // error callback for the poll: log the error it is handed (%v renders err; the previous format string had no verb)
+ pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) {log.Errorf("Error getting new token : %v", err)})
}
func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
diff --git a/token_test.go b/token_test.go
index 4fbd0b0..e1bb460 100644
--- a/token_test.go
+++ b/token_test.go
@@ -65,7 +65,18 @@
Context("tokenMan", func() {
It("should get a valid token", func() {
-log.Info("\n\n\n\nHERE\n\n")
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+
+ res := oauthToken{
+ AccessToken: "ABCD",
+ ExpiresIn: 1000,
+ }
+ body, err := json.Marshal(res)
+ Expect(err).NotTo(HaveOccurred())
+ w.Write(body)
+ }))
+ config.Set(configProxyServerBaseURI, ts.URL)
tokenManager = createTokenManager()
token := tokenManager.getToken()
@@ -76,10 +87,24 @@
bToken := tokenManager.getBearerToken()
Expect(bToken).To(Equal(token.AccessToken))
tokenManager.close()
- }, 2)
+ ts.Close()
+ })
It("should refresh when forced to", func() {
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+
+ res := oauthToken{
+ AccessToken: generateUUID(),
+ ExpiresIn: 1000,
+ }
+ body, err := json.Marshal(res)
+ Expect(err).NotTo(HaveOccurred())
+ w.Write(body)
+ }))
+ config.Set(configProxyServerBaseURI, ts.URL)
+
tokenManager = createTokenManager()
token := tokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
@@ -90,7 +115,8 @@
Expect(token).ToNot(Equal(token2))
Expect(token.AccessToken).ToNot(Equal(token2.AccessToken))
tokenManager.close()
- }, 2)
+ ts.Close()
+ })
It("should refresh in refresh interval", func(done Done) {
@@ -128,7 +154,7 @@
ts.Close()
close(done)
- }, 2)
+ })
It("should have created_at_apid first time, update_at_apid after", func(done Done) {
finished := make(chan bool, 1)
@@ -168,6 +194,6 @@
tokenManager.close()
ts.Close()
close(done)
- }, 2)
+ })
})
})