Merge conflict resolution
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index a55410e..9703cdf 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -23,6 +23,8 @@ wipeDBAferTest bool ) +const dummyConfigValue string = "placeholder" + var _ = BeforeSuite(func(){ wipeDBAferTest = true }) @@ -37,12 +39,9 @@ Expect(err).NotTo(HaveOccurred()) config.Set("local_storage_path", tmpDir) - testRouter = apid.API().Router() - testServer = httptest.NewServer(testRouter) - - config.Set(configProxyServerBaseURI, testServer.URL) - config.Set(configSnapServerBaseURI, testServer.URL) - config.Set(configChangeServerBaseURI, testServer.URL) + config.Set(configProxyServerBaseURI, dummyConfigValue) + config.Set(configSnapServerBaseURI, dummyConfigValue) + config.Set(configChangeServerBaseURI, dummyConfigValue) config.Set(configSnapshotProtocol, "json") config.Set(configPollInterval, 10*time.Millisecond) @@ -54,20 +53,7 @@ block = "0" log = apid.Log() - // set up mock server - mockParms := MockParms{ - ReliableAPI: false, - ClusterID: config.GetString(configApidClusterId), - TokenKey: config.GetString(configConsumerKey), - TokenSecret: config.GetString(configConsumerSecret), - Scope: "ert452", - Organization: "att", - Environment: "prod", - } - testMock = Mock(mockParms, testRouter) - _initPlugin(apid.AllServices()) - tokenManager = nil close(done) })
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index cebe7cc..7f25087 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -6,214 +6,243 @@ . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "time" + "net/http/httptest" ) -var _ = Describe("listener", func() { +var _ = Describe("Sync", func() { - It("should succesfully bootstrap from clean slate", func(done Done) { - log.Info("Starting sync tests...") - - // do not wipe DB after. Lets use it - wipeDBAferTest = false - var lastSnapshot *common.Snapshot + Context("Sync", func() { - expectedSnapshotTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms.company"}, - common.Change{Table: "edgex.apid_cluster"}, - common.Change{Table: "edgex.data_scope"}}, + var initializeContext = func() { + testRouter = apid.API().Router() + testServer = httptest.NewServer(testRouter) + + // set up mock server + mockParms := MockParms{ + ReliableAPI: true, + ClusterID: config.GetString(configApidClusterId), + TokenKey: config.GetString(configConsumerKey), + TokenSecret: config.GetString(configConsumerSecret), + Scope: "ert452", + Organization: "att", + Environment: "prod", + } + testMock = Mock(mockParms, testRouter) + + config.Set(configProxyServerBaseURI, testServer.URL) + config.Set(configSnapServerBaseURI, testServer.URL) + config.Set(configChangeServerBaseURI, testServer.URL) } - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + var restoreContext = func() { - if s, ok := event.(*common.Snapshot); ok { + tokenManager.close() + testServer.Close() - Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) + config.Set(configProxyServerBaseURI, dummyConfigValue) + config.Set(configSnapServerBaseURI, dummyConfigValue) + config.Set(configChangeServerBaseURI, dummyConfigValue) - //add apid_cluster and data_scope since those would present if this were a real scenario - knownTables["kms.app_credential"] = true - knownTables["kms.app_credential_apiproduct_mapper"] = true - knownTables["kms.developer"] = true - knownTables["kms.company_developer"] = true - knownTables["kms.api_product"] = true - knownTables["kms.app"] = true + } - lastSnapshot = s + It("should succesfully bootstrap from clean slate", func(done Done) { + log.Info("Starting sync tests...") - for _, t := range s.Tables { - switch t.Name { + initializeContext() + // do not wipe DB after. Lets use it + wipeDBAferTest = false + var lastSnapshot *common.Snapshot - case "edgex.apid_cluster": - Expect(t.Rows).To(HaveLen(1)) - r := t.Rows[0] - var id string - r.Get("id", &id) - Expect(id).To(Equal("bootstrap")) + expectedSnapshotTables := common.ChangeList{ + Changes: []common.Change{common.Change{Table: "kms.company"}, + common.Change{Table: "edgex.apid_cluster"}, + common.Change{Table: "edgex.data_scope"}}, + } - case "edgex.data_scope": - Expect(t.Rows).To(HaveLen(2)) - r := t.Rows[1] // get the non-cluster row + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - var id, clusterID, env, org, scope string - r.Get("id", &id) - r.Get("apid_cluster_id", &clusterID) - r.Get("env", &env) - r.Get("org", &org) - r.Get("scope", &scope) + if s, ok := event.(*common.Snapshot); ok { - Expect(id).To(Equal("ert452")) - Expect(scope).To(Equal("ert452")) - Expect(clusterID).To(Equal("bootstrap")) - Expect(env).To(Equal("prod")) - Expect(org).To(Equal("att")) + Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) + + //add apid_cluster and data_scope since those would present if this were a real scenario + knownTables["kms.app_credential"] = true + knownTables["kms.app_credential_apiproduct_mapper"] = true + knownTables["kms.developer"] = true + knownTables["kms.company_developer"] = true + knownTables["kms.api_product"] = true + knownTables["kms.app"] = true + + lastSnapshot = s + + for _, t := range s.Tables { + switch t.Name { + + case "edgex.apid_cluster": + Expect(t.Rows).To(HaveLen(1)) + r := t.Rows[0] + var id string + r.Get("id", &id) + Expect(id).To(Equal("bootstrap")) + + case "edgex.data_scope": + Expect(t.Rows).To(HaveLen(2)) + r := t.Rows[1] // get the non-cluster row + + var id, clusterID, env, org, scope string + r.Get("id", &id) + r.Get("apid_cluster_id", &clusterID) + r.Get("env", &env) + r.Get("org", &org) + r.Get("scope", &scope) + + Expect(id).To(Equal("ert452")) + Expect(scope).To(Equal("ert452")) + Expect(clusterID).To(Equal("bootstrap")) + Expect(env).To(Equal("prod")) + Expect(org).To(Equal("att")) + } } - } - } else if cl, ok := event.(*common.ChangeList); ok { - go func(){quitPollingChangeServer <- true}() - // ensure that snapshot switched DB versions - Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) - expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - Expect(getDB() == expectedDB).Should(BeTrue()) - - Expect(cl.Changes).To(HaveLen(6)) - - var tables []string - for _, c := range cl.Changes { - tables = append(tables, c.Table) - Expect(c.NewRow).ToNot(BeNil()) - - var tenantID string - c.NewRow.Get("tenant_id", &tenantID) - Expect(tenantID).To(Equal("ert452")) - } - - Expect(tables).To(ContainElement("kms.app_credential")) - Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) - Expect(tables).To(ContainElement("kms.developer")) - Expect(tables).To(ContainElement("kms.company_developer")) - Expect(tables).To(ContainElement("kms.api_product")) - Expect(tables).To(ContainElement("kms.app")) - - events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) { - defer GinkgoRecover() - - // allow other handler to execute to insert last_sequence - time.Sleep(50 * time.Millisecond) - var seq string - err = getDB(). - QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). - Scan(&seq) - + } else if cl, ok := event.(*common.ChangeList); ok { + go func(){quitPollingChangeServer <- true}() + // ensure that snapshot switched DB versions + Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) + expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) Expect(err).NotTo(HaveOccurred()) - Expect(seq).To(Equal(cl.LastSequence)) + Expect(getDB() == expectedDB).Should(BeTrue()) - tokenManager.close() + Expect(cl.Changes).To(HaveLen(6)) + + var tables []string + for _, c := range cl.Changes { + tables = append(tables, c.Table) + Expect(c.NewRow).ToNot(BeNil()) + + var tenantID string + c.NewRow.Get("tenant_id", &tenantID) + Expect(tenantID).To(Equal("ert452")) + } + + Expect(tables).To(ContainElement("kms.app_credential")) + Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) + Expect(tables).To(ContainElement("kms.developer")) + Expect(tables).To(ContainElement("kms.company_developer")) + Expect(tables).To(ContainElement("kms.api_product")) + Expect(tables).To(ContainElement("kms.app")) + + events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) { + defer GinkgoRecover() + + // allow other handler to execute to insert last_sequence + time.Sleep(50 * time.Millisecond) + var seq string + err = getDB(). + QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). + Scan(&seq) + + Expect(err).NotTo(HaveOccurred()) + Expect(seq).To(Equal(cl.LastSequence)) + + restoreContext() + close(done) + }) + } + }) + pie := apid.PluginsInitializedEvent{ + Description: "plugins initialized", + } + pie.Plugins = append(pie.Plugins, pluginData) + postInitPlugins(pie) + }, 3) + + It("should bootstrap from local DB if present", func(done Done) { + + initializeContext() + expectedTables := common.ChangeList{ + Changes: []common.Change{common.Change{Table: "kms.company"}, + common.Change{Table: "edgex.apid_cluster"}, + common.Change{Table: "edgex.data_scope"}}, + } + + Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) + + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + + if s, ok := event.(*common.Snapshot); ok { + go func(){quitPollingChangeServer <- true}() + //verify that the knownTables array has been properly populated from existing DB + Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) + + Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) + Expect(s.Tables).To(BeNil()) + + restoreContext() close(done) - }) + } + }) + pie := apid.PluginsInitializedEvent{ + Description: "plugins initialized", } + pie.Plugins = append(pie.Plugins, pluginData) + postInitPlugins(pie) + + }, 3) + + It("should correctly identify non-proper subsets with respect to maps", func() { + + //test b proper subset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "b"}}, + )).To(BeFalse()) + + //test a == b + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}}, + )).To(BeFalse()) + + //test b superset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}}, + )).To(BeTrue()) + + //test b not subset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "c"}}, + )).To(BeTrue()) + + //test a empty + Expect(changesHaveNewTables(map[string]bool{}, + []common.Change{common.Change{Table: "a"}}, + )).To(BeTrue()) + + //test b empty + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{}, + )).To(BeFalse()) + + //test b nil + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue()) + + //test a nil + Expect(changesHaveNewTables(nil, + []common.Change{common.Change{Table: "a"}}, + )).To(BeTrue()) }) - //apid.InitializePlugins() - pie := apid.PluginsInitializedEvent{ - Description: "plugins initialized", - } - pie.Plugins = append(pie.Plugins, pluginData) - postInitPlugins(pie) - }, 3) - //this test has a dependency on the one above it. Ideally we would write a test db to the disk instead - It("should bootstrap from local DB if present", func(done Done) { + // todo: disabled for now - + // there is precondition I haven't been able to track down that breaks this test on occasion + XIt("should process a new snapshot when change server requires it", func(done Done) { + oldSnap := apidInfo.LastSnapshot + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + defer GinkgoRecover() - /* postPluginInit event would have been emitted for the above test, clearing the list of registered plugins - * In general, any additional sync tests (or any tests causing postInitPlugins to fire) - * will need to re-register the plugin - */ - //apid.RegisterPlugin(initPlugin) - - expectedTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms.company"}, - common.Change{Table: "edgex.apid_cluster"}, - common.Change{Table: "edgex.data_scope"}}, - } - - Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - - if s, ok := event.(*common.Snapshot); ok { - go func(){quitPollingChangeServer <- true}() - //verify that the knownTables array has been properly populated from existing DB - Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) - - Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) - Expect(s.Tables).To(BeNil()) - - tokenManager.close() - close(done) - } + if s, ok := event.(*common.Snapshot); ok { + Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) + close(done) + } + }) + testMock.forceNewSnapshot() }) - //apid.InitializePlugins() - pie := apid.PluginsInitializedEvent{ - Description: "plugins initialized", - } - pie.Plugins = append(pie.Plugins, pluginData) - postInitPlugins(pie) - - }, 3) - - It("should correctly identify non-proper subsets with respect to maps", func() { - - //test b proper subset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "b"}}, - )).To(BeFalse()) - - //test a == b - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}}, - )).To(BeFalse()) - - //test b superset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}}, - )).To(BeTrue()) - - //test b not subset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "c"}}, - )).To(BeTrue()) - - //test a empty - Expect(changesHaveNewTables(map[string]bool{}, - []common.Change{common.Change{Table: "a"}}, - )).To(BeTrue()) - - //test b empty - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{}, - )).To(BeFalse()) - - //test b nil - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue()) - - //test a nil - Expect(changesHaveNewTables(nil, - []common.Change{common.Change{Table: "a"}}, - )).To(BeTrue()) - }) - - // todo: disabled for now - - // there is precondition I haven't been able to track down that breaks this test on occasion - XIt("should process a new snapshot when change server requires it", func(done Done) { - oldSnap := apidInfo.LastSnapshot - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() - - if s, ok := event.(*common.Snapshot); ok { - Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) - close(done) - } - }) - testMock.forceNewSnapshot() }) })
diff --git a/token.go b/token.go index c9cf438..3c4bcbc 100644 --- a/token.go +++ b/token.go
@@ -14,7 +14,6 @@ var ( refreshFloatTime = time.Minute getTokenLock sync.Mutex - quitPollingForToken chan bool = make(chan bool) ) /* @@ -29,6 +28,10 @@ func createTokenManager() *tokenMan { t := &tokenMan{} t.doRefresh = make(chan bool, 1) + t.quitPollingForToken = make(chan bool, 1) + t.tokenRefreshed = make(chan bool) + t.closed = make(chan bool) + t.retrieveNewToken() t.maintainToken() return t } @@ -36,7 +39,9 @@ type tokenMan struct { token *oauthToken doRefresh chan bool - continueMaintenance bool + quitPollingForToken chan bool + tokenRefreshed chan bool + closed chan bool } func (t *tokenMan) getBearerToken() string { @@ -45,26 +50,26 @@ func (t *tokenMan) maintainToken() { go func() { - t.getToken() for { select { - case <- quitPollingForToken: - log.Info("Signal to quit maintenance of token recieved") - return + case _, ok := <-t.doRefresh: if !ok { log.Debug("closed tokenMan") + t.closed <- true return } log.Debug("force token refresh") + t.retrieveNewToken() + t.tokenRefreshed <- true + continue case <-time.After(t.token.refreshIn()): log.Debug("auto refresh token") - } - - if t.token.needsRefresh() { getTokenLock.Lock() t.retrieveNewToken() getTokenLock.Unlock() + continue + } } }() @@ -72,28 +77,28 @@ func (t *tokenMan) invalidateToken() { log.Debug("invalidating token") + getTokenLock.Lock() t.token = nil t.doRefresh <- true + //ensure refresh signal has been received + <-t.tokenRefreshed + getTokenLock.Unlock() } // will block until valid +//assumption is that if we can get the lock, then it's valid func (t *tokenMan) getToken() *oauthToken { getTokenLock.Lock() defer getTokenLock.Unlock() - - if t.token.isValid() { - log.Debugf("returning existing token: %v", t.token) - return t.token - } - - t.retrieveNewToken() return t.token } func (t *tokenMan) close() { log.Debug("close token manager") - quitPollingForToken <- true + t.quitPollingForToken <- true close(t.doRefresh) + //block until close signal has been received by maintenance routine + <-t.closed } // don't call externally. will block until success. @@ -107,7 +112,7 @@ } uri.Path = path.Join(uri.Path, "/accesstoken") - pollWithBackoff(quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) {log.Errorf("Error getting new token : ", err)}) + pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) {log.Errorf("Error getting new token : ", err)}) } func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
diff --git a/token_test.go b/token_test.go index 4fbd0b0..e1bb460 100644 --- a/token_test.go +++ b/token_test.go
@@ -65,7 +65,18 @@ Context("tokenMan", func() { It("should get a valid token", func() { -log.Info("\n\n\n\nHERE\n\n") + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + + res := oauthToken{ + AccessToken: "ABCD", + ExpiresIn: 1000, + } + body, err := json.Marshal(res) + Expect(err).NotTo(HaveOccurred()) + w.Write(body) + })) + config.Set(configProxyServerBaseURI, ts.URL) tokenManager = createTokenManager() token := tokenManager.getToken() @@ -76,10 +87,24 @@ bToken := tokenManager.getBearerToken() Expect(bToken).To(Equal(token.AccessToken)) tokenManager.close() - }, 2) + ts.Close() + }) It("should refresh when forced to", func() { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + + res := oauthToken{ + AccessToken: generateUUID(), + ExpiresIn: 1000, + } + body, err := json.Marshal(res) + Expect(err).NotTo(HaveOccurred()) + w.Write(body) + })) + config.Set(configProxyServerBaseURI, ts.URL) + tokenManager = createTokenManager() token := tokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) @@ -90,7 +115,8 @@ Expect(token).ToNot(Equal(token2)) Expect(token.AccessToken).ToNot(Equal(token2.AccessToken)) tokenManager.close() - }, 2) + ts.Close() + }) It("should refresh in refresh interval", func(done Done) { @@ -128,7 +154,7 @@ ts.Close() close(done) - }, 2) + }) It("should have created_at_apid first time, update_at_apid after", func(done Done) { finished := make(chan bool, 1) @@ -168,6 +194,6 @@ tokenManager.close() ts.Close() close(done) - }, 2) + }) }) })