tweak: additional refactoring and testing improvement
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index f64f9b6..fe4bc9b 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -1,7 +1,6 @@ package apidApigeeSync import ( - "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -12,17 +11,23 @@ "time" "github.com/30x/apid-core" + "github.com/30x/apid-core/factory" ) var ( - tmpDir string - testServer *httptest.Server - testRouter apid.Router - testMock *MockServer + tmpDir string + testServer *httptest.Server + testRouter apid.Router + testMock *MockServer + wipeDBAferTest bool ) -var _ = BeforeSuite(func(done Done) { +var _ = BeforeSuite(func(){ + wipeDBAferTest = true +}) + +var _ = BeforeEach(func(done Done) { apid.Initialize(factory.DefaultServicesFactory()) config = apid.Config() @@ -61,132 +66,26 @@ } testMock = Mock(mockParms, testRouter) - // This is actually the first test :) - // Tests that entire bootstrap and set of sync operations work - var lastSnapshot *common.Snapshot - - expectedSnapshotTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms.company"}, - common.Change{Table: "edgex.apid_cluster"}, - common.Change{Table: "edgex.data_scope"}}, - } - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() - - if s, ok := event.(*common.Snapshot); ok { - - //verify that during downloadDataSnapshot, knownTables was correctly populated - Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) - - /* After this, we will mock changes for tables not present in the initial snapshot - * until that is changed in the mock server, we have to spoof the known tables, - * since they ar expected during the processing of the change event below - * - */ - - //add apid_cluster and data_scope since those would present if this were a real scenario - knownTables["kms.app_credential"] = true - knownTables["kms.app_credential_apiproduct_mapper"] = true - knownTables["kms.developer"] = true - knownTables["kms.company_developer"] = true - knownTables["kms.api_product"] = true - knownTables["kms.app"] = true - - - lastSnapshot = s - - for _, t := range s.Tables { - switch t.Name { - - case "edgex.apid_cluster": - Expect(t.Rows).To(HaveLen(1)) - r := t.Rows[0] - var id string - r.Get("id", &id) - Expect(id).To(Equal("bootstrap")) - - case "edgex.data_scope": - Expect(t.Rows).To(HaveLen(2)) - r := t.Rows[1] // get the non-cluster row - - var id, clusterID, env, org, scope string - r.Get("id", &id) - r.Get("apid_cluster_id", &clusterID) - r.Get("env", &env) - r.Get("org", &org) - r.Get("scope", &scope) - - Expect(id).To(Equal("ert452")) - Expect(scope).To(Equal("ert452")) - Expect(clusterID).To(Equal("bootstrap")) - Expect(env).To(Equal("prod")) - Expect(org).To(Equal("att")) - } - } - - } else if cl, ok := event.(*common.ChangeList); ok { - - // ensure that snapshot switched DB versions - Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) - expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - Expect(getDB() == expectedDB).Should(BeTrue()) - - Expect(cl.Changes).To(HaveLen(6)) - - var tables []string - for _, c := range cl.Changes { - tables = append(tables, c.Table) - Expect(c.NewRow).ToNot(BeNil()) - - var tenantID string - c.NewRow.Get("tenant_id", &tenantID) - Expect(tenantID).To(Equal("ert452")) - } - - Expect(tables).To(ContainElement("kms.app_credential")) - Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) - Expect(tables).To(ContainElement("kms.developer")) - Expect(tables).To(ContainElement("kms.company_developer")) - Expect(tables).To(ContainElement("kms.api_product")) - Expect(tables).To(ContainElement("kms.app")) - - events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) { - defer GinkgoRecover() - - // allow other handler to execute to insert last_sequence - time.Sleep(50 * time.Millisecond) - var seq string - err = getDB(). - QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). - Scan(&seq) - - Expect(err).NotTo(HaveOccurred()) - Expect(seq).To(Equal(cl.LastSequence)) - - close(done) - }) - } - }) - - apid.InitializePlugins() + _initPlugin(apid.AllServices()) + close(done) }) -var _ = BeforeEach(func() { +var _ = AfterEach(func() { apid.Events().Close() - lastSequence = "" - _, err := getDB().Exec("DELETE FROM APID_CLUSTER") - Expect(err).NotTo(HaveOccurred()) - _, err = getDB().Exec("DELETE FROM DATA_SCOPE") - Expect(err).NotTo(HaveOccurred()) + if (wipeDBAferTest) { + _, err := getDB().Exec("DELETE FROM APID_CLUSTER") + Expect(err).NotTo(HaveOccurred()) + _, err = getDB().Exec("DELETE FROM DATA_SCOPE") + Expect(err).NotTo(HaveOccurred()) - db, err := dataService.DB() - Expect(err).NotTo(HaveOccurred()) - _, err = db.Exec("DELETE FROM APID") - Expect(err).NotTo(HaveOccurred()) + db, err := dataService.DB() + Expect(err).NotTo(HaveOccurred()) + _, err = db.Exec("DELETE FROM APID") + Expect(err).NotTo(HaveOccurred()) + } + wipeDBAferTest = true }) var _ = AfterSuite(func() {
diff --git a/apigee_sync.go b/apigee_sync.go index f869ddc..703681a 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -55,6 +55,7 @@ for { select { case <-quit: + log.Info("Quit signal recieved. Returning") return case <-retry: start := time.Now()
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 3984271..5af8137 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -5,11 +5,127 @@ "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "time" ) var _ = Describe("listener", func() { + It("should succesfully bootstrap from clean slate", func(done Done) { + log.Info("Starting sync tests...") + + // do not wipe DB after. Lets use it + wipeDBAferTest = false + var lastSnapshot *common.Snapshot + + expectedSnapshotTables := common.ChangeList{ + Changes: []common.Change{common.Change{Table: "kms.company"}, + common.Change{Table: "edgex.apid_cluster"}, + common.Change{Table: "edgex.data_scope"}}, + } + + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + + if s, ok := event.(*common.Snapshot); ok { + + Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) + + //add apid_cluster and data_scope since those would present if this were a real scenario + knownTables["kms.app_credential"] = true + knownTables["kms.app_credential_apiproduct_mapper"] = true + knownTables["kms.developer"] = true + knownTables["kms.company_developer"] = true + knownTables["kms.api_product"] = true + knownTables["kms.app"] = true + + lastSnapshot = s + + for _, t := range s.Tables { + switch t.Name { + + case "edgex.apid_cluster": + Expect(t.Rows).To(HaveLen(1)) + r := t.Rows[0] + var id string + r.Get("id", &id) + Expect(id).To(Equal("bootstrap")) + + case "edgex.data_scope": + Expect(t.Rows).To(HaveLen(2)) + r := t.Rows[1] // get the non-cluster row + + var id, clusterID, env, org, scope string + r.Get("id", &id) + r.Get("apid_cluster_id", &clusterID) + r.Get("env", &env) + r.Get("org", &org) + r.Get("scope", &scope) + + Expect(id).To(Equal("ert452")) + Expect(scope).To(Equal("ert452")) + Expect(clusterID).To(Equal("bootstrap")) + Expect(env).To(Equal("prod")) + Expect(org).To(Equal("att")) + } + } + + } else if cl, ok := event.(*common.ChangeList); ok { + go func(){quitPollingChangeServer <- true}() + // ensure that snapshot switched DB versions + Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) + expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) + Expect(err).NotTo(HaveOccurred()) + Expect(getDB() == expectedDB).Should(BeTrue()) + + Expect(cl.Changes).To(HaveLen(6)) + + var tables []string + for _, c := range cl.Changes { + tables = append(tables, c.Table) + Expect(c.NewRow).ToNot(BeNil()) + + var tenantID string + c.NewRow.Get("tenant_id", &tenantID) + Expect(tenantID).To(Equal("ert452")) + } + + Expect(tables).To(ContainElement("kms.app_credential")) + Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) + Expect(tables).To(ContainElement("kms.developer")) + Expect(tables).To(ContainElement("kms.company_developer")) + Expect(tables).To(ContainElement("kms.api_product")) + Expect(tables).To(ContainElement("kms.app")) + + events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) { + defer GinkgoRecover() + + // allow other handler to execute to insert last_sequence + time.Sleep(50 * time.Millisecond) + var seq string + err = getDB(). + QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). + Scan(&seq) + + Expect(err).NotTo(HaveOccurred()) + Expect(seq).To(Equal(cl.LastSequence)) + + //sleep so that logging output from CS poller stops before test closes + time.Sleep(100 * time.Millisecond) + close(done) + }) + } + }) + apid.InitializePlugins() + }, 3) + + //this test has a dependency on the one above it. Ideally we would write a test db to the disk instead It("should bootstrap from local DB if present", func(done Done) { + + /* postPluginInit event would have been emitted for the above test, clearing the list of registered plugins + * In general, any additional sync tests (or any tests causing postInitPlugins to fire) + * will need to re-register the plugin + */ + apid.RegisterPlugin(initPlugin) + expectedTables := common.ChangeList{ Changes: []common.Change{common.Change{Table: "kms.company"}, common.Change{Table: "edgex.apid_cluster"}, @@ -19,22 +135,23 @@ Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() if s, ok := event.(*common.Snapshot); ok { - + go func(){quitPollingChangeServer <- true}() //verify that the knownTables array has been properly populated from existing DB Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) Expect(s.Tables).To(BeNil()) + //sleep so that logging output from CS poller stops before test closes + time.Sleep(100 * time.Millisecond) close(done) } }) + apid.InitializePlugins() - bootstrap(make(chan bool), make(chan bool)) - }) + }, 3) It("should correctly identify non-proper subsets with respect to maps", func() {
diff --git a/changes.go b/changes.go index 997c592..6a0afad 100644 --- a/changes.go +++ b/changes.go
@@ -37,6 +37,7 @@ for { select { case <-quit: + log.Info("Recevied quit signal to stop polling change server") return quitSignalError{} default: err := getChanges(changesUri) @@ -81,17 +82,18 @@ addHeaders(req) r, err := client.Do(req) - defer r.Body.Close() if err != nil { log.Errorf("change agent comm error: %s", err) return err } + defer r.Body.Close() if r.StatusCode != http.StatusOK { log.Errorf("Get changes request failed with status code: %d", r.StatusCode) switch r.StatusCode { case http.StatusUnauthorized: tokenManager.invalidateToken() + return nil case http.StatusNotModified: return nil @@ -113,9 +115,9 @@ log.Debug("Received SNAPSHOT_TOO_OLD message from change server.") err = apiErr } + return nil } - - return err + return nil } var resp common.ChangeList @@ -174,11 +176,12 @@ //nil maps should not be passed in. Making the distinction between nil map and empty map if a == nil || changes == nil{ - return true; + return true } for _, change := range changes { if !a[change.Table] { + log.Infof("Unable to find %s table in current known tables", change.Table) return true } }
diff --git a/data.go b/data.go index 53e1e56..4e5a62c 100644 --- a/data.go +++ b/data.go
@@ -254,6 +254,7 @@ newInstanceID = true info.InstanceID = generateUUID() + log.Debugf("Inserting new apid instance id %s", info.InstanceID) db.Exec("INSERT INTO APID (instance_id, last_snapshot_info) VALUES (?,?)", info.InstanceID, "") }
diff --git a/init.go b/init.go index 1f9d1b4..7967e53 100644 --- a/init.go +++ b/init.go
@@ -29,14 +29,22 @@ ) var ( + /* All set during plugin initialization */ log apid.LogService config apid.ConfigService dataService apid.DataService events apid.EventsService apidInfo apidInstanceInfo - apidPluginDetails string newInstanceID bool tokenManager *tokenMan + quitPollingSnapshotServer chan bool + quitPollingChangeServer chan bool + + /* Set during post plugin initialization + * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called + */ + apidPluginDetails string = `[{"name":"apidApigeeSync","schemaVer":"1.0"}]` + ) type apidInstanceInfo struct { @@ -52,7 +60,7 @@ apid.RegisterPlugin(initPlugin) } -func initDefaults() { +func initConfigDefaults() { config.SetDefault(configPollInterval, 120*time.Second) config.SetDefault(configSnapshotProtocol, "json") name, errh := os.Hostname() @@ -64,53 +72,28 @@ log.Debugf("Using %s as display name", config.GetString(configName)) } -func SetLogger(logger apid.LogService) { - log = logger -} - -func initPlugin(services apid.Services) (apid.PluginData, error) { - SetLogger(services.Log().ForModule("apigeeSync")) - log.Debug("start init") - - config = services.Config() - initDefaults() - +func initVariables(services apid.Services) (error) { dataService = services.Data() events = services.Events() - - /* This callback function will get called, once all the plugins are - * initialized (not just this plugin). This is needed because, - * downloadSnapshots/changes etc have to begin to be processed only - * after all the plugins are initialized - */ - events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) - - // check for required values - for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, - configSnapServerBaseURI, configChangeServerBaseURI} { - if !config.IsSet(key) { - return pluginData, fmt.Errorf("Missing required config value: %s", key) - } - } - proto := config.GetString(configSnapshotProtocol) - if proto != "json" && proto != "proto" { - return pluginData, fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol) - } + //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines + //also useful for testing + quitPollingSnapshotServer = make(chan bool) + quitPollingChangeServer = make(chan bool) // set up default database db, err := dataService.DB() if err != nil { - return pluginData, fmt.Errorf("Unable to access DB: %v", err) + return fmt.Errorf("Unable to access DB: %v", err) } err = initDB(db) if err != nil { - return pluginData, fmt.Errorf("Unable to access DB: %v", err) + return fmt.Errorf("Unable to access DB: %v", err) } setDB(db) apidInfo, err = getApidInstanceInfo() if err != nil { - return pluginData, fmt.Errorf("Unable to get apid instance info: %v", err) + return fmt.Errorf("Unable to get apid instance info: %v", err) } if config.IsSet(configApidInstanceID) { @@ -118,6 +101,64 @@ } config.Set(configApidInstanceID, apidInfo.InstanceID) + return nil +} + +func checkForRequiredValues() (error) { + // check for required values + for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, + configSnapServerBaseURI, configChangeServerBaseURI} { + if !config.IsSet(key) { + return fmt.Errorf("Missing required config value: %s", key) + } + } + proto := config.GetString(configSnapshotProtocol) + if proto != "json" && proto != "proto" { + return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol) + } + + return nil +} + +func SetLogger(logger apid.LogService) { + log = logger +} + +/* Idempotent state initialization */ +func _initPlugin(services apid.Services) (error) { + SetLogger(services.Log().ForModule("apigeeSync")) + log.Debug("start init") + + config = services.Config() + err := checkForRequiredValues() + if err != nil { + return err + } + + initConfigDefaults() + + err = initVariables(services) + if err != nil { + return err + } + + return nil +} + +func initPlugin(services apid.Services) (apid.PluginData, error) { + + err := _initPlugin(services) + if err != nil { + return pluginData, err + } + + /* This callback function will get called once all the plugins are + * initialized (not just this plugin). This is needed because, + * downloadSnapshots/changes etc have to begin to be processed only + * after all the plugins are initialized + */ + events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) + log.Debug("end init") return pluginData, nil @@ -156,10 +197,6 @@ tokenManager = createTokenManager() - //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines - //also useful for testing - quitPollingSnapshotServer := make(chan bool) - quitPollingChangeServer := make(chan bool) go bootstrap(quitPollingSnapshotServer, quitPollingChangeServer) events.Listen(ApigeeSyncEventSelector, &handler{})
diff --git a/init_test.go b/init_test.go index 7bcee56..6135875 100644 --- a/init_test.go +++ b/init_test.go
@@ -10,13 +10,15 @@ Context("Apid Instance display name", func() { It("should be hostname by default", func() { - initDefaults() + log.Info("Starting init tests...") + + initConfigDefaults() Expect(apidInfo.InstanceName).To(Equal("testhost")) }) It("accept display name from config", func() { config.Set(configName, "aa01") - initDefaults() + initConfigDefaults() var apidInfoLatest apidInstanceInfo apidInfoLatest, _ = getApidInstanceInfo() Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
diff --git a/listener_test.go b/listener_test.go index 8b45b3d..e426d39 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -15,6 +15,8 @@ Context("ApigeeSync snapshot event", func() { It("should set DB to appropriate version", func() { + log.Info("Starting listener tests...") + //save the last snapshot, so we can restore it at the end of this context saveLastSnapshot = apidInfo.LastSnapshot
diff --git a/token.go b/token.go index 46d1d5d..f374d77 100644 --- a/token.go +++ b/token.go
@@ -74,6 +74,8 @@ getTokenLock.Lock() defer getTokenLock.Unlock() + + if t.token.isValid() { log.Debugf("returning existing token: %v", t.token) return t.token @@ -138,7 +140,7 @@ if resp.StatusCode != 200 { log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body)) - return err + return expected200Error{} } var token oauthToken @@ -169,6 +171,7 @@ t.token = &token config.Set(configBearerToken, token.AccessToken) + return nil } }
diff --git a/token_test.go b/token_test.go index 5deec1c..16ff83a 100644 --- a/token_test.go +++ b/token_test.go
@@ -17,6 +17,8 @@ Context("oauthToken", func() { It("should calculate valid token", func() { + log.Info("Starting token tests...") + t := &oauthToken{ AccessToken: "x", ExpiresIn: 120000, @@ -28,6 +30,7 @@ }) It("should calculate expired token", func() { + t := &oauthToken{ AccessToken: "x", ExpiresIn: 0, @@ -39,6 +42,7 @@ }) It("should calculate token needing refresh", func() { + t := &oauthToken{ AccessToken: "x", ExpiresIn: 59000, @@ -60,6 +64,8 @@ Context("tokenMan", func() { It("should get a valid token", func() { + tokenManager := createTokenManager() + token := tokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) @@ -71,6 +77,8 @@ }) It("should refresh when forced to", func() { + tokenManager := createTokenManager() + token := tokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) @@ -82,6 +90,7 @@ }) It("should refresh in refresh interval", func(done Done) { + tokenManager := createTokenManager() finished := make(chan bool) var tm *tokenMan @@ -127,6 +136,7 @@ }) It("should have created_at_apid first time, update_at_apid after", func(done Done) { + tokenManager := createTokenManager() finished := make(chan bool) var tm *tokenMan