Issue 69568832 refactor code, fix bugs, add tests (#76) * [ISSUE-69568832] refactor * [ISSUE-69568832] refactor code, fix bugs, add tests * [ISSUE-69568832] fix docker tests * [ISSUE-69568832] format code * [ISSUE-69568832] address comments
diff --git a/api.go b/api.go index 73399d7..1fceebb 100644 --- a/api.go +++ b/api.go
@@ -24,32 +24,45 @@ const tokenEndpoint = "/accesstoken" -func InitAPI(services apid.Services) { - services.API().HandleFunc(tokenEndpoint, getAccessToken).Methods("GET") +const ( + // long-polling timeout from http header + parBlock = "block" + // long-polling tag used for comparision + // if tag fails to match, new token is returned immediately + parTag = "If-None-Match" +) + +type ApiManager struct { + tokenMan tokenManager + endpoint string } -func getAccessToken(w http.ResponseWriter, r *http.Request) { - b := r.URL.Query().Get("block") +func (a *ApiManager) InitAPI(api apid.APIService) { + api.HandleFunc(a.endpoint, a.getAccessToken).Methods("GET") +} + +func (a *ApiManager) getAccessToken(w http.ResponseWriter, r *http.Request) { + b := r.URL.Query().Get(parBlock) var timeout int if b != "" { var err error timeout, err = strconv.Atoi(b) - if err != nil { + if err != nil || timeout < 0 { writeError(w, http.StatusBadRequest, "bad block value, must be number of seconds") return } } log.Debugf("api timeout: %d", timeout) - ifNoneMatch := r.Header.Get("If-None-Match") - - if apidTokenManager.getBearerToken() != ifNoneMatch { - w.Write([]byte(apidTokenManager.getBearerToken())) + ifNoneMatch := r.Header.Get(parTag) + log.Debugf("ifNoneMatch: %s", ifNoneMatch) + if a.tokenMan.getBearerToken() != ifNoneMatch { + w.Write([]byte(a.tokenMan.getBearerToken())) return } select { - case <-apidTokenManager.getTokenReadyChannel(): - w.Write([]byte(apidTokenManager.getBearerToken())) + case <-a.tokenMan.getTokenReadyChannel(): + w.Write([]byte(a.tokenMan.getBearerToken())) case <-time.After(time.Duration(timeout) * time.Second): w.WriteHeader(http.StatusNotModified) }
diff --git a/api_test.go b/api_test.go new file mode 100644 index 0000000..d46ac4c --- /dev/null +++ b/api_test.go
@@ -0,0 +1,125 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apidApigeeSync + +import ( + "fmt" + "github.com/apid/apid-core" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "time" +) + +const ( + apiTestUrl = "http://127.0.0.1:9000" +) + +var _ = Describe("API Manager", func() { + testCount := 0 + var testApiMan *ApiManager + var dummyTokenMan *dummyTokenManager + var client *http.Client + BeforeEach(func() { + testCount++ + dummyTokenMan = &dummyTokenManager{ + token: fmt.Sprintf("test_token_%d", testCount), + tokenReadyChan: make(chan bool, 1), + } + testApiMan = &ApiManager{ + endpoint: tokenEndpoint + strconv.Itoa(testCount), + tokenMan: dummyTokenMan, + } + testApiMan.InitAPI(apid.API()) + time.Sleep(100 * time.Millisecond) + client = &http.Client{} + }) + + clientGet := func(path string, pars map[string][]string, header map[string][]string) (int, []byte) { + uri, err := url.Parse(apiTestUrl + path) + Expect(err).Should(Succeed()) + query := url.Values(pars) + uri.RawQuery = query.Encode() + httpReq, err := http.NewRequest("GET", uri.String(), nil) + httpReq.Header = http.Header(header) + Expect(err).Should(Succeed()) + res, err := client.Do(httpReq) + Expect(err).Should(Succeed()) + defer res.Body.Close() + responseBody, err := ioutil.ReadAll(res.Body) + Expect(err).Should(Succeed()) + return res.StatusCode, responseBody + } + + It("should get token without long-polling", func() { + code, res := clientGet(testApiMan.endpoint, nil, nil) + Expect(code).Should(Equal(http.StatusOK)) + Expect(string(res)).Should(Equal(dummyTokenMan.token)) + }) + + It("should get bad request for invalid timeout", func() { + code, _ := clientGet(testApiMan.endpoint, map[string][]string{ + parBlock: {"invalid"}, + }, map[string][]string{ + parTag: {dummyTokenMan.getBearerToken()}, + }) + Expect(code).Should(Equal(http.StatusBadRequest)) + + code, _ = clientGet(testApiMan.endpoint, map[string][]string{ + parBlock: {"-1"}, + }, map[string][]string{ + parTag: {dummyTokenMan.getBearerToken()}, + }) + Expect(code).Should(Equal(http.StatusBadRequest)) + }) + + It("should get token immediately if mismatch", func() { + code, res := clientGet(testApiMan.endpoint, map[string][]string{ + parBlock: {"10"}, + }, map[string][]string{ + parTag: {"mismatch"}, + }) + Expect(code).Should(Equal(http.StatusOK)) + Expect(string(res)).Should(Equal(dummyTokenMan.token)) + }, 3) + + It("should get StatusNotModified if timeout", func() { + code, _ := clientGet(testApiMan.endpoint, map[string][]string{ + parBlock: {"1"}, + }, map[string][]string{ + parTag: {dummyTokenMan.getBearerToken()}, + }) + Expect(code).Should(Equal(http.StatusNotModified)) + }, 3) + + It("should do long-polling", func() { + go func() { + time.Sleep(1) + dummyTokenMan.token = "new_token" + dummyTokenMan.tokenReadyChan <- true + }() + code, res := clientGet(testApiMan.endpoint, map[string][]string{ + parBlock: {"10"}, + }, map[string][]string{ + parTag: {dummyTokenMan.getBearerToken()}, + }) + Expect(code).Should(Equal(http.StatusOK)) + Expect(string(res)).Should(Equal(dummyTokenMan.getBearerToken())) + }, 3) + +})
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index 5055f68..47c5498 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -19,72 +19,70 @@ . "github.com/onsi/gomega" "io/ioutil" - "net/http/httptest" "os" "testing" "time" "github.com/apid/apid-core" - + "github.com/apid/apid-core/events" "github.com/apid/apid-core/factory" ) -var ( - tmpDir string - testServer *httptest.Server - testRouter apid.Router - testMock *MockServer - wipeDBAferTest bool -) - const dummyConfigValue string = "placeholder" const expectedClusterId = "bootstrap" +var tmpDir string + var _ = BeforeSuite(func() { - wipeDBAferTest = true -}) - -var _ = BeforeEach(func() { apid.Initialize(factory.DefaultServicesFactory()) - - config = apid.Config() dataService = apid.Data() - events = apid.Events() - + config = apid.Config() + apiService = apid.API() + go apiService.Listen() + //dataService = apid.Data() + log = apid.Log().ForModule("apigeeSync") var err error - tmpDir, err = ioutil.TempDir("", "api_test") + tmpDir, err = ioutil.TempDir("", "apid_test") Expect(err).NotTo(HaveOccurred()) - config.Set("local_storage_path", tmpDir) - + config.Set(configLocalStoragePath, tmpDir) config.Set(configProxyServerBaseURI, dummyConfigValue) config.Set(configSnapServerBaseURI, dummyConfigValue) config.Set(configChangeServerBaseURI, dummyConfigValue) config.Set(configSnapshotProtocol, "sqlite") config.Set(configPollInterval, 10*time.Millisecond) config.Set(configDiagnosticMode, false) - config.Set(configName, "testhost") - config.Set(configApidClusterId, expectedClusterId) config.Set(configConsumerKey, "XXXXXXX") config.Set(configConsumerSecret, "YYYYYYY") - - block = "0" - log = apid.Log().ForModule("apigeeSync") + config.Set(configApidInstanceID, "YYYYYYY") }, 3) +var _ = BeforeEach(func() { + eventService = events.CreateService() + config.Set(configName, "testhost") + config.Set(configApidClusterId, expectedClusterId) + apidInfo.ClusterID = expectedClusterId + apidInfo.InstanceID = "YYYYYYY" + apidInfo.LastSnapshot = "" + apidInfo.IsNewInstance = true +}) + var _ = AfterEach(func() { - apid.Events().Close() - lastSequence = "" + cleanCommonDb() + eventService.Close() }) var _ = AfterSuite(func() { - apid.Events().Close() - if testServer != nil { - testServer.Close() - } - os.RemoveAll(tmpDir) + Expect(os.RemoveAll(tmpDir)).Should(Succeed()) }) func TestApigeeSync(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "ApigeeSync Suite") } + +func cleanCommonDb() { + db, err := dataService.DB() + Expect(err).Should(Succeed()) + _, err = db.Exec(`DROP TABLE IF EXISTS APID;`) + Expect(err).Should(Succeed()) +}
diff --git a/apigee_sync.go b/apigee_sync.go deleted file mode 100644 index 8a23079..0000000 --- a/apigee_sync.go +++ /dev/null
@@ -1,140 +0,0 @@ -// Copyright 2017 Google Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package apidApigeeSync - -import ( - "github.com/apid/apid-core" - "net/http" - "time" -) - -const ( - httpTimeout = time.Minute - pluginTimeout = time.Minute - maxIdleConnsPerHost = 10 -) - -var knownTables = make(map[string]bool) - -/* - * Start from existing snapshot if possible - * If an existing snapshot does not exist, use the apid scope to fetch - * all data scopes, then get a snapshot for those data scopes - * - * Then, poll for changes - */ -func bootstrap() { - if isOfflineMode && apidInfo.LastSnapshot == "" { - log.Panic("Diagnostic mode requires existent snapshot info in default DB.") - } - - if apidInfo.LastSnapshot != "" { - snapshot := apidSnapshotManager.startOnLocalSnapshot(apidInfo.LastSnapshot) - events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { - apidChangeManager.pollChangeWithBackoff() - }) - - log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo) - return - } - - apidSnapshotManager.downloadBootSnapshot() - apidSnapshotManager.downloadDataSnapshot() - - apidChangeManager.pollChangeWithBackoff() - -} - -/* - * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy - * for retrying on errors - */ -func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) { - - backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true) - - //inintialize the retry channel to start first attempt immediately - retry := time.After(0 * time.Millisecond) - - for { - select { - case <-quit: - log.Info("Quit signal recieved. Returning") - return - case <-retry: - start := time.Now() - - err := toExecute(quit) - if err == nil { - return - } - - if _, ok := err.(quitSignalError); ok { - return - } - - end := time.Now() - //error encountered, since we would have returned above otherwise - handleError(err) - - /* TODO keep this around? Imagine an immediately erroring service, - * causing many sequential requests which could pollute logs - */ - //only backoff if the request took less than one second - if end.After(start.Add(time.Second)) { - backoff.Reset() - retry = time.After(0 * time.Millisecond) - } else { - retry = time.After(backoff.Duration()) - } - } - } -} - -func addHeaders(req *http.Request) { - req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken()) - req.Header.Set("apid_instance_id", apidInfo.InstanceID) - req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) - req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) -} - -type changeServerError struct { - Code string `json:"code"` -} - -type quitSignalError struct { -} - -type expected200Error struct { -} - -type authFailError struct { -} - -func (an expected200Error) Error() string { - return "Did not recieve OK response" -} - -func (a quitSignalError) Error() string { - return "Signal to quit encountered" -} - -func (a changeServerError) Error() string { - return a.Code -} - -func (a authFailError) Error() string { - return "Authorization failed" -}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go deleted file mode 100644 index f7144a1..0000000 --- a/apigee_sync_test.go +++ /dev/null
@@ -1,452 +0,0 @@ -// Copyright 2017 Google Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package apidApigeeSync - -import ( - "github.com/apid/apid-core" - "github.com/apid/apid-core/util" - "github.com/apigee-labs/transicator/common" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "net/http" - "net/http/httptest" -) - -var _ = Describe("Sync", func() { - Context("offline mode", func() { - var ( - testInstanceID = util.GenerateUUID() - testInstanceName = "offline-instance-name" - testClusterID = "offline-cluster-id" - testLastSnapshot = "offline-last-snapshot" - testChangeMan *dummyChangeManager - ) - - var _ = BeforeEach(func() { - config.Set(configDiagnosticMode, true) - config.Set(configApidClusterId, testClusterID) - _initPlugin(apid.AllServices()) - apidSnapshotManager = &dummySnapshotManager{} - testChangeMan = &dummyChangeManager{ - pollChangeWithBackoffChan: make(chan bool, 1), - } - apidChangeManager = testChangeMan - apidTokenManager = &dummyTokenManager{} - apidInfo = apidInstanceInfo{ - InstanceID: testInstanceID, - InstanceName: testInstanceName, - ClusterID: testClusterID, - LastSnapshot: testLastSnapshot, - } - updateApidInstanceInfo() - - }) - - var _ = AfterEach(func() { - config.Set(configDiagnosticMode, false) - if wipeDBAferTest { - db, err := dataService.DB() - Expect(err).NotTo(HaveOccurred()) - tx, err := db.Begin() - _, err = tx.Exec("DELETE FROM APID") - Expect(err).NotTo(HaveOccurred()) - err = tx.Commit() - Expect(err).NotTo(HaveOccurred()) - - } - wipeDBAferTest = true - newInstanceID = true - isOfflineMode = false - }) - - It("offline mode should bootstrap from local DB", func(done Done) { - - Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - - if s, ok := event.(*common.Snapshot); ok { - // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed - // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler - Expect(s.SnapshotInfo).Should(Equal(testLastSnapshot)) - Expect(s.Tables).To(BeNil()) - close(done) - } - }) - pie := apid.PluginsInitializedEvent{ - Description: "plugins initialized", - } - pie.Plugins = append(pie.Plugins, pluginData) - postInitPlugins(pie) - - }, 3) - - }) - - Context("online mode", func() { - var _ = BeforeEach(func() { - _initPlugin(apid.AllServices()) - createManagers() - }) - - var _ = AfterEach(func() { - if wipeDBAferTest { - db, err := dataService.DB() - Expect(err).NotTo(HaveOccurred()) - tx, err := db.Begin() - _, err = tx.Exec("DELETE FROM APID") - Expect(err).NotTo(HaveOccurred()) - err = tx.Commit() - Expect(err).NotTo(HaveOccurred()) - } - wipeDBAferTest = true - }) - - const expectedDataScopeId1 = "dataScope1" - const expectedDataScopeId2 = "dataScope2" - - var initializeContext = func() { - testRouter = apid.API().Router() - testServer = httptest.NewServer(testRouter) - - // set up mock server - mockParms := MockParms{ - ReliableAPI: false, - ClusterID: config.GetString(configApidClusterId), - TokenKey: config.GetString(configConsumerKey), - TokenSecret: config.GetString(configConsumerSecret), - Scope: "ert452", - Organization: "att", - Environment: "prod", - } - testMock = Mock(mockParms, testRouter) - - config.Set(configProxyServerBaseURI, testServer.URL) - config.Set(configSnapServerBaseURI, testServer.URL) - config.Set(configChangeServerBaseURI, testServer.URL) - } - - var restoreContext = func() { - - testServer.Close() - - config.Set(configProxyServerBaseURI, dummyConfigValue) - config.Set(configSnapServerBaseURI, dummyConfigValue) - config.Set(configChangeServerBaseURI, dummyConfigValue) - - } - - It("should succesfully bootstrap from clean slate", func(done Done) { - log.Info("Starting sync tests...") - var closeDone <-chan bool - initializeContext() - // do not wipe DB after. Lets use it - wipeDBAferTest = false - var lastSnapshot *common.Snapshot - - expectedSnapshotTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms_company"}, - common.Change{Table: "edgex_apid_cluster"}, - common.Change{Table: "edgex_data_scope"}, - common.Change{Table: "kms_app_credential"}, - common.Change{Table: "kms_app_credential_apiproduct_mapper"}, - common.Change{Table: "kms_developer"}, - common.Change{Table: "kms_company_developer"}, - common.Change{Table: "kms_api_product"}, - common.Change{Table: "kms_app"}}, - } - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - if s, ok := event.(*common.Snapshot); ok { - - Expect(16).To(Equal(len(knownTables))) - Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) - - lastSnapshot = s - - db, _ := dataService.DBVersion(s.SnapshotInfo) - var rowCount int - var id string - - err := db.Ping() - Expect(err).NotTo(HaveOccurred()) - numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;") - if err != nil { - Fail("Failed to get correct DB") - } - Expect(true).To(Equal(numApidClusters.Next())) - numApidClusters.Scan(&rowCount) - Expect(1).To(Equal(rowCount)) - numApidClusters.Close() - apidClusters, err := db.Query("select id from edgex_apid_cluster;") - Expect(err).NotTo(HaveOccurred()) - apidClusters.Next() - apidClusters.Scan(&id) - Expect(id).To(Equal(expectedClusterId)) - apidClusters.Close() - - numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;") - Expect(err).NotTo(HaveOccurred()) - Expect(true).To(Equal(numDataScopes.Next())) - numDataScopes.Scan(&rowCount) - Expect(2).To(Equal(rowCount)) - numDataScopes.Close() - dataScopes, err := db.Query("select id from edgex_data_scope;") - Expect(err).NotTo(HaveOccurred()) - dataScopes.Next() - dataScopes.Scan(&id) - dataScopes.Next() - - if id == expectedDataScopeId1 { - dataScopes.Scan(&id) - Expect(id).To(Equal(expectedDataScopeId2)) - } else { - dataScopes.Scan(&id) - Expect(id).To(Equal(expectedDataScopeId1)) - } - dataScopes.Close() - - } else if cl, ok := event.(*common.ChangeList); ok { - closeDone = apidChangeManager.close() - // ensure that snapshot switched DB versions - Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) - expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - Expect(getDB() == expectedDB).Should(BeTrue()) - - Expect(cl.Changes).To(HaveLen(6)) - - var tables []string - for _, c := range cl.Changes { - tables = append(tables, c.Table) - Expect(c.NewRow).ToNot(BeNil()) - - var tenantID string - c.NewRow.Get("tenant_id", &tenantID) - Expect(tenantID).To(Equal("ert452")) - } - - Expect(tables).To(ContainElement("kms_app_credential")) - Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper")) - Expect(tables).To(ContainElement("kms_developer")) - Expect(tables).To(ContainElement("kms_company_developer")) - Expect(tables).To(ContainElement("kms_api_product")) - Expect(tables).To(ContainElement("kms_app")) - - go func() { - // when close done, all handlers for the first changeList have been executed - <-closeDone - defer GinkgoRecover() - // allow other handler to execute to insert last_sequence - var seq string - err = getDB(). - QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;"). - Scan(&seq) - Expect(err).NotTo(HaveOccurred()) - //} - Expect(seq).To(Equal(cl.LastSequence)) - - restoreContext() - close(done) - }() - - } - }) - pie := apid.PluginsInitializedEvent{ - Description: "plugins initialized", - } - pie.Plugins = append(pie.Plugins, pluginData) - postInitPlugins(pie) - }, 5) - - It("should bootstrap from local DB if present", func(done Done) { - - var closeDone <-chan bool - - initializeContext() - expectedTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms_company"}, - common.Change{Table: "edgex_apid_cluster"}, - common.Change{Table: "edgex_data_scope"}}, - } - Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - - if s, ok := event.(*common.Snapshot); ok { - // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed - // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler - closeDone = apidChangeManager.close() - go func() { - // when close done, all handlers for the first snapshot have been executed - <-closeDone - //verify that the knownTables array has been properly populated from existing DB - Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) - - Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) - Expect(s.Tables).To(BeNil()) - - restoreContext() - close(done) - }() - - } - }) - pie := apid.PluginsInitializedEvent{ - Description: "plugins initialized", - } - pie.Plugins = append(pie.Plugins, pluginData) - postInitPlugins(pie) - - }, 3) - - It("should detect apid_cluster_id change in config yaml", func() { - Expect(apidInfo).ToNot(BeNil()) - Expect(apidInfo.ClusterID).To(Equal("bootstrap")) - Expect(apidInfo.InstanceID).ToNot(BeEmpty()) - previousInstanceId := apidInfo.InstanceID - - config.Set(configApidClusterId, "new value") - apidInfo, err := getApidInstanceInfo() - Expect(err).NotTo(HaveOccurred()) - Expect(apidInfo.LastSnapshot).To(BeEmpty()) - Expect(apidInfo.InstanceID).ToNot(BeEmpty()) - Expect(apidInfo.InstanceID).ToNot(Equal(previousInstanceId)) - Expect(apidInfo.ClusterID).To(Equal("new value")) - }) - - It("should correctly identify non-proper subsets with respect to maps", func() { - - //test b proper subset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "b"}}, - )).To(BeFalse()) - - //test a == b - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}}, - )).To(BeFalse()) - - //test b superset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}}, - )).To(BeTrue()) - - //test b not subset of a - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{common.Change{Table: "c"}}, - )).To(BeTrue()) - - //test a empty - Expect(changesHaveNewTables(map[string]bool{}, - []common.Change{common.Change{Table: "a"}}, - )).To(BeTrue()) - - //test b empty - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, - []common.Change{}, - )).To(BeFalse()) - - //test b nil - Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse()) - - //test a nil - Expect(changesHaveNewTables(nil, - []common.Change{common.Change{Table: "a"}}, - )).To(BeTrue()) - }, 3) - - // todo: disabled for now - - // there is precondition I haven't been able to track down that breaks this test on occasion - XIt("should process a new snapshot when change server requires it", func(done Done) { - oldSnap := apidInfo.LastSnapshot - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() - - if s, ok := event.(*common.Snapshot); ok { - Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) - close(done) - } - }) - testMock.forceNewSnapshot() - }) - - It("Verify the Sequence Number Logic works as expected", func() { - Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1)) - Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1)) - Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0)) - Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1)) - Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) - Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) - }, 3) - - /* - * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap - */ - It("Should be able to handle duplicate snapshot during bootstrap", func() { - initializeContext() - apidTokenManager = createSimpleTokenManager() - apidTokenManager.start() - apidSnapshotManager = createSnapShotManager() - //events.Listen(ApigeeSyncEventSelector, &handler{}) - - scopes := []string{apidInfo.ClusterID} - snapshot := &common.Snapshot{} - apidSnapshotManager.downloadSnapshot(true, scopes, snapshot) - apidSnapshotManager.storeBootSnapshot(snapshot) - apidSnapshotManager.storeDataSnapshot(snapshot) - restoreContext() - <-apidSnapshotManager.close() - apidTokenManager.close() - }, 3) - - It("Reuse http.Client connection for multiple concurrent requests", func() { - var tr *http.Transport - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - })) - tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL)) - tr.MaxIdleConnsPerHost = maxIdleConnsPerHost - - var rspcnt int = 0 - ch := make(chan *http.Response) - client := &http.Client{Transport: tr} - for i := 0; i < 2*maxIdleConnsPerHost; i++ { - go func(client *http.Client) { - req, err := http.NewRequest("GET", server.URL, nil) - resp, err := client.Do(req) - if err != nil { - Fail("Unable to process Client request") - } - ch <- resp - resp.Body.Close() - - }(client) - } - for { - select { - case resp := <-ch: - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - if rspcnt >= 2*maxIdleConnsPerHost-1 { - return - } - rspcnt++ - default: - } - } - - }, 3) - - }) -})
diff --git a/backoff.go b/backoff.go deleted file mode 100644 index bad8077..0000000 --- a/backoff.go +++ /dev/null
@@ -1,98 +0,0 @@ -// Copyright 2017 Google Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package apidApigeeSync - -import ( - "math" - "math/rand" - "time" -) - -const defaultInitial time.Duration = 200 * time.Millisecond -const defaultMax time.Duration = 10 * time.Second -const defaultFactor float64 = 2 - -type Backoff struct { - attempt int - initial, max time.Duration - jitter bool - backoffStrategy func() time.Duration -} - -type ExponentialBackoff struct { - Backoff - factor float64 -} - -func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff { - backoff := &ExponentialBackoff{} - - if initial <= 0 { - initial = defaultInitial - } - if max <= 0 { - max = defaultMax - } - - if factor <= 0 { - factor = defaultFactor - } - - backoff.initial = initial - backoff.max = max - backoff.attempt = 0 - backoff.factor = factor - backoff.jitter = jitter - backoff.backoffStrategy = backoff.exponentialBackoffStrategy - - return backoff -} - -func (b *Backoff) Duration() time.Duration { - d := b.backoffStrategy() - b.attempt++ - return d -} - -func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration { - - initial := float64(b.Backoff.initial) - attempt := float64(b.Backoff.attempt) - duration := initial * math.Pow(b.factor, attempt) - - if duration > math.MaxInt64 { - return b.max - } - dur := time.Duration(duration) - - if b.jitter { - duration = (rand.Float64()*(duration-initial) + initial) - } - - if dur > b.max { - return b.max - } - - log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond)) - return dur -} - -func (b *Backoff) Reset() { - b.attempt = 0 -} - -func (b *Backoff) Attempt() int { - return b.attempt -}
diff --git a/change_test.go b/change_test.go index 511eb82..97d5fb3 100644 --- a/change_test.go +++ b/change_test.go
@@ -16,144 +16,196 @@ import ( "github.com/apid/apid-core" + "github.com/apid/apid-core/api" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "net/http" "net/http/httptest" - "os" + "strconv" "time" ) +const ( + expectedInstanceId = "dummy" +) + var _ = Describe("Change Agent", func() { Context("Change Agent Unit Tests", func() { - var createTestDb = func(sqlfile string, dbId string) common.Snapshot { - initDb(sqlfile, "./mockdb_change.sqlite3") - file, err := os.Open("./mockdb_change.sqlite3") - Expect(err).Should(Succeed()) - s := common.Snapshot{} - err = processSnapshotServerFileResponse(dbId, file, &s) - Expect(err).Should(Succeed()) - return s - } + Context("utils", func() { - var initializeContext = func() { - testRouter = apid.API().Router() - testServer = httptest.NewServer(testRouter) + It("should correctly identify non-proper subsets with respect to maps", func() { + //test b proper subset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{{Table: "b"}}, + )).To(BeFalse()) - // set up mock server - mockParms := MockParms{ - ReliableAPI: true, - ClusterID: config.GetString(configApidClusterId), - TokenKey: config.GetString(configConsumerKey), - TokenSecret: config.GetString(configConsumerSecret), - Scope: "ert452", - Organization: "att", - Environment: "prod", - } - testMock = Mock(mockParms, testRouter) + //test a == b + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{{Table: "a"}, {Table: "b"}}, + )).To(BeFalse()) - config.Set(configProxyServerBaseURI, testServer.URL) - config.Set(configSnapServerBaseURI, testServer.URL) - config.Set(configChangeServerBaseURI, testServer.URL) - config.Set(configPollInterval, 1*time.Millisecond) - } + //test b superset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{{Table: "a"}, {Table: "b"}, {Table: "c"}}, + )).To(BeTrue()) - var restoreContext = func() { + //test b not subset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{{Table: "c"}}, + )).To(BeTrue()) - testServer.Close() - config.Set(configProxyServerBaseURI, dummyConfigValue) - config.Set(configSnapServerBaseURI, dummyConfigValue) - config.Set(configChangeServerBaseURI, dummyConfigValue) - config.Set(configPollInterval, 10*time.Millisecond) - } + //test a empty + Expect(changesHaveNewTables(map[string]bool{}, + []common.Change{{Table: "a"}}, + )).To(BeTrue()) - var _ = BeforeEach(func() { - _initPlugin(apid.AllServices()) - createManagers() - event := createTestDb("./sql/init_mock_db.sql", "test_change") - processSnapshot(&event) - knownTables = extractTablesFromDB(getDB()) - }) + //test b empty + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{}, + )).To(BeFalse()) - var _ = AfterEach(func() { - restoreContext() - if wipeDBAferTest { - db, err := dataService.DB() - Expect(err).Should(Succeed()) - tx, err := db.Begin() - _, err = tx.Exec("DELETE FROM APID") - Expect(err).Should(Succeed()) - err = tx.Commit() - Expect(err).Should(Succeed()) - } - wipeDBAferTest = true - }) + //test b nil + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse()) - It("test change agent with authorization failure", func() { - log.Debug("test change agent with authorization failure") - testTokenManager := &dummyTokenManager{make(chan bool)} - apidTokenManager = testTokenManager - apidTokenManager.start() - apidSnapshotManager = &dummySnapshotManager{} - initializeContext() - testMock.forceAuthFail() - wipeDBAferTest = true - apidChangeManager.pollChangeWithBackoff() - // auth check fails - <-testTokenManager.invalidateChan - log.Debug("closing") - <-apidChangeManager.close() - }) - - It("test change agent with too old snapshot", func() { - log.Debug("test change agent with too old snapshot") - testTokenManager := &dummyTokenManager{make(chan bool)} - apidTokenManager = testTokenManager - apidTokenManager.start() - testSnapshotManager := &dummySnapshotManager{make(chan bool)} - apidSnapshotManager = testSnapshotManager - initializeContext() - - testMock.passAuthCheck() - testMock.forceNewSnapshot() - wipeDBAferTest = true - apidChangeManager.pollChangeWithBackoff() - <-testSnapshotManager.downloadCalledChan - log.Debug("closing") - <-apidChangeManager.close() - }) - - It("change agent should retry with authorization failure", func(done Done) { - log.Debug("change agent should retry with authorization failure") - testTokenManager := &dummyTokenManager{make(chan bool)} - apidTokenManager = testTokenManager - apidTokenManager.start() - apidSnapshotManager = &dummySnapshotManager{} - initializeContext() - testMock.forceAuthFail() - testMock.forceNoSnapshot() - wipeDBAferTest = true - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - - if _, ok := event.(*common.ChangeList); ok { - closeDone := apidChangeManager.close() - log.Debug("closing") - go func() { - // when close done, all handlers for the first snapshot have been executed - <-closeDone - close(done) - }() - - } + //test a nil + Expect(changesHaveNewTables(nil, + []common.Change{{Table: "a"}}, + )).To(BeTrue()) }) - apidChangeManager.pollChangeWithBackoff() - // auth check fails - <-testTokenManager.invalidateChan - }, 2) + It("Compare Sequence Number", func() { + Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1)) + Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1)) + Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0)) + Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1)) + Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) + Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) + }) + }) + + Context("changeManager", func() { + testCount := 0 + var testChangeMan *pollChangeManager + var dummyDbMan *dummyDbManager + var dummySnapMan *dummySnapshotManager + var dummyTokenMan *dummyTokenManager + var testServer *httptest.Server + var testRouter apid.Router + var testMock *MockServer + BeforeEach(func() { + testCount++ + dummyDbMan = &dummyDbManager{ + knownTables: map[string]bool{ + "_transicator_metadata": true, + "_transicator_tables": true, + "attributes": true, + "edgex_apid_cluster": true, + "edgex_data_scope": true, + "kms_api_product": true, + "kms_app": true, + "kms_app_credential": true, + "kms_app_credential_apiproduct_mapper": true, + "kms_company": true, + "kms_company_developer": true, + "kms_deployment": true, + "kms_developer": true, + "kms_organization": true, + }, + scopes: []string{"43aef41d"}, + lastSeqUpdated: make(chan string, 1), + } + dummySnapMan = &dummySnapshotManager{ + downloadCalledChan: make(chan bool, 1), + } + dummyTokenMan = &dummyTokenManager{ + invalidateChan: make(chan bool, 1), + } + client := &http.Client{} + testChangeMan = createChangeManager(dummyDbMan, dummySnapMan, dummyTokenMan, client) + testChangeMan.block = 0 + + // create a new API service to have a new router for testing + testRouter = api.CreateService().Router() + testServer = httptest.NewServer(testRouter) + // set up mock server + mockParms := MockParms{ + ReliableAPI: true, + ClusterID: config.GetString(configApidClusterId), + TokenKey: config.GetString(configConsumerKey), + TokenSecret: config.GetString(configConsumerSecret), + Scope: "", + Organization: "att", + Environment: "prod", + } + apidInfo.ClusterID = expectedClusterId + apidInfo.InstanceID = expectedInstanceId + testMock = Mock(mockParms, testRouter) + config.Set(configProxyServerBaseURI, testServer.URL) + config.Set(configSnapServerBaseURI, testServer.URL) + config.Set(configChangeServerBaseURI, testServer.URL) + config.Set(configPollInterval, 1*time.Millisecond) + + initialBackoffInterval = time.Millisecond + testMock.oauthToken = "test_token_" + strconv.Itoa(testCount) + dummyTokenMan.token = testMock.oauthToken + + }) + + AfterEach(func() { + testServer.Close() + <-testChangeMan.close() + config.Set(configProxyServerBaseURI, dummyConfigValue) + config.Set(configSnapServerBaseURI, dummyConfigValue) + config.Set(configChangeServerBaseURI, dummyConfigValue) + config.Set(configPollInterval, 10*time.Millisecond) + }) + + It("test change agent with authorization failure", func() { + log.Debug("test change agent with authorization failure") + testMock.forceAuthFailOnce() + testChangeMan.pollChangeWithBackoff() + // auth check fails + <-dummyTokenMan.invalidateChan + log.Debug("closing") + }) + + It("test change agent with too old snapshot", func() { + log.Debug("test change agent with too old snapshot") + testMock.passAuthCheck() + testMock.forceNewSnapshot() + testChangeMan.pollChangeWithBackoff() + <-dummySnapMan.downloadCalledChan + log.Debug("closing") + }) + + It("change agent should retry with authorization failure", func() { + log.Debug("change agent should retry with authorization failure") + testMock.forceAuthFailOnce() + testMock.forceNoSnapshot() + called := false + eventService.ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + if _, ok := event.(*common.ChangeList); ok { + called = true + } + }) + testChangeMan.pollChangeWithBackoff() + <-dummyTokenMan.invalidateChan + Expect(<-dummyDbMan.lastSeqUpdated).Should(Equal(testMock.lastSequenceID())) + Expect(called).Should(BeTrue()) + }, 3) + + }) + + Context("offline change manager", func() { + It("offline change manager should have no effect", func() { + o := &offlineChangeManager{} + o.pollChangeWithBackoff() + <-o.close() + }) + }) }) })
diff --git a/changes.go b/changes.go index b78a1d1..6afa827 100644 --- a/changes.go +++ b/changes.go
@@ -16,34 +16,44 @@ import ( "encoding/json" + "fmt" "github.com/apigee-labs/transicator/common" "io/ioutil" "net/http" "net/url" "path" "sort" + "strconv" "sync/atomic" "time" ) -var lastSequence string -var block string = "45" - type pollChangeManager struct { // 0 for not closed, 1 for closed isClosed *int32 // 0 for pollChangeWithBackoff() not launched, 1 for launched - isLaunched *int32 - quitChan chan bool + isLaunched *int32 + quitChan chan bool + block int + lastSequence string + dbMan DbManager + snapMan snapshotManager + tokenMan tokenManager + client *http.Client } -func createChangeManager() *pollChangeManager { +func createChangeManager(dbMan DbManager, snapMan snapshotManager, tokenMan tokenManager, client *http.Client) *pollChangeManager { isClosedInt := int32(0) isLaunchedInt := int32(0) return &pollChangeManager{ isClosed: &isClosedInt, quitChan: make(chan bool), isLaunched: &isLaunchedInt, + block: 45, + dbMan: dbMan, + snapMan: snapMan, + tokenMan: tokenMan, + client: client, } } @@ -56,7 +66,7 @@ finishChan := make(chan bool, 1) //has been closed if atomic.SwapInt32(c.isClosed, 1) == int32(1) { - log.Error("pollChangeManager: close() called on a closed pollChangeManager!") + log.Warn("pollChangeManager: close() called on a closed pollChangeManager!") go func() { log.Debug("change manager closed") finishChan <- false @@ -65,11 +75,10 @@ } // not launched if atomic.LoadInt32(c.isLaunched) == int32(0) { - log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!") + log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched!") go func() { - c.quitChan <- true - apidTokenManager.close() - <-apidSnapshotManager.close() + c.tokenMan.close() + <-c.snapMan.close() log.Debug("change manager closed") finishChan <- false }() @@ -79,8 +88,8 @@ log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager") go func() { c.quitChan <- true - apidTokenManager.close() - <-apidSnapshotManager.close() + c.tokenMan.close() + <-c.snapMan.close() log.Debug("change manager closed") finishChan <- true }() @@ -120,43 +129,147 @@ * Check to see if we have lastSequence already saved in the DB, * in which case, it has to be used to prevent re-reading same data */ - lastSequence = getLastSequence() - + c.lastSequence = c.dbMan.getLastSequence() for { select { case <-c.quitChan: log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager") - return quitSignalError{} + return quitSignalError default: - err := c.getChanges(changesUri) + scopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID) if err != nil { - if _, ok := err.(quitSignalError); ok { - log.Debug("pollChangeAgent: consuming the quit signal") - <-c.quitChan - } + return err + } + r, err := c.getChanges(scopes, changesUri) + if err != nil { + return err + } + cl, err := c.parseChangeResp(r) + if err != nil { + return err + } + if err = c.emitChangeList(scopes, cl); err != nil { return err } } } } -//TODO refactor this method more, split it up -/* Make a single request to the changeserver to get a changelist */ -func (c *pollChangeManager) getChanges(changesUri *url.URL) error { - // if closed - if atomic.LoadInt32(c.isClosed) == int32(1) { - return quitSignalError{} +func (c *pollChangeManager) parseChangeResp(r *http.Response) (*common.ChangeList, error) { + var err error + defer r.Body.Close() + + if r.StatusCode != http.StatusOK { + log.Errorf("Get changes request failed with status code: %d", r.StatusCode) + switch r.StatusCode { + case http.StatusUnauthorized: + c.tokenMan.invalidateToken() + return nil, authFailError + + case http.StatusNotModified: + return nil, nil + case http.StatusBadRequest: + var apiErr changeServerError + var b []byte + b, err = ioutil.ReadAll(r.Body) + if err != nil { + log.Errorf("Unable to read response body: %v", err) + return nil, err + } + err = json.Unmarshal(b, &apiErr) + if err != nil { + log.Errorf("JSON Response Data not parsable: %s", string(b)) + return nil, err + } + if apiErr.Code == "SNAPSHOT_TOO_OLD" { + log.Debug("Received SNAPSHOT_TOO_OLD message from change server.") + err = apiErr + } + return nil, err + default: + log.Errorf("Unknown response code from change server: %v", r.Status) + return nil, fmt.Errorf("unknown response code from change server: %v", r.Status) + } } + + resp := &common.ChangeList{} + err = json.NewDecoder(r.Body).Decode(resp) + if err != nil { + log.Errorf("JSON Response Data not parsable: %v", err) + return nil, err + } + return resp, nil +} + +func (c *pollChangeManager) emitChangeList(scopes []string, cl *common.ChangeList) error { + var err error + /* + * If the lastSequence is already newer or the same than what we got via + * cl.LastSequence, Ignore it. + */ + if c.lastSequence != "" && + getChangeStatus(c.lastSequence, cl.LastSequence) != 1 { + return nil + } + + if changesRequireDDLSync(c.dbMan.getKnowTables(), cl) { + return changeServerError{ + Code: "DDL changes detected; must get new snapshot", + } + } + + /* If valid data present, Emit to plugins */ + if len(cl.Changes) > 0 { + if err = c.dbMan.processChangeList(cl); err != nil { + log.Errorf("Error in processChangeList: %v", err) + return err + } + /* + * Check to see if there was any change in scope. If found, handle it + * by getting a new snapshot + */ + newScopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID) + if err != nil { + return err + } + cs := scopeChanged(newScopes, scopes) + if cs != nil { + return cs + } + select { + case <-time.After(httpTimeout): + log.Panic("Timeout. Plugins failed to respond to changes.") + case <-eventService.Emit(ApigeeSyncEventSelector, cl): + } + } else if c.lastSequence == "" { // emit the first changelist anyway + select { + case <-time.After(httpTimeout): + log.Panic("Timeout. Plugins failed to respond to changes.") + case <-eventService.Emit(ApigeeSyncEventSelector, cl): + } + } else { + log.Debugf("No Changes detected") + } + + err = c.dbMan.updateLastSequence(cl.LastSequence) + if err != nil { + log.Panicf("Unable to update Sequence in DB. Err {%v}", err) + } + c.lastSequence = cl.LastSequence + return nil +} + +/* Make a single request to the changeserver to get a changelist */ +func (c *pollChangeManager) getChanges(scopes []string, changesUri *url.URL) (*http.Response, error) { log.Debug("polling...") /* Find the scopes associated with the config id */ - scopes := findScopesForId(apidInfo.ClusterID) v := url.Values{} - blockValue := block + blockValue := strconv.Itoa(c.block) /* Sequence added to the query if available */ - if lastSequence != "" { - v.Add("since", lastSequence) + if c.lastSequence != "" { + v.Add("since", c.lastSequence) } else { blockValue = "0" } @@ -178,115 +291,16 @@ /* If error, break the loop, and retry after interval */ req, err := http.NewRequest("GET", uri, nil) - addHeaders(req) - r, err := httpclient.Do(req) + addHeaders(req, c.tokenMan.getBearerToken()) + r, err := c.client.Do(req) if err != nil { log.Errorf("change agent comm error: %s", err) - // if closed - if atomic.LoadInt32(c.isClosed) == int32(1) { - return quitSignalError{} - } - return err + return nil, err } - defer r.Body.Close() - - // has been closed - if atomic.LoadInt32(c.isClosed) == int32(1) { - log.Debugf("getChanges: changeManager has been closed") - return quitSignalError{} - } - - if r.StatusCode != http.StatusOK { - log.Errorf("Get changes request failed with status code: %d", r.StatusCode) - switch r.StatusCode { - case http.StatusUnauthorized: - err = apidTokenManager.invalidateToken() - if err != nil { - return err - } - return authFailError{} - - case http.StatusNotModified: - return nil - - case http.StatusBadRequest: - var apiErr changeServerError - var b []byte - b, err = ioutil.ReadAll(r.Body) - if err != nil { - log.Errorf("Unable to read response body: %v", err) - return err - } - err = json.Unmarshal(b, &apiErr) - if err != nil { - log.Errorf("JSON Response Data not parsable: %s", string(b)) - return err - } - if apiErr.Code == "SNAPSHOT_TOO_OLD" { - log.Debug("Received SNAPSHOT_TOO_OLD message from change server.") - err = apiErr - } - return err - } - return nil - } - - var resp common.ChangeList - err = json.NewDecoder(r.Body).Decode(&resp) - if err != nil { - log.Errorf("JSON Response Data not parsable: %v", err) - return err - } - - /* - * If the lastSequence is already newer or the same than what we got via - * resp.LastSequence, Ignore it. - */ - if lastSequence != "" && - getChangeStatus(lastSequence, resp.LastSequence) != 1 { - return nil - } - - if changesRequireDDLSync(resp) { - return changeServerError{ - Code: "DDL changes detected; must get new snapshot", - } - } - - /* If valid data present, Emit to plugins */ - if len(resp.Changes) > 0 { - processChangeList(&resp) - select { - case <-time.After(httpTimeout): - log.Panic("Timeout. Plugins failed to respond to changes.") - case <-events.Emit(ApigeeSyncEventSelector, &resp): - } - } else if lastSequence == "" { - select { - case <-time.After(httpTimeout): - log.Panic("Timeout. Plugins failed to respond to changes.") - case <-events.Emit(ApigeeSyncEventSelector, &resp): - } - } else { - log.Debugf("No Changes detected for Scopes: %s", scopes) - } - - updateSequence(resp.LastSequence) - - /* - * Check to see if there was any change in scope. If found, handle it - * by getting a new snapshot - */ - newScopes := findScopesForId(apidInfo.ClusterID) - cs := scopeChanged(newScopes, scopes) - if cs != nil { - return cs - } - - return nil + return r, nil } -func changesRequireDDLSync(changes common.ChangeList) bool { +func changesRequireDDLSync(knownTables map[string]bool, changes *common.ChangeList) bool { return changesHaveNewTables(knownTables, changes.Changes) } @@ -296,10 +310,12 @@ log.Debugf("handleChangeServerError: changeManager has been closed") return } - if c, ok := err.(changeServerError); ok { - log.Debugf("%s. Fetch a new snapshot to sync...", c.Code) - apidSnapshotManager.downloadDataSnapshot() - } else { + + switch e := err.(type) { + case changeServerError: + log.Debugf("%s. Fetch a new snapshot to sync...", e.Code) + c.snapMan.downloadDataSnapshot() + default: log.Debugf("Error connecting to changeserver: %v", err) } } @@ -310,7 +326,7 @@ func changesHaveNewTables(a map[string]bool, changes []common.Change) bool { //nil maps should not be passed in. Making the distinction between nil map and empty map - if a == nil { + if len(a) == 0 { log.Warn("Nil map passed to function changesHaveNewTables, may be bug") return true } @@ -332,24 +348,15 @@ func getChangeStatus(lastSeq string, currSeq string) int { seqPrev, err := common.ParseSequence(lastSeq) if err != nil { - log.Panic("Unable to parse previous sequence string") + log.Panicf("Unable to parse previous sequence string: %v", err) } seqCurr, err := common.ParseSequence(currSeq) if err != nil { - log.Panic("Unable to parse current sequence string") + log.Panicf("Unable to parse current sequence string: %v", err) } return seqCurr.Compare(seqPrev) } -func updateSequence(seq string) { - lastSequence = seq - err := updateLastSequence(seq) - if err != nil { - log.Panicf("Unable to update Sequence in DB. Err {%v}", err) - } - -} - /* * Returns nil if the two arrays have matching contents */
diff --git a/cmd/mockServer/main.go b/cmd/mockServer/main.go index 486da41..8cfa1ef 100644 --- a/cmd/mockServer/main.go +++ b/cmd/mockServer/main.go
@@ -19,7 +19,6 @@ "os" - "github.com/apid/apid-core" "github.com/apid/apid-core/factory" "github.com/apid/apidApigeeSync" @@ -53,16 +52,16 @@ router := apid.API().Router() params := apidApigeeSync.MockParms{ - ReliableAPI: *reliable, - ClusterID: "cluster", - TokenKey: "key", - TokenSecret: "secret", - Scope: "scope", - Organization: "org", - Environment: "test", - NumDevelopers: *numDevs, - NumDeployments: *numDeps, - BundleURI: *bundleURI, + ReliableAPI: *reliable, + ClusterID: "cluster", + TokenKey: "key", + TokenSecret: "secret", + Scope: "scope", + Organization: "org", + Environment: "test", + NumDevelopers: *numDevs, + NumDeployments: *numDeps, + BundleURI: *bundleURI, } log.Printf("Params: %#v\n", params)
diff --git a/data.go b/data.go index c77fc10..be8dd34 100644 --- a/data.go +++ b/data.go
@@ -28,27 +28,36 @@ ) var ( - unsafeDB apid.DB - dbMux sync.RWMutex + dbMux sync.RWMutex ) -type dataApidCluster struct { - ID, Name, OrgAppName, CreatedBy, UpdatedBy, Description string - Updated, Created string -} - -type dataDataScope struct { - ID, ClusterID, Scope, Org, Env, CreatedBy, UpdatedBy string - Updated, Created string -} - /* This plugin uses 2 databases: 1. The default DB is used for APID table. 2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE (Currently, the snapshot never changes, but this is future-proof) */ -func initDB(db apid.DB) error { + +func creatDbManager() *dbManager { + return &dbManager{ + DbMux: &sync.RWMutex{}, + knownTables: make(map[string]bool), + } +} + +type dbManager struct { + Db apid.DB + DbMux *sync.RWMutex + dbVersion string + knownTables map[string]bool +} + +// idempotent call to initialize default DB +func (dbMan *dbManager) initDB() error { + db, err := dataService.DB() + if err != nil { + return err + } tx, err := db.Begin() if err != nil { log.Errorf("initDB(): Unable to get DB tx err: {%v}", err) @@ -75,24 +84,22 @@ return nil } -func getDB() apid.DB { +func (dbMan *dbManager) getDB() apid.DB { dbMux.RLock() - db := unsafeDB - dbMux.RUnlock() - return db + defer dbMux.RUnlock() + return dbMan.Db } -func setDB(db apid.DB) { +func (dbMan *dbManager) setDB(db apid.DB) { dbMux.Lock() - unsafeDB = db - dbMux.Unlock() + defer dbMux.Unlock() + dbMan.Db = db } //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn -func insert(tableName string, rows []common.Row, txn apid.Tx) bool { - +func (dbMan *dbManager) insert(tableName string, rows []common.Row, txn apid.Tx) error { if len(rows) == 0 { - return false + return fmt.Errorf("no rows") } var orderedColumns []string @@ -101,12 +108,12 @@ } sort.Strings(orderedColumns) - sql := buildInsertSql(tableName, orderedColumns, rows) + sql := dbMan.buildInsertSql(tableName, orderedColumns, rows) prep, err := txn.Prepare(sql) if err != nil { - log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err) - return false + log.Errorf("INSERT Fail to prepare statement %s error=%v", sql, err) + return err } defer prep.Close() @@ -123,15 +130,15 @@ _, err = prep.Exec(values...) if err != nil { - log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err) - return false + log.Errorf("INSERT Fail %s values=%v error=%v", sql, values, err) + return err } - log.Debugf("INSERT Success [%s] values=%v", sql, values) + log.Debugf("INSERT Success %s values=%v", sql, values) - return true + return nil } -func getValueListFromKeys(row common.Row, pkeys []string) []interface{} { +func (dbMan *dbManager) getValueListFromKeys(row common.Row, pkeys []string) []interface{} { var values = make([]interface{}, len(pkeys)) for i, pkey := range pkeys { if row[pkey] == nil { @@ -143,52 +150,46 @@ return values } -func _delete(tableName string, rows []common.Row, txn apid.Tx) bool { - pkeys, err := getPkeysForTable(tableName) +func (dbMan *dbManager) delete(tableName string, rows []common.Row, txn apid.Tx) error { + pkeys, err := dbMan.getPkeysForTable(tableName) sort.Strings(pkeys) if len(pkeys) == 0 || err != nil { - log.Errorf("DELETE No primary keys found for table. %s", tableName) - return false + return fmt.Errorf("DELETE No primary keys found for table. %s", tableName) } if len(rows) == 0 { - log.Errorf("No rows found for table.", tableName) - return false + return fmt.Errorf("no rows found for table %s", tableName) } - sql := buildDeleteSql(tableName, rows[0], pkeys) + sql := dbMan.buildDeleteSql(tableName, rows[0], pkeys) prep, err := txn.Prepare(sql) if err != nil { - log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err) - return false + return fmt.Errorf("DELETE Fail to prep statement %s error=%v", sql, err) } defer prep.Close() for _, row := range rows { - values := getValueListFromKeys(row, pkeys) + values := dbMan.getValueListFromKeys(row, pkeys) // delete prepared statement from existing template statement res, err := txn.Stmt(prep).Exec(values...) if err != nil { - log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err) - return false + return fmt.Errorf("DELETE Fail %s values=%v error=%v", sql, values, err) } affected, err := res.RowsAffected() if err == nil && affected != 0 { - log.Debugf("DELETE Success [%s] values=%v", sql, values) + log.Debugf("DELETE Success %s values=%v", sql, values) } else if err == nil && affected == 0 { - log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values) - return false + return fmt.Errorf("entry not found %s values=%v, nothing to delete", sql, values) } else { - log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err) - return false + return fmt.Errorf("DELETE Failed %s values=%v error=%v", sql, values, err) } } - return true + return nil } // Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;" -func buildDeleteSql(tableName string, row common.Row, pkeys []string) string { +func (dbMan *dbManager) buildDeleteSql(tableName string, row common.Row, pkeys []string) string { var wherePlaceholders []string i := 1 @@ -210,14 +211,13 @@ } -func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool { - pkeys, err := getPkeysForTable(tableName) +func (dbMan *dbManager) update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) error { + pkeys, err := dbMan.getPkeysForTable(tableName) if len(pkeys) == 0 || err != nil { - log.Errorf("UPDATE No primary keys found for table.", tableName) - return false + return fmt.Errorf("UPDATE No primary keys found for table: %v, %v", tableName, err) } if len(oldRows) == 0 || len(newRows) == 0 { - return false + return fmt.Errorf("UPDATE No old or new rows, table: %v, %v, %v", tableName, oldRows, newRows) } var orderedColumns []string @@ -229,11 +229,10 @@ sort.Strings(orderedColumns) //build update statement, use arbitrary row as template - sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys) + sql := dbMan.buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys) prep, err := txn.Prepare(sql) if err != nil { - log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err) - return false + return fmt.Errorf("UPDATE Fail to prep statement %s error=%v", sql, err) } defer prep.Close() @@ -265,25 +264,23 @@ res, err := txn.Stmt(prep).Exec(values...) if err != nil { - log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err) - return false + return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err) } numRowsAffected, err := res.RowsAffected() if err != nil { - log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err) - return false + return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err) } //delete this once we figure out why tests are failing/not updating log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected) - log.Debugf("UPDATE Success [%s] values=%v", sql, values) + log.Debugf("UPDATE Success %s values=%v", sql, values) } - return true + return nil } -func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string { +func (dbMan *dbManager) buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string { if row == nil { return "" } @@ -311,7 +308,7 @@ } //precondition: rows.length > 1000, max number of entities for sqlite -func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string { +func (dbMan *dbManager) buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string { if len(rows) == 0 { return "" } @@ -343,13 +340,13 @@ return sql } -func getPkeysForTable(tableName string) ([]string, error) { - db := getDB() +func (dbMan *dbManager) getPkeysForTable(tableName string) ([]string, error) { + db := dbMan.getDB() normalizedTableName := normalizeTableName(tableName) sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;" rows, err := db.Query(sql, normalizedTableName) if err != nil { - log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err) + log.Errorf("Failed %s values=%s Error: %v", sql, normalizedTableName, err) return nil, err } var columnNames []string @@ -358,13 +355,15 @@ var value string err := rows.Scan(&value) if err != nil { - log.Fatal(err) + log.Errorf("failed to scan column names: %v", err) + return nil, err } columnNames = append(columnNames, value) } err = rows.Err() if err != nil { - log.Fatal(err) + log.Errorf("failed to scan column names: %v", err) + return nil, err } return columnNames, nil } @@ -377,13 +376,11 @@ * For the given apidConfigId, this function will retrieve all the distinch scopes * associated with it. Distinct, because scope is already a collection of the tenants. */ -func findScopesForId(configId string) (scopes []string) { +func (dbMan *dbManager) findScopesForId(configId string) (scopes []string, err error) { log.Debugf("findScopesForId: %s", configId) - var scope sql.NullString - db := getDB() - + db := dbMan.getDB() query := ` SELECT scope FROM edgex_data_scope WHERE apid_cluster_id = $1 UNION @@ -391,7 +388,6 @@ UNION SELECT env_scope FROM edgex_data_scope WHERE apid_cluster_id = $3 ` - rows, err := db.Query(query, configId, configId, configId) if err != nil { log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err) @@ -416,9 +412,9 @@ /* * Retrieve SnapshotInfo for the given apidConfigId from apid_config table */ -func getLastSequence() (lastSequence string) { +func (dbMan *dbManager) getLastSequence() (lastSequence string) { - err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence) + err := dbMan.getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence) if err != nil && err != sql.ErrNoRows { log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err) return @@ -432,11 +428,11 @@ * Persist the last change Id each time a change has been successfully * processed by the plugin(s) */ -func updateLastSequence(lastSequence string) error { +func (dbMan *dbManager) updateLastSequence(lastSequence string) error { log.Debugf("updateLastSequence: %s", lastSequence) - tx, err := getDB().Begin() + tx, err := dbMan.getDB().Begin() if err != nil { log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return err @@ -454,10 +450,9 @@ return err } -func getApidInstanceInfo() (info apidInstanceInfo, err error) { +func (dbMan *dbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) { info.InstanceName = config.GetString(configName) info.ClusterID = config.GetString(configApidClusterId) - var savedClusterId string // always use default database for this @@ -481,7 +476,7 @@ } else { // first start - no row, generate a UUID and store it err = nil - newInstanceID = true + info.IsNewInstance = true info.InstanceID = util.GenerateUUID() log.Debugf("Inserting new apid instance id %s", info.InstanceID) @@ -489,13 +484,14 @@ info.InstanceID, info.ClusterID, "") } } else if savedClusterId != info.ClusterID { - log.Debug("Detected apid cluster id change in config. Apid will start clean") + log.Warnf("Detected apid cluster id change in config. %v v.s. %v Apid will start clean.", + savedClusterId, info.ClusterID) err = nil - newInstanceID = true + info.IsNewInstance = true info.InstanceID = util.GenerateUUID() - _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", - info.InstanceID, info.ClusterID, "") + _, err = tx.Exec("DELETE FROM APID;") + info.LastSnapshot = "" } if err = tx.Commit(); err != nil { @@ -504,8 +500,8 @@ return } -func updateApidInstanceInfo() error { - +func (dbMan *dbManager) updateApidInstanceInfo(instanceId, clusterId, lastSnap string) error { + log.Debugf("updateApidInstanceInfo: %v, %v, %v", instanceId, clusterId, lastSnap) // always use default database for this db, err := dataService.DB() if err != nil { @@ -521,7 +517,7 @@ REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)`, - apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) + instanceId, clusterId, lastSnap) if err != nil { log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err) return err @@ -536,3 +532,131 @@ return err } + +func (dbMan *dbManager) extractTables() (map[string]bool, error) { + tables := make(map[string]bool) + db := dbMan.getDB() + rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var table sql.NullString + if err := rows.Scan(&table); err != nil { + return nil, err + } + log.Debugf("Table %v found in existing db", table) + if table.Valid { + tables[table.String] = true + } + } + log.Debugf("Extracting table names from existing DB %v", tables) + return tables, nil +} + +func (dbMan *dbManager) getKnowTables() map[string]bool { + return dbMan.knownTables +} + +func (dbMan *dbManager) processChangeList(changes *common.ChangeList) error { + + tx, err := dbMan.getDB().Begin() + if err != nil { + return err + } + defer tx.Rollback() + + log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) + + for _, change := range changes.Changes { + if change.Table == LISTENER_TABLE_APID_CLUSTER { + return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table) + } + switch change.Operation { + case common.Insert: + err = dbMan.insert(change.Table, []common.Row{change.NewRow}, tx) + case common.Update: + if change.Table == LISTENER_TABLE_DATA_SCOPE { + return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table) + } + err = dbMan.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx) + case common.Delete: + err = dbMan.delete(change.Table, []common.Row{change.OldRow}, tx) + } + if err != nil { + return err + } + + } + + if err = tx.Commit(); err != nil { + return fmt.Errorf("Commit error in processChangeList: %v", err) + } + return nil +} + +func (dbMan *dbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error { + + var prevDb string + if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo { + log.Debugf("Release snapshot for {%s}. Switching to version {%s}", + apidInfo.LastSnapshot, snapshot.SnapshotInfo) + prevDb = apidInfo.LastSnapshot + } else { + log.Debugf("Process snapshot for version {%s}", + snapshot.SnapshotInfo) + } + db, err := dataService.DBVersion(snapshot.SnapshotInfo) + if err != nil { + return fmt.Errorf("unable to access database: %v", err) + } + + var numApidClusters int + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("unable to open DB txn: {%v}", err.Error()) + } + defer tx.Rollback() + err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters) + if err != nil { + return fmt.Errorf("unable to read database: {%s}", err.Error()) + } + + if numApidClusters != 1 { + return fmt.Errorf("illegal state for apid_cluster, must be a single row") + } + + _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") + if err != nil && err.Error() != "duplicate column name: last_sequence" { + return fmt.Errorf("Unable to create last_sequence column on DB. Error {%v}", err.Error()) + } + + if err = tx.Commit(); err != nil { + return fmt.Errorf("error when commit in processSqliteSnapshot: %v", err) + } + + //update apid instance info + apidInfo.LastSnapshot = snapshot.SnapshotInfo + err = dbMan.updateApidInstanceInfo(apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) + if err != nil { + log.Errorf("Unable to update instance info: %v", err) + return fmt.Errorf("unable to update instance info: %v", err) + } + + dbMan.setDB(db) + if isDataSnapshot { + dbMan.knownTables, err = dbMan.extractTables() + if err != nil { + return fmt.Errorf("unable to extract tables: %v", err) + } + } + log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo) + + // Releases the DB, when the Connection reference count reaches 0. + if prevDb != "" { + dataService.ReleaseDB(prevDb) + } + return nil +}
diff --git a/data_test.go b/data_test.go index 691e887..abad465 100644 --- a/data_test.go +++ b/data_test.go
@@ -15,32 +15,54 @@ package apidApigeeSync import ( + "database/sql" "github.com/apid/apid-core" - "github.com/apid/apid-core/data" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "io/ioutil" "sort" "strconv" ) var _ = Describe("data access tests", func() { - testCount := 1 - - var _ = BeforeEach(func() { + testCount := 0 + var testDbMan *dbManager + var dbVersion string + BeforeEach(func() { + var testDir string + testDir, err := ioutil.TempDir(tmpDir, "data_test") + config.Set(configLocalStoragePath, testDir) + Expect(err).NotTo(HaveOccurred()) + testDbMan = creatDbManager() testCount++ - db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount)) + dbVersion = "data_test_" + strconv.Itoa(testCount) + db, err := dataService.DBVersion(dbVersion) Expect(err).Should(Succeed()) - initDB(db) - createBootstrapTables(db) - setDB(db) + testDbMan.setDB(db) }) - var _ = AfterEach(func() { - data.Delete(data.VersionedDBID("common", "data_test_"+strconv.Itoa(testCount))) + AfterEach(func() { + config.Set(configLocalStoragePath, tmpDir) }) - Context("Update processing", func() { + It("check scope changes", func() { + newScopes := []string{"foo"} + scopes := []string{"bar"} + Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"})) + newScopes = []string{"foo", "bar"} + scopes = []string{"bar"} + Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"})) + newScopes = []string{"foo"} + scopes = []string{"bar", "foo"} + Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"})) + newScopes = []string{"foo", "bar"} + scopes = []string{"bar", "foo"} + Expect(scopeChanged(newScopes, scopes)).To(BeNil()) + + }) + + Context("build Sql", func() { It("unit test buildUpdateSql with single primary key", func() { testRow := common.Row{ "id": { @@ -66,7 +88,7 @@ } sort.Strings(orderedColumns) - result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"}) + result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"}) Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" + " WHERE id=$6").To(Equal(result)) }) @@ -99,403 +121,11 @@ } sort.Strings(orderedColumns) - result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"}) + result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"}) Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" + " WHERE id1=$7 AND id2=$8").To(Equal(result)) }) - It("test update with composite primary key", func() { - event := &common.ChangeList{} - - //this needs to match what is actually in the DB - oldRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "A product for testing Greg", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - newRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "new description", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: oldRow, - Operation: 1, - }, - } - //insert and assert success - Expect(true).To(Equal(processChangeList(event))) - var nRows int - err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //create update event - event.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: oldRow, - NewRow: newRow, - Operation: 2, - }, - } - - //do the update - Expect(true).To(Equal(processChangeList(event))) - err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - }) - - It("update should succeed if newrow modifies the primary key", func() { - event := &common.ChangeList{} - - //this needs to match what is actually in the DB - oldRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "A product for testing Greg", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - newRow := common.Row{ - "id": { - Value: "new_id", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "new description", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: oldRow, - Operation: 1, - }, - } - //insert and assert success - Expect(true).To(Equal(processChangeList(event))) - var nRows int - err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //create update event - event.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: oldRow, - NewRow: newRow, - Operation: 2, - }, - } - - //do the update - Expect(true).To(Equal(processChangeList(event))) - err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - }) - - It("update should succeed if newrow contains fewer fields than oldrow", func() { - event := &common.ChangeList{} - - oldRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "A product for testing Greg", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - newRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "new description", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: oldRow, - NewRow: newRow, - Operation: 2, - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: oldRow, - Operation: 1, - }, - } - //insert and assert success - Expect(true).To(Equal(processChangeList(event))) - var nRows int - err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //create update event - event.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: oldRow, - NewRow: newRow, - Operation: 2, - }, - } - - //do the update - Expect(true).To(Equal(processChangeList(event))) - err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - }) - - It("update should succeed if oldrow contains fewer fields than newrow", func() { - event := &common.ChangeList{} - - oldRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "A product for testing Greg", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - newRow := common.Row{ - "id": { - Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", - }, - "api_resources": { - Value: "{/**}", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "43aef41d", - }, - "description": { - Value: "new description", - }, - "created_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "updated_at": { - Value: "2017-03-01 22:50:41.75+00:00", - }, - "_change_selector": { - Value: "43aef41d", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: oldRow, - NewRow: newRow, - Operation: 2, - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: oldRow, - Operation: 1, - }, - } - //insert and assert success - Expect(true).To(Equal(processChangeList(event))) - var nRows int - err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //create update event - event.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: oldRow, - NewRow: newRow, - Operation: 2, - }, - } - - //do the update - Expect(true).To(Equal(processChangeList(event))) - err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - }) - }) - - Context("Insert processing", func() { It("Properly constructs insert sql for one row", func() { newRow := common.Row{ "id": { @@ -531,7 +161,7 @@ sort.Strings(orderedColumns) expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)" - Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow}))) + Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow}))) }) It("Properly constructs insert sql for multiple rows", func() { @@ -595,275 +225,11 @@ sort.Strings(orderedColumns) expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)" - Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2}))) + Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2}))) }) - It("Properly executes insert for a single rows", func() { - event := &common.ChangeList{} - - newRow1 := common.Row{ - "id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: newRow1, - Operation: 1, - }, - } - - Expect(true).To(Equal(processChangeList(event))) - var nRows int - err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - }) - - It("Properly executed insert for multiple rows", func() { - event := &common.ChangeList{} - - newRow1 := common.Row{ - "id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - newRow2 := common.Row{ - "id": { - Value: "b", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: newRow1, - Operation: 1, - }, - { - Table: "kms.api_product", - NewRow: newRow2, - Operation: 1, - }, - } - - Expect(true).To(Equal(processChangeList(event))) - var nRows int - err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(2)) - }) - - It("Fails to execute if row does not match existing table schema", func() { - event := &common.ChangeList{} - - newRow1 := common.Row{ - "not_and_id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: newRow1, - Operation: 1, - }, - } - - ok := processChangeList(event) - Expect(false).To(Equal(ok)) - - var nRows int - //assert that no extraneous rows were added - err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(0)) - }) - - It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() { - event := &common.ChangeList{} - newRow1 := common.Row{ - "id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - - newRow2 := common.Row{ - "not_and_id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - - event.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: newRow1, - Operation: 1, - }, - { - Table: "kms.api_product", - NewRow: newRow2, - Operation: 1, - }, - } - - ok := processChangeList(event) - Expect(false).To(Equal(ok)) - }) - }) - - Context("Delete processing", func() { It("Properly constructs sql prepare for Delete", func() { + createBootstrapTables(testDbMan.getDB()) row := common.Row{ "id": { Value: "new_id", @@ -891,324 +257,1233 @@ }, } - pkeys, err := getPkeysForTable("kms_api_product") + pkeys, err := testDbMan.getPkeysForTable("kms_api_product") Expect(err).Should(Succeed()) - sql := buildDeleteSql("kms_api_product", row, pkeys) + sql := testDbMan.buildDeleteSql("kms_api_product", row, pkeys) Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4")) }) + }) - It("Verify execute single insert & single delete works", func() { - event1 := &common.ChangeList{} - event2 := &common.ChangeList{} - - Row1 := common.Row{ - "id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } - - event1.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: Row1, - Operation: 1, - }, - } - event2.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: Row1, - Operation: 3, - }, - } - - Expect(true).To(Equal(processChangeList(event1))) - var nRows int - err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) - - Expect(true).To(Equal(processChangeList(event2))) - - // validate delete - err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(0)) - - // delete again should fail - coz entry will not exist - Expect(false).To(Equal(processChangeList(event2))) + Context("Process Changelist", func() { + BeforeEach(func() { + createBootstrapTables(testDbMan.getDB()) }) - It("verify multiple insert and single delete works", func() { - event1 := &common.ChangeList{} - event2 := &common.ChangeList{} + Context("Update processing", func() { + It("test update with composite primary key", func() { + event := &common.ChangeList{} - Row1 := common.Row{ - "id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } + //this needs to match what is actually in the DB + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } - Row2 := common.Row{ - "id": { - Value: "b", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } + newRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } - event1.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: Row1, - Operation: 1, - }, - { - Table: "kms.api_product", - NewRow: Row2, - Operation: 1, - }, - } - event2.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: Row1, - Operation: 3, - }, - } + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) - Expect(true).To(Equal(processChangeList(event1))) - var nRows int - //verify first row - err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } - //verify second row - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) + //do the update + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(2)) + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) - Expect(true).To(Equal(processChangeList(event2))) + }) - //verify second row still exists - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) + It("update should succeed if newrow modifies the primary key", func() { + event := &common.ChangeList{} - // validate delete - err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) + //this needs to match what is actually in the DB + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } - // delete again should fail - coz entry will not exist - Expect(false).To(Equal(processChangeList(event2))) - }, 3) + newRow := common.Row{ + "id": { + Value: "new_id", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } - It("verify single insert and multiple delete fails", func() { - event1 := &common.ChangeList{} - event2 := &common.ChangeList{} + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) - Row1 := common.Row{ - "id": { - Value: "a", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } - Row2 := common.Row{ - "id": { - Value: "b", - }, - "api_resources": { - Value: "r", - }, - "environments": { - Value: "{test}", - }, - "tenant_id": { - Value: "t", - }, - "description": { - Value: "d", - }, - "created_at": { - Value: "c", - }, - "updated_at": { - Value: "u", - }, - "_change_selector": { - Value: "cs", - }, - } + //do the update + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) - event1.Changes = []common.Change{ - { - Table: "kms.api_product", - NewRow: Row1, - Operation: 1, - }, - } - event2.Changes = []common.Change{ - { - Table: "kms.api_product", - OldRow: Row1, - Operation: 3, - }, - { - Table: "kms.api_product", - OldRow: Row2, - Operation: 3, - }, - } + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + }) - Expect(true).To(Equal(processChangeList(event1))) - var nRows int - //verify insert - err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + - "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + - "and _change_selector='cs'").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) + It("update should succeed if newrow contains fewer fields than oldrow", func() { + event := &common.ChangeList{} - //assert that no extraneous rows were added - err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(1)) + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } - Expect(false).To(Equal(processChangeList(event2))) + newRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } - }, 3) + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + //do the update + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + }) + + It("update should succeed if oldrow contains fewer fields than newrow", func() { + event := &common.ChangeList{} + + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + newRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + //do the update + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + }) + }) + + Context("Insert processing", func() { + It("Properly executes insert for a single rows", func() { + event := &common.ChangeList{} + + newRow1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + } + + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + }) + + It("Properly executed insert for multiple rows", func() { + event := &common.ChangeList{} + + newRow1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + newRow2 := common.Row{ + "id": { + Value: "b", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + { + Table: "kms.api_product", + NewRow: newRow2, + Operation: 1, + }, + } + + Expect(testDbMan.processChangeList(event)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(2)) + }) + + It("Fails to execute if row does not match existing table schema", func() { + event := &common.ChangeList{} + + newRow1 := common.Row{ + "not_and_id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + } + + Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed()) + + var nRows int + //assert that no extraneous rows were added + err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(0)) + }) + + It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() { + event := &common.ChangeList{} + newRow1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + newRow2 := common.Row{ + "not_and_id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + { + Table: "kms.api_product", + NewRow: newRow2, + Operation: 1, + }, + } + + Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed()) + }) + }) + + Context("Delete processing", func() { + + It("Verify execute single insert & single delete works", func() { + event1 := &common.ChangeList{} + event2 := &common.ChangeList{} + + Row1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event1.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: Row1, + Operation: 1, + }, + } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: Row1, + Operation: 3, + }, + } + + Expect(testDbMan.processChangeList(event1)).Should(Succeed()) + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + Expect(testDbMan.processChangeList(event2)).Should(Succeed()) + + // validate delete + err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(0)) + + // delete again should fail - coz entry will not exist + Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed()) + }) + + It("verify multiple insert and single delete works", func() { + event1 := &common.ChangeList{} + event2 := &common.ChangeList{} + + Row1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + Row2 := common.Row{ + "id": { + Value: "b", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event1.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: Row1, + Operation: 1, + }, + { + Table: "kms.api_product", + NewRow: Row2, + Operation: 1, + }, + } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: Row1, + Operation: 3, + }, + } + + Expect(testDbMan.processChangeList(event1)).Should(Succeed()) + var nRows int + //verify first row + err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //verify second row + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(2)) + + Expect(testDbMan.processChangeList(event2)).Should(Succeed()) + + //verify second row still exists + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + // validate delete + err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + // delete again should fail - coz entry will not exist + Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed()) + }, 3) + + It("verify single insert and multiple delete fails", func() { + event1 := &common.ChangeList{} + event2 := &common.ChangeList{} + + Row1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + Row2 := common.Row{ + "id": { + Value: "b", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event1.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: Row1, + Operation: 1, + }, + } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: Row1, + Operation: 3, + }, + { + Table: "kms.api_product", + OldRow: Row2, + Operation: 3, + }, + } + + Expect(testDbMan.processChangeList(event1)).Should(Succeed()) + var nRows int + //verify insert + err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed()) + + }, 3) + }) + + Context("ApigeeSync change event", func() { + + Context(LISTENER_TABLE_APID_CLUSTER, func() { + + It("should not change LISTENER_TABLE_APID_CLUSTER", func() { + event := common.ChangeList{ + LastSequence: "test", + Changes: []common.Change{ + { + Operation: common.Insert, + Table: LISTENER_TABLE_APID_CLUSTER, + }, + }, + } + Expect(testDbMan.processChangeList(&event)).NotTo(Succeed()) + + event = common.ChangeList{ + LastSequence: "test", + Changes: []common.Change{ + { + Operation: common.Update, + Table: LISTENER_TABLE_APID_CLUSTER, + }, + }, + } + + Expect(testDbMan.processChangeList(&event)).NotTo(Succeed()) + }) + + }) + + Context("data scopes", func() { + + It("insert event should add", func() { + event := common.ChangeList{ + LastSequence: "test", + Changes: []common.Change{ + { + Operation: common.Insert, + Table: LISTENER_TABLE_DATA_SCOPE, + NewRow: common.Row{ + "id": &common.ColumnVal{Value: "i"}, + "apid_cluster_id": &common.ColumnVal{Value: "a"}, + "scope": &common.ColumnVal{Value: "s1"}, + "org": &common.ColumnVal{Value: "o"}, + "env": &common.ColumnVal{Value: "e"}, + "created": &common.ColumnVal{Value: "c"}, + "created_by": &common.ColumnVal{Value: "c"}, + "updated": &common.ColumnVal{Value: "u"}, + "updated_by": &common.ColumnVal{Value: "u"}, + "_change_selector": &common.ColumnVal{Value: "cs"}, + }, + }, + { + Operation: common.Insert, + Table: LISTENER_TABLE_DATA_SCOPE, + NewRow: common.Row{ + "id": &common.ColumnVal{Value: "j"}, + "apid_cluster_id": &common.ColumnVal{Value: "a"}, + "scope": &common.ColumnVal{Value: "s2"}, + "org": &common.ColumnVal{Value: "o"}, + "env": &common.ColumnVal{Value: "e"}, + "created": &common.ColumnVal{Value: "c"}, + "created_by": &common.ColumnVal{Value: "c"}, + "updated": &common.ColumnVal{Value: "u"}, + "updated_by": &common.ColumnVal{Value: "u"}, + "_change_selector": &common.ColumnVal{Value: "cs"}, + }, + }, + }, + } + + testDbMan.processChangeList(&event) + + count := 0 + id := sql.NullString{} + rows, err := testDbMan.getDB().Query(` + SELECT scope FROM EDGEX_DATA_SCOPE`) + Expect(err).NotTo(HaveOccurred()) + defer rows.Close() + for rows.Next() { + count++ + Expect(rows.Scan(&id)).Should(Succeed()) + Expect(id.String).Should(Equal("s" + strconv.Itoa(count))) + } + + Expect(count).To(Equal(2)) + }) + + It("delete event should delete", func() { + event := common.ChangeList{ + LastSequence: "test", + Changes: []common.Change{ + { + Operation: common.Insert, + Table: LISTENER_TABLE_DATA_SCOPE, + NewRow: common.Row{ + "id": &common.ColumnVal{Value: "i"}, + "apid_cluster_id": &common.ColumnVal{Value: "a"}, + "scope": &common.ColumnVal{Value: "s"}, + "org": &common.ColumnVal{Value: "o"}, + "env": &common.ColumnVal{Value: "e"}, + "created": &common.ColumnVal{Value: "c"}, + "created_by": &common.ColumnVal{Value: "c"}, + "updated": &common.ColumnVal{Value: "u"}, + "updated_by": &common.ColumnVal{Value: "u"}, + "_change_selector": &common.ColumnVal{Value: "cs"}, + }, + }, + }, + } + + testDbMan.processChangeList(&event) + + event = common.ChangeList{ + LastSequence: "test", + Changes: []common.Change{ + { + Operation: common.Delete, + Table: LISTENER_TABLE_DATA_SCOPE, + OldRow: event.Changes[0].NewRow, + }, + }, + } + + testDbMan.processChangeList(&event) + + var nRows int + err := testDbMan.getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(BeZero()) + }) + + It("update event should panic for data scopes table", func() { + event := common.ChangeList{ + LastSequence: "test", + Changes: []common.Change{ + { + Operation: common.Update, + Table: LISTENER_TABLE_DATA_SCOPE, + }, + }, + } + + Expect(testDbMan.processChangeList(&event)).ToNot(Succeed()) + }) + + }) + }) + }) + + Context("Process Snapshot", func() { + initTestDb := func(sqlFile string, dbMan *dbManager) common.Snapshot { + stmts, err := ioutil.ReadFile(sqlFile) + Expect(err).Should(Succeed()) + Expect(testDbMan.getDB().Exec(string(stmts))).ShouldNot(BeNil()) + Expect(testDbMan.initDB()).Should(Succeed()) + return common.Snapshot{ + SnapshotInfo: dbVersion, + } + } + + AfterEach(func() { + + }) + + It("should fail if more than one apid_cluster rows", func() { + event := initTestDb("./sql/init_listener_test_duplicate_apids.sql", testDbMan) + Expect(testDbMan.processSnapshot(&event, true)).ToNot(Succeed()) + }) + + It("should process a valid Snapshot", func() { + config.Set(configApidClusterId, "a") + apidInfo.ClusterID = "a" + event := initTestDb("./sql/init_listener_test_valid_snapshot.sql", testDbMan) + Expect(testDbMan.processSnapshot(&event, true)).Should(Succeed()) + + info, err := testDbMan.getApidInstanceInfo() + Expect(err).Should(Succeed()) + Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo)) + Expect(info.IsNewInstance).To(BeFalse()) + Expect(dataService.DBVersion(event.SnapshotInfo)).Should(Equal(testDbMan.getDB())) + + // apid Cluster + id := &sql.NullString{} + Expect(testDbMan.getDB().QueryRow(`SELECT id FROM EDGEX_APID_CLUSTER`). + Scan(id)).Should(Succeed()) + Expect(id.Valid).Should(BeTrue()) + Expect(id.String).To(Equal("i")) + + // Data Scope + env := &sql.NullString{} + count := 0 + rows, err := testDbMan.getDB().Query(`SELECT env FROM EDGEX_DATA_SCOPE`) + Expect(err).Should(Succeed()) + defer rows.Close() + for rows.Next() { + count++ + rows.Scan(&env) + Expect(env.Valid).Should(BeTrue()) + Expect(env.String).To(Equal("e" + strconv.Itoa(count))) + } + Expect(count).To(Equal(3)) + + //find scopes for Id + scopes, err := testDbMan.findScopesForId("a") + Expect(err).Should(Succeed()) + Expect(len(scopes)).To(Equal(6)) + expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"} + sort.Strings(scopes) + sort.Strings(expectedScopes) + Expect(scopes).Should(Equal(expectedScopes)) + }) + + It("should detect clusterid change", func() { + Expect(testDbMan.initDB()).Should(Succeed()) + testDbMan.updateApidInstanceInfo("a", "b", "c") + config.Set(configApidClusterId, "d") + + info, err := testDbMan.getApidInstanceInfo() + Expect(err).Should(Succeed()) + Expect(info.LastSnapshot).To(BeZero()) + Expect(info.IsNewInstance).To(BeTrue()) + }) + }) + }) func createBootstrapTables(db apid.DB) { tx, err := db.Begin() Expect(err).To(Succeed()) //all tests in this file operate on the api_product table. Create the necessary tables for this here - tx.Exec("CREATE TABLE _transicator_tables " + - "(tableName varchar not null, columnName varchar not null, " + - "typid integer, primaryKey bool);") - tx.Exec("DELETE from _transicator_tables") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)") - tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)") - - tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " + - "api_resources text,approval_type text,scopes text,proxies text, environments text," + - "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " + - "primary key (id,tenant_id,created_at,updated_at));") - tx.Exec("DELETE from kms_api_product") - err = tx.Commit() + _, err = tx.Exec(`CREATE TABLE _transicator_tables + (tableName varchar not null, columnName varchar not null, typid integer, primaryKey bool); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0); + INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0); + CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at)); + `) Expect(err).To(Succeed()) + _, err = tx.Exec(` + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org_scope',1043,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env_scope',1043,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0); + INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1); + CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,org_scope text, + env_scope text,created blob,created_by text,updated blob,updated_by text,_change_selector text, + primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector)); + `) + Expect(err).To(Succeed()) + + Expect(tx.Commit()).To(Succeed()) }
diff --git a/dockertests/apid_config.yaml b/dockertests/apid_config.yaml new file mode 100644 index 0000000..e4cd751 --- /dev/null +++ b/dockertests/apid_config.yaml
@@ -0,0 +1,9 @@ +apigeesync_instance_name: SQLLITAPID +apigeesync_snapshot_server_base: http://localhost:9001/ +apigeesync_change_server_base: http://localhost:9000/ +apigeesync_snapshot_proto: sqlite +log_level: Debug +apigeesync_consumer_key: 33f39JNLosF1mDOXJoCfbauchVzPrGrl +apigeesync_consumer_secret: LAolGShAx6H3vfNF +apigeesync_cluster_id: 4c6bb536-0d64-43ca-abae-17c08f1a7e58 +local_storage_path: /Users/haoming/go/src/github.com/apid/apidApigeeSync/tmp/sqlite
diff --git a/dockertests/dockerSetup.sh b/dockertests/dockerSetup.sh index 8df2b5d..6ded35d 100755 --- a/dockertests/dockerSetup.sh +++ b/dockertests/dockerSetup.sh
@@ -25,7 +25,6 @@ TEST_PG_BASE=postgres://postgres:changeme@$DOCKER_IP:5432 TEST_PG_URL=postgres://postgres:changeme@$DOCKER_IP:5432/edgex echo ${TEST_PG_URL} - export APIGEE_SYNC_DOCKER_PG_URL=${TEST_PG_URL} export APIGEE_SYNC_DOCKER_IP=${DOCKER_IP} @@ -47,8 +46,13 @@ ssname=apidSync_test_ss csname=apidSync_test_cs +# setup docker network +docker network rm apidApigeeSync-docker-test || true +docker network create apidApigeeSync-docker-test +DOCKER_PG_URL=postgres://postgres:changeme@$pgname:5432/edgex +echo $DOCKER_PG_URL # run PG -docker run --name ${pgname} -p 5432:5432 -d -e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres +docker run --name ${pgname} -p 5432:5432 --network=apidApigeeSync-docker-test -d -e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres # Wait for PG to be up -- it takes a few seconds while `true` @@ -67,8 +71,8 @@ psql -f ${WORK_DIR}/dockertests/user-setup.sql ${TEST_PG_URL} # run SS and CS -docker run --name ${ssname} -d -p 9001:9001 apigeelabs/transicator-snapshot -p 9001 -u ${TEST_PG_URL} -docker run --name ${csname} -d -p 9000:9000 apigeelabs/transicator-changeserver -p 9000 -u ${TEST_PG_URL} -s testslot +docker run --name ${ssname} -d -p 9001:9001 --network=apidApigeeSync-docker-test apigeelabs/transicator-snapshot -p 9001 -u ${DOCKER_PG_URL} +docker run --name ${csname} -d -p 9000:9000 --network=apidApigeeSync-docker-test apigeelabs/transicator-changeserver -p 9000 -u ${DOCKER_PG_URL} -s testslot # Wait for SS to be up while `true`
diff --git a/dockertests/docker_test.go b/dockertests/docker_test.go index e8fc25d..631e17a 100644 --- a/dockertests/docker_test.go +++ b/dockertests/docker_test.go
@@ -18,6 +18,7 @@ "encoding/json" "github.com/apid/apid-core" "github.com/apid/apid-core/factory" + "github.com/apid/apidApigeeSync" _ "github.com/apid/apidApigeeSync" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" @@ -77,7 +78,7 @@ config.Set(configProxyServerBaseURI, testServer.URL) // init plugin - apid.RegisterPlugin(initPlugin) + apid.RegisterPlugin(initPlugin, apidApigeeSync.PluginData) apid.InitializePlugins("dockerTest") <-initDone @@ -263,6 +264,8 @@ updated: t, updatedBy: testInitUser, changeSelector: clusterId, + orgScope: "org1", + envScope: "env1", } bf := bundleConfigData{ @@ -335,7 +338,7 @@ type newTableHandler struct { targetTablename string done Done - verifyFunc func (string, apid.DB) + verifyFunc func(string, apid.DB) } func (n *newTableHandler) Handle(event apid.Event) {
diff --git a/dockertests/management_pg.go b/dockertests/management_pg.go index bbe28e9..77bdf67 100644 --- a/dockertests/management_pg.go +++ b/dockertests/management_pg.go
@@ -90,9 +90,11 @@ created_by, updated, updated_by, - _change_selector + _change_selector, + org_scope, + env_scope ) - VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)`) + VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`) if err != nil { return err } @@ -108,6 +110,8 @@ ds.updated, ds.updatedBy, ds.changeSelector, + ds.orgScope, + ds.envScope, ) return err
diff --git a/dockertests/master-schema.sql b/dockertests/master-schema.sql index 756ff62..00f60d0 100644 --- a/dockertests/master-schema.sql +++ b/dockertests/master-schema.sql
@@ -45,6 +45,8 @@ updated timestamp without time zone, updated_by text, _change_selector text, + org_scope character varying(36) NOT NULL, + env_scope character varying(36) NOT NULL, CONSTRAINT data_scope_pkey PRIMARY KEY (id), CONSTRAINT data_scope_apid_cluster_id_fk FOREIGN KEY (apid_cluster_id) REFERENCES apid_cluster (id)
diff --git a/dockertests/pg_table_data.go b/dockertests/pg_table_data.go index 0a1aecd..854e587 100644 --- a/dockertests/pg_table_data.go +++ b/dockertests/pg_table_data.go
@@ -42,6 +42,8 @@ updated time.Time updatedBy string changeSelector string + orgScope string + envScope string } /* FOREIGN KEY (data_scope_id)
diff --git a/init.go b/init.go index b7f470e..f381ca4 100644 --- a/init.go +++ b/init.go
@@ -15,7 +15,6 @@ package apidApigeeSync import ( - "encoding/json" "fmt" "net/http" "os" @@ -39,7 +38,8 @@ // special value - set by ApigeeSync, not taken from configuration configApidInstanceID = "apigeesync_apid_instance_id" // This will not be needed once we have plugin handling tokens. - configBearerToken = "apigeesync_bearer_token" + configBearerToken = "apigeesync_bearer_token" + configLocalStoragePath = "local_storage_path" ) const ( @@ -48,17 +48,12 @@ var ( /* All set during plugin initialization */ - log apid.LogService - config apid.ConfigService - dataService apid.DataService - events apid.EventsService - apidInfo apidInstanceInfo - newInstanceID bool - apidTokenManager tokenManager - apidChangeManager changeManager - apidSnapshotManager snapShotManager - httpclient *http.Client - isOfflineMode bool + log apid.LogService + config apid.ConfigService + dataService apid.DataService + eventService apid.EventsService + apiService apid.APIService + apidInfo apidInstanceInfo /* Set during post plugin initialization * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called @@ -68,6 +63,7 @@ type apidInstanceInfo struct { InstanceID, InstanceName, ClusterID, LastSnapshot string + IsNewInstance bool } type pluginDetail struct { @@ -76,7 +72,7 @@ } func init() { - apid.RegisterPlugin(initPlugin, pluginData) + apid.RegisterPlugin(initPlugin, PluginData) } func initConfigDefaults() { @@ -93,59 +89,7 @@ log.Debugf("Using %s as display name", config.GetString(configName)) } -func initVariables() error { - - var tr *http.Transport - - tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL)) - tr.MaxIdleConnsPerHost = maxIdleConnsPerHost - - httpclient = &http.Client{ - Transport: tr, - Timeout: httpTimeout, - CheckRedirect: func(req *http.Request, _ []*http.Request) error { - req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken()) - return nil - }, - } - - // set up default database - db, err := dataService.DB() - if err != nil { - return fmt.Errorf("Unable to access DB: %v", err) - } - err = initDB(db) - if err != nil { - return fmt.Errorf("Unable to access DB: %v", err) - } - setDB(db) - - apidInfo, err = getApidInstanceInfo() - if err != nil { - return fmt.Errorf("Unable to get apid instance info: %v", err) - } - - if config.IsSet(configApidInstanceID) { - log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID) - } - config.Set(configApidInstanceID, apidInfo.InstanceID) - - return nil -} - -func createManagers() { - if isOfflineMode { - apidSnapshotManager = &offlineSnapshotManager{} - apidChangeManager = &offlineChangeManager{} - } else { - apidSnapshotManager = createSnapShotManager() - apidChangeManager = createChangeManager() - } - - apidTokenManager = createSimpleTokenManager() -} - -func checkForRequiredValues() error { +func checkForRequiredValues(isOfflineMode bool) error { required := []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret} if !isOfflineMode { required = append(required, configSnapServerBaseURI, configChangeServerBaseURI) @@ -153,12 +97,12 @@ // check for required values for _, key := range required { if !config.IsSet(key) { - return fmt.Errorf("Missing required config value: %s", key) + return fmt.Errorf("missing required config value: %s", key) } } proto := config.GetString(configSnapshotProtocol) if proto != "sqlite" { - return fmt.Errorf("Illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol) + return fmt.Errorf("illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol) } return nil @@ -168,90 +112,98 @@ log = logger } -/* initialization */ -func _initPlugin(services apid.Services) error { - log.Debug("start init") +func initManagers(isOfflineMode bool) (*listenerManager, *ApiManager, error) { + // check for forward proxy + var tr *http.Transport + tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL)) + tr.MaxIdleConnsPerHost = maxIdleConnsPerHost - config = services.Config() - initConfigDefaults() - - if config.GetBool(configDiagnosticMode) { - log.Warn("Diagnostic mode: will not download changelist and snapshots!") - isOfflineMode = true - } - - err := checkForRequiredValues() + apidDbManager := creatDbManager() + db, err := dataService.DB() if err != nil { - return err + return nil, nil, fmt.Errorf("unable to access DB: %v", err) } - - err = initVariables() + apidDbManager.setDB(db) + err = apidDbManager.initDB() if err != nil { - return err + return nil, nil, fmt.Errorf("unable to access DB: %v", err) } - return nil + apidInfo, err = apidDbManager.getApidInstanceInfo() + if err != nil { + return nil, nil, fmt.Errorf("unable to get apid instance info: %v", err) + } + + if config.IsSet(configApidInstanceID) { + log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID) + } + config.Set(configApidInstanceID, apidInfo.InstanceID) + + apidTokenManager := createApidTokenManager(apidInfo.IsNewInstance) + var snapMan snapshotManager + var apidChangeManager changeManager + + if isOfflineMode { + snapMan = &offlineSnapshotManager{ + dbMan: apidDbManager, + } + apidChangeManager = &offlineChangeManager{} + } else { + httpClient := &http.Client{ + Transport: tr, + Timeout: httpTimeout, + CheckRedirect: func(req *http.Request, _ []*http.Request) error { + req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken()) + return nil + }, + } + snapMan = createSnapShotManager(apidDbManager, apidTokenManager, httpClient) + apidChangeManager = createChangeManager(apidDbManager, snapMan, apidTokenManager, httpClient) + } + + listenerMan := &listenerManager{ + changeMan: apidChangeManager, + snapMan: snapMan, + tokenMan: apidTokenManager, + isOfflineMode: isOfflineMode, + } + + apiMan := &ApiManager{ + endpoint: tokenEndpoint, + tokenMan: apidTokenManager, + } + return listenerMan, apiMan, nil } func initPlugin(services apid.Services) (apid.PluginData, error) { SetLogger(services.Log().ForModule("apigeeSync")) dataService = services.Data() - events = services.Events() + eventService = services.Events() + apiService = services.API() + log.Debug("start init") + config = services.Config() + initConfigDefaults() - err := _initPlugin(services) + isOfflineMode := false + if config.GetBool(configDiagnosticMode) { + log.Warn("Diagnostic mode: will not download changelist and snapshots!") + isOfflineMode = true + } + + err := checkForRequiredValues(isOfflineMode) if err != nil { - return pluginData, err + return PluginData, err } + if err != nil { + return PluginData, err + } + listenerMan, apiMan, err := initManagers(isOfflineMode) + if err != nil { + return PluginData, err + } + listenerMan.init() + apiMan.InitAPI(apiService) - createManagers() - - /* This callback function will get called once all the plugins are - * initialized (not just this plugin). This is needed because, - * downloadSnapshots/changes etc have to begin to be processed only - * after all the plugins are initialized - */ - events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) - - InitAPI(services) log.Debug("end init") - - return pluginData, nil -} - -// Plugins have all initialized, gather their info and start the ApigeeSync downloads -func postInitPlugins(event apid.Event) { - var plinfoDetails []pluginDetail - if pie, ok := event.(apid.PluginsInitializedEvent); ok { - /* - * Store the plugin details in the heap. Needed during - * Bearer token generation request. - */ - for _, plugin := range pie.Plugins { - name := plugin.Name - version := plugin.Version - if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok { - inf := pluginDetail{ - Name: name, - SchemaVersion: schemaVersion} - plinfoDetails = append(plinfoDetails, inf) - log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion) - } - } - if plinfoDetails == nil { - log.Panicf("No Plugins registered!") - } - - pgInfo, err := json.Marshal(plinfoDetails) - if err != nil { - log.Panicf("Unable to marshal plugin data: %v", err) - } - apidPluginDetails = string(pgInfo[:]) - - log.Debug("start post plugin init") - - apidTokenManager.start() - go bootstrap() - - log.Debug("Done post plugin init") - } + return PluginData, nil }
diff --git a/init_test.go b/init_test.go index 03e5450..bdd9fc7 100644 --- a/init_test.go +++ b/init_test.go
@@ -18,36 +18,78 @@ "github.com/apid/apid-core" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "net/http" + "strconv" ) var _ = Describe("init", func() { - var _ = BeforeEach(func() { - _initPlugin(apid.AllServices()) + testCount := 0 + BeforeEach(func() { + testCount++ }) Context("Apid Instance display name", func() { + AfterEach(func() { + apiService = apid.API() + }) - It("should be hostname by default", func() { - log.Info("Starting init tests...") + It("init should register listener", func() { + me := &mockEvent{ + listenerMap: make(map[apid.EventSelector]apid.EventHandlerFunc), + } + ma := &mockApi{ + handleMap: make(map[string]http.HandlerFunc), + } + ms := &mockService{ + config: apid.Config(), + log: apid.Log(), + api: ma, + data: apid.Data(), + events: me, + } + testname := "test_" + strconv.Itoa(testCount) + ms.config.Set(configName, testname) + pd, err := initPlugin(ms) + Expect(err).Should(Succeed()) + Expect(apidInfo.InstanceName).To(Equal(testname)) + Expect(me.listenerMap[apid.SystemEventsSelector]).ToNot(BeNil()) + Expect(ma.handleMap[tokenEndpoint]).ToNot(BeNil()) + Expect(pd).Should(Equal(PluginData)) + Expect(apidInfo.IsNewInstance).Should(BeTrue()) + }) - initConfigDefaults() - Expect(apidInfo.InstanceName).To(Equal("testhost")) - }, 3) + It("create managers for normal mode", func() { + listenerMan, apiMan, err := initManagers(false) + Expect(err).Should(Succeed()) + Expect(listenerMan).ToNot(BeNil()) + Expect(listenerMan.tokenMan).ToNot(BeNil()) + snapMan, ok := listenerMan.snapMan.(*apidSnapshotManager) + Expect(ok).Should(BeTrue()) + Expect(snapMan.tokenMan).ToNot(BeNil()) + Expect(snapMan.dbMan).ToNot(BeNil()) + changeMan, ok := listenerMan.changeMan.(*pollChangeManager) + Expect(ok).Should(BeTrue()) + Expect(changeMan.tokenMan).ToNot(BeNil()) + Expect(changeMan.dbMan).ToNot(BeNil()) + Expect(changeMan.snapMan).ToNot(BeNil()) + Expect(apiMan).ToNot(BeNil()) + Expect(apiMan.tokenMan).ToNot(BeNil()) + }) - It("accept display name from config", func() { - config.Set(configName, "aa01") - initConfigDefaults() - var apidInfoLatest apidInstanceInfo - apidInfoLatest, _ = getApidInstanceInfo() - Expect(apidInfoLatest.InstanceName).To(Equal("aa01")) - Expect(apidInfoLatest.LastSnapshot).To(Equal("")) - }, 3) + It("create managers for diagnostic mode", func() { + config.Set(configDiagnosticMode, true) + listenerMan, apiMan, err := initManagers(true) + Expect(err).Should(Succeed()) + Expect(listenerMan).ToNot(BeNil()) + Expect(listenerMan.tokenMan).ToNot(BeNil()) + snapMan, ok := listenerMan.snapMan.(*offlineSnapshotManager) + Expect(ok).Should(BeTrue()) + Expect(snapMan.dbMan).ToNot(BeNil()) + _, ok = listenerMan.changeMan.(*offlineChangeManager) + Expect(ok).Should(BeTrue()) + Expect(apiMan).ToNot(BeNil()) + Expect(apiMan.tokenMan).ToNot(BeNil()) + }) }) - - It("should put apigeesync_apid_instance_id value in config", func() { - instanceID := config.GetString(configApidInstanceID) - Expect(instanceID).NotTo(BeEmpty()) - Expect(instanceID).To(Equal(apidInfo.InstanceID)) - }) })
diff --git a/listener.go b/listener.go index 94befd6..7f46b34 100644 --- a/listener.go +++ b/listener.go
@@ -15,9 +15,8 @@ package apidApigeeSync import ( - "errors" + "encoding/json" "github.com/apid/apid-core" - "github.com/apigee-labs/transicator/common" ) const ( @@ -25,107 +24,86 @@ LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope" ) -func processSnapshot(snapshot *common.Snapshot) { +type listenerManager struct { + changeMan changeManager + snapMan snapshotManager + tokenMan tokenManager + isOfflineMode bool +} - var prevDb string - if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo { - log.Debugf("Release snapshot for {%s}. Switching to version {%s}", - apidInfo.LastSnapshot, snapshot.SnapshotInfo) - prevDb = apidInfo.LastSnapshot - } else { - log.Debugf("Process snapshot for version {%s}", - snapshot.SnapshotInfo) - } - db, err := dataService.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Unable to access database: %v", err) - } +func (l *listenerManager) init() { + /* This callback function will get called once all the plugins are + * initialized (not just this plugin). This is needed because, + * downloadSnapshots/changes etc have to begin to be processed only + * after all the plugins are initialized + */ + eventService.ListenOnceFunc(apid.SystemEventsSelector, l.postInitPlugins) +} - processSqliteSnapshot(db) +// Plugins have all initialized, gather their info and start the ApigeeSync downloads +func (l *listenerManager) postInitPlugins(event apid.Event) { + var plinfoDetails []pluginDetail + if pie, ok := event.(apid.PluginsInitializedEvent); ok { + /* + * Store the plugin details in the heap. Needed during + * Bearer token generation request. + */ + for _, plugin := range pie.Plugins { + name := plugin.Name + version := plugin.Version + if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok { + inf := pluginDetail{ + Name: name, + SchemaVersion: schemaVersion} + plinfoDetails = append(plinfoDetails, inf) + log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion) + } + } + if plinfoDetails == nil { + log.Panic("No Plugins registered!") + } - //update apid instance info - apidInfo.LastSnapshot = snapshot.SnapshotInfo - err = updateApidInstanceInfo() - if err != nil { - log.Panicf("Unable to update instance info: %v", err) - } + pgInfo, err := json.Marshal(plinfoDetails) + if err != nil { + log.Panicf("Unable to marshal plugin data: %v", err) + } + apidPluginDetails = string(pgInfo[:]) - setDB(db) - log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo) + log.Debug("start post plugin init") - // Releases the DB, when the Connection reference count reaches 0. - if prevDb != "" { - dataService.ReleaseDB(prevDb) + l.tokenMan.start() + go l.bootstrap(apidInfo.LastSnapshot) + + log.Debug("Done post plugin init") } } -func processSqliteSnapshot(db apid.DB) { - - var numApidClusters int - tx, err := db.Begin() - if err != nil { - log.Panicf("Unable to open DB txn: {%v}", err.Error()) - } - defer tx.Rollback() - err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters) - if err != nil { - log.Panicf("Unable to read database: {%s}", err.Error()) +/* + * Start from existing snapshot if possible + * If an existing snapshot does not exist, use the apid scope to fetch + * all data scopes, then get a snapshot for those data scopes + * + * Then, poll for changes + */ +func (l *listenerManager) bootstrap(lastSnap string) { + if l.isOfflineMode && lastSnap == "" { + log.Panic("Diagnostic mode requires existent snapshot info in default DB.") } - if numApidClusters != 1 { - log.Panic("Illegal state for apid_cluster. Must be a single row.") - } - - _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") - if err != nil { - if err.Error() == "duplicate column name: last_sequence" { + if lastSnap != "" { + if err := l.snapMan.startOnDataSnapshot(lastSnap); err == nil { + log.Infof("Started on local snapshot: %s", lastSnap) + l.changeMan.pollChangeWithBackoff() return } else { - log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error()) - } - } - if err = tx.Commit(); err != nil { - log.Errorf("Error when commit in processSqliteSnapshot: %v", err) - } -} - -func processChangeList(changes *common.ChangeList) bool { - - ok := false - - tx, err := getDB().Begin() - if err != nil { - log.Panicf("Error processing ChangeList: %v", err) - } - defer tx.Rollback() - - log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) - - for _, change := range changes.Changes { - if change.Table == LISTENER_TABLE_APID_CLUSTER { - log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) - } - switch change.Operation { - case common.Insert: - ok = insert(change.Table, []common.Row{change.NewRow}, tx) - case common.Update: - if change.Table == LISTENER_TABLE_DATA_SCOPE { - log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) - } - ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx) - case common.Delete: - ok = _delete(change.Table, []common.Row{change.OldRow}, tx) - } - if !ok { - err = errors.New("Sql Operation error. Operation rollbacked") - log.Error("Sql Operation error. Operation rollbacked") - return false + log.Errorf("Failed to bootstrap on local snapshot: %v", err) + log.Warn("Will get new snapshots.") } } - if err = tx.Commit(); err != nil { - log.Errorf("Commit error in processChangeList: %v", err) - return false + l.snapMan.downloadBootSnapshot() + if err := l.snapMan.downloadDataSnapshot(); err != nil { + log.Panicf("Error downloading data snapshot: %v", err) } - return true + l.changeMan.pollChangeWithBackoff() }
diff --git a/listener_test.go b/listener_test.go index 54974eb..5e8f072 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -15,360 +15,79 @@ package apidApigeeSync import ( + "github.com/apid/apid-core" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - - "github.com/apid/apid-core" - "github.com/apigee-labs/transicator/common" - "os" - "reflect" - "sort" + "strconv" ) var _ = Describe("listener", func() { + testCount := 0 + var testListenerMan *listenerManager + var dummyChangeMan *dummyChangeManager + var dummySnapMan *dummySnapshotManager + var dummyTokenMan *dummyTokenManager - var createTestDb = func(sqlfile string, dbId string) common.Snapshot { - initDb(sqlfile, "./mockdb.sqlite3") - file, err := os.Open("./mockdb.sqlite3") - Expect(err).ShouldNot(HaveOccurred()) - - s := common.Snapshot{} - err = processSnapshotServerFileResponse(dbId, file, &s) - Expect(err).ShouldNot(HaveOccurred()) - return s - } - - var _ = BeforeEach(func() { - _initPlugin(apid.AllServices()) - }) - - var _ = AfterEach(func() { - if wipeDBAferTest { - db, err := dataService.DB() - Expect(err).Should(Succeed()) - tx, err := db.Begin() - Expect(err).Should(Succeed()) - _, err = tx.Exec("DELETE FROM APID") - Expect(err).Should(Succeed()) - err = tx.Commit() - Expect(err).Should(Succeed()) + BeforeEach(func() { + testCount++ + dummySnapMan = &dummySnapshotManager{ + downloadCalledChan: make(chan bool, 1), + startCalledChan: make(chan bool, 1), } - wipeDBAferTest = true + dummyTokenMan = &dummyTokenManager{ + invalidateChan: make(chan bool, 1), + } + dummyChangeMan = &dummyChangeManager{ + pollChangeWithBackoffChan: make(chan bool, 1), + } + testListenerMan = &listenerManager{ + changeMan: dummyChangeMan, + snapMan: dummySnapMan, + tokenMan: dummyTokenMan, + } }) - Context("ApigeeSync snapshot event", func() { + AfterEach(func() { - It("should fail if more than one apid_cluster rows", func() { - event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters") - Expect(func() { processSnapshot(&event) }).To(Panic()) - }, 3) - - It("should fail if more than one apid_cluster rows", func() { - newScopes := []string{"foo"} - scopes := []string{"bar"} - Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"})) - newScopes = []string{"foo", "bar"} - scopes = []string{"bar"} - Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"})) - newScopes = []string{"foo"} - scopes = []string{"bar", "foo"} - Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"})) - newScopes = []string{"foo", "bar"} - scopes = []string{"bar", "foo"} - Expect(scopeChanged(newScopes, scopes)).To(BeNil()) - - }, 3) - - It("should process a valid Snapshot", func() { - - event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid") - - processSnapshot(&event) - - info, err := getApidInstanceInfo() - Expect(err).NotTo(HaveOccurred()) - - Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo)) - - db := getDB() - - expectedDB, err := dataService.DBVersion(event.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - - Expect(db == expectedDB).Should(BeTrue()) - - // apid Cluster - var dcs []dataApidCluster - - rows, err := db.Query(` - SELECT id, name, description, umbrella_org_app_name, - created, created_by, updated, updated_by - FROM EDGEX_APID_CLUSTER`) - Expect(err).NotTo(HaveOccurred()) - defer rows.Close() - - c := dataApidCluster{} - for rows.Next() { - rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName, - &c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy) - dcs = append(dcs, c) - } - - Expect(len(dcs)).To(Equal(1)) - dc := dcs[0] - - Expect(dc.ID).To(Equal("i")) - Expect(dc.Name).To(Equal("n")) - Expect(dc.Description).To(Equal("d")) - Expect(dc.OrgAppName).To(Equal("o")) - Expect(dc.Created).To(Equal("c")) - Expect(dc.CreatedBy).To(Equal("c")) - Expect(dc.Updated).To(Equal("u")) - Expect(dc.UpdatedBy).To(Equal("u")) - - // Data Scope - var dds []dataDataScope - - rows, err = db.Query(` - SELECT id, apid_cluster_id, scope, org, - env, created, created_by, updated, - updated_by - FROM EDGEX_DATA_SCOPE`) - Expect(err).NotTo(HaveOccurred()) - defer rows.Close() - - d := dataDataScope{} - for rows.Next() { - rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org, - &d.Env, &d.Created, &d.CreatedBy, &d.Updated, - &d.UpdatedBy) - dds = append(dds, d) - } - - Expect(len(dds)).To(Equal(3)) - ds := dds[0] - - Expect(ds.ID).To(Equal("i")) - Expect(ds.Org).To(Equal("o")) - Expect(ds.Env).To(Equal("e1")) - Expect(ds.Scope).To(Equal("s1")) - Expect(ds.Created).To(Equal("c")) - Expect(ds.CreatedBy).To(Equal("c")) - Expect(ds.Updated).To(Equal("u")) - Expect(ds.UpdatedBy).To(Equal("u")) - - ds = dds[1] - Expect(ds.Env).To(Equal("e2")) - Expect(ds.Scope).To(Equal("s1")) - ds = dds[2] - Expect(ds.Env).To(Equal("e3")) - Expect(ds.Scope).To(Equal("s2")) - - scopes := findScopesForId("a") - Expect(len(scopes)).To(Equal(6)) - expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"} - sort.Strings(scopes) - sort.Strings(expectedScopes) - Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue()) - }, 3) }) - Context("ApigeeSync change event", func() { - - Context(LISTENER_TABLE_APID_CLUSTER, func() { - - It("insert event should panic", func() { - ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic") - processSnapshot(&ssEvent) - - //save the last snapshot, so we can restore it at the end of this context - - csEvent := common.ChangeList{ - LastSequence: "test", - Changes: []common.Change{ - { - Operation: common.Insert, - Table: LISTENER_TABLE_APID_CLUSTER, - }, + It("postInitPlugins, start cleanly", func() { + testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount)) + eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins) + eventService.Emit(testEvent, apid.PluginsInitializedEvent{ + Description: "test", + Plugins: []apid.PluginData{ + { + Name: "name", + Version: "0.0.1", + ExtraData: map[string]interface{}{ + "schemaVersion": "0.0.1", }, - } - - Expect(func() { processChangeList(&csEvent) }).To(Panic()) - }, 3) - - It("update event should panic", func() { - ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic") - processSnapshot(&ssEvent) - - event := common.ChangeList{ - LastSequence: "test", - Changes: []common.Change{ - { - Operation: common.Update, - Table: LISTENER_TABLE_APID_CLUSTER, - }, - }, - } - - Expect(func() { processChangeList(&event) }).To(Panic()) - //restore the last snapshot - }, 3) - + }, + }, }) - - Context(LISTENER_TABLE_DATA_SCOPE, func() { - - It("insert event should add", func() { - ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert") - processSnapshot(&ssEvent) - - event := common.ChangeList{ - LastSequence: "test", - Changes: []common.Change{ - { - Operation: common.Insert, - Table: LISTENER_TABLE_DATA_SCOPE, - NewRow: common.Row{ - "id": &common.ColumnVal{Value: "i"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s1"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - "_change_selector": &common.ColumnVal{Value: "cs"}, - }, - }, - { - Operation: common.Insert, - Table: LISTENER_TABLE_DATA_SCOPE, - NewRow: common.Row{ - "id": &common.ColumnVal{Value: "j"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s2"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - "_change_selector": &common.ColumnVal{Value: "cs"}, - }, - }, - }, - } - - processChangeList(&event) - - var dds []dataDataScope - - rows, err := getDB().Query(` - SELECT id, apid_cluster_id, scope, org, - env, created, created_by, updated, - updated_by - FROM EDGEX_DATA_SCOPE`) - Expect(err).NotTo(HaveOccurred()) - defer rows.Close() - - d := dataDataScope{} - for rows.Next() { - rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org, - &d.Env, &d.Created, &d.CreatedBy, &d.Updated, - &d.UpdatedBy) - dds = append(dds, d) - } - - //three already existing - Expect(len(dds)).To(Equal(2)) - ds := dds[0] - - Expect(ds.ID).To(Equal("i")) - Expect(ds.Org).To(Equal("o")) - Expect(ds.Env).To(Equal("e")) - Expect(ds.Scope).To(Equal("s1")) - Expect(ds.Created).To(Equal("c")) - Expect(ds.CreatedBy).To(Equal("c")) - Expect(ds.Updated).To(Equal("u")) - Expect(ds.UpdatedBy).To(Equal("u")) - - ds = dds[1] - Expect(ds.Scope).To(Equal("s2")) - - scopes := findScopesForId("a") - Expect(len(scopes)).To(Equal(2)) - Expect(scopes[0]).To(Equal("s1")) - Expect(scopes[1]).To(Equal("s2")) - - }, 3) - - It("delete event should delete", func() { - ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete") - processSnapshot(&ssEvent) - insert := common.ChangeList{ - LastSequence: "test", - Changes: []common.Change{ - { - Operation: common.Insert, - Table: LISTENER_TABLE_DATA_SCOPE, - NewRow: common.Row{ - "id": &common.ColumnVal{Value: "i"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - "_change_selector": &common.ColumnVal{Value: "cs"}, - }, - }, - }, - } - - processChangeList(&insert) - - delete := common.ChangeList{ - LastSequence: "test", - Changes: []common.Change{ - { - Operation: common.Delete, - Table: LISTENER_TABLE_DATA_SCOPE, - OldRow: insert.Changes[0].NewRow, - }, - }, - } - - processChangeList(&delete) - - var nRows int - err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows) - Expect(err).NotTo(HaveOccurred()) - - Expect(0).To(Equal(nRows)) - }, 3) - - It("update event should panic for data scopes table", func() { - ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic") - processSnapshot(&ssEvent) - - event := common.ChangeList{ - LastSequence: "test", - Changes: []common.Change{ - { - Operation: common.Update, - Table: LISTENER_TABLE_DATA_SCOPE, - }, - }, - } - - Expect(func() { processChangeList(&event) }).To(Panic()) - //restore the last snapshot - }, 3) - - //TODO add tests for update/insert/delete cluster - }) + Expect(<-dummySnapMan.downloadCalledChan).Should(BeFalse()) + Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue()) }) + + It("postInitPlugins, start from local db", func() { + apidInfo.LastSnapshot = "test_snapshot" + testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount)) + eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins) + eventService.Emit(testEvent, apid.PluginsInitializedEvent{ + Description: "test", + Plugins: []apid.PluginData{ + { + Name: "name", + Version: "0.0.1", + ExtraData: map[string]interface{}{ + "schemaVersion": "0.0.1", + }, + }, + }, + }) + Expect(<-dummySnapMan.startCalledChan).Should(BeTrue()) + Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue()) + }) + })
diff --git a/managerInterfaces.go b/managerInterfaces.go index a93e950..978aede 100644 --- a/managerInterfaces.go +++ b/managerInterfaces.go
@@ -15,31 +15,38 @@ package apidApigeeSync import ( + "github.com/apid/apid-core" "github.com/apigee-labs/transicator/common" - "net/url" ) type tokenManager interface { getBearerToken() string - invalidateToken() error - getToken() *OauthToken + invalidateToken() close() - getRetrieveNewTokenClosure(*url.URL) func(chan bool) error start() getTokenReadyChannel() <-chan bool } -type snapShotManager interface { +type snapshotManager interface { close() <-chan bool downloadBootSnapshot() - storeBootSnapshot(snapshot *common.Snapshot) - downloadDataSnapshot() - storeDataSnapshot(snapshot *common.Snapshot) - downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error - startOnLocalSnapshot(snapshot string) *common.Snapshot + downloadDataSnapshot() error + startOnDataSnapshot(snapshot string) error } type changeManager interface { close() <-chan bool pollChangeWithBackoff() } + +type DbManager interface { + initDB() error + setDB(db apid.DB) + getLastSequence() (lastSequence string) + findScopesForId(configId string) (scopes []string, err error) + updateLastSequence(lastSequence string) error + getApidInstanceInfo() (info apidInstanceInfo, err error) + processChangeList(changes *common.ChangeList) error + processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error + getKnowTables() map[string]bool +}
diff --git a/mock_server.go b/mock_server_test.go similarity index 94% rename from mock_server.go rename to mock_server_test.go index f39adb7..e61b83e 100644 --- a/mock_server.go +++ b/mock_server_test.go
@@ -90,14 +90,14 @@ changeChannel chan []byte sequenceID *int64 maxDevID *int64 - deployIDMutex sync.RWMutex + deployIDMutex *sync.RWMutex minDeploymentID *int64 maxDeploymentID *int64 newSnap *int32 authFail *int32 } -func (m *MockServer) forceAuthFail() { +func (m *MockServer) forceAuthFailOnce() { atomic.StoreInt32(m.authFail, 1) } @@ -118,11 +118,13 @@ } func (m *MockServer) lastSequenceID() string { - return strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10) + num := strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10) + return num + "." + num + "." + num } func (m *MockServer) nextSequenceID() string { - return strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10) + num := strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10) + return num + "." + num + "." + num } func (m *MockServer) nextDeveloperID() string { @@ -180,7 +182,7 @@ m.newSnap = new(int32) m.authFail = new(int32) *m.authFail = 0 - + m.deployIDMutex = &sync.RWMutex{} initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3") initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3") @@ -266,8 +268,11 @@ scopes := q["scope"] Expect(scopes).To(ContainElement(m.params.ClusterID)) - - w.Header().Set("Transicator-Snapshot-TXID", util.GenerateUUID()) + if m.params.Scope != "" { + Expect(scopes).To(ContainElement(m.params.Scope)) + } + m.snapshotID = util.GenerateUUID() + w.Header().Set(headerSnapshotNumber, m.snapshotID) if len(scopes) == 1 { //send bootstrap db @@ -285,7 +290,7 @@ func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) { defer GinkgoRecover() - val := atomic.SwapInt32(m.newSnap, 0) + val := atomic.LoadInt32(m.newSnap) if val > 0 { log.Debug("MockServer: force new snapshot") w.WriteHeader(http.StatusBadRequest) @@ -311,7 +316,9 @@ //Expect(q.Get("snapshot")).To(Equal(m.snapshotID)) Expect(scopes).To(ContainElement(m.params.ClusterID)) - //Expect(scopes).To(ContainElement(m.params.Scope)) + if m.params.Scope != "" { + Expect(scopes).To(ContainElement(m.params.Scope)) + } // todo: the following is just legacy for the existing test in apigeeSync_suite_test developer := m.createDeveloperWithProductAndApp() @@ -343,6 +350,7 @@ // force failing auth check if atomic.LoadInt32(m.authFail) == 1 { + atomic.StoreInt32(m.authFail, 0) w.WriteHeader(http.StatusUnauthorized) w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. "))) return @@ -356,7 +364,7 @@ // check auth header auth := req.Header.Get("Authorization") - expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken) + expectedAuth := m.getBearerToken() if auth != expectedAuth { w.WriteHeader(http.StatusUnauthorized) w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth))) @@ -366,6 +374,10 @@ } } +func (m *MockServer) getBearerToken() string { + return fmt.Sprintf("Bearer %s", m.oauthToken) +} + // make a handler unreliable func (m *MockServer) unreliable(target http.HandlerFunc) http.HandlerFunc { if m.params.ReliableAPI {
diff --git a/pluginData.go b/pluginData.go index c565c02..10b2dd0 100644 --- a/pluginData.go +++ b/pluginData.go
@@ -16,7 +16,7 @@ import "github.com/apid/apid-core" -var pluginData = apid.PluginData{ +var PluginData = apid.PluginData{ Name: "apidApigeeSync", Version: "0.0.4", ExtraData: map[string]interface{}{
diff --git a/snapshot.go b/snapshot.go index df4d63f..023314d 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -15,12 +15,12 @@ package apidApigeeSync import ( - "github.com/apid/apid-core" "github.com/apid/apid-core/data" "github.com/apigee-labs/transicator/common" "net/http" "os" + "fmt" "io" "io/ioutil" "net/url" @@ -29,7 +29,14 @@ "time" ) -type simpleSnapShotManager struct { +const bootstrapSnapshotName = "bootstrap" +const lengthSqliteFileName = 7 // len("/sqlite") +const ( + headerSnapshotNumber = "Transicator-Snapshot-TXID" +) + +type apidSnapshotManager struct { + *offlineSnapshotManager // to send quit signal to the downloading thread quitChan chan bool // to mark the graceful close of snapshotManager @@ -38,32 +45,40 @@ isClosed *int32 // make sure close() returns immediately if there's no downloading/processing snapshot isDownloading *int32 + tokenMan tokenManager + dbMan DbManager + client *http.Client } -func createSnapShotManager() *simpleSnapShotManager { +func createSnapShotManager(dbMan DbManager, tokenMan tokenManager, client *http.Client) *apidSnapshotManager { isClosedInt := int32(0) isDownloadingInt := int32(0) - return &simpleSnapShotManager{ + return &apidSnapshotManager{ + offlineSnapshotManager: &offlineSnapshotManager{ + dbMan: dbMan, + }, quitChan: make(chan bool, 1), finishChan: make(chan bool, 1), isClosed: &isClosedInt, isDownloading: &isDownloadingInt, + dbMan: dbMan, + tokenMan: tokenMan, + client: client, } } /* - * thread-safe close of snapShotManager + * thread-safe close of snapshotManager * It marks status as closed immediately, and quits backoff downloading * use <- close() for blocking close * should only be called by pollChangeManager, because pollChangeManager is dependent on it */ -func (s *simpleSnapShotManager) close() <-chan bool { +func (s *apidSnapshotManager) close() <-chan bool { //has been closed before if atomic.SwapInt32(s.isClosed, 1) == int32(1) { - log.Error("snapShotManager: close() called on a closed snapShotManager!") + log.Warn("snapshotManager: close() called on a closed snapshotManager!") go func() { s.finishChan <- false - log.Debug("change manager closed") }() return s.finishChan } @@ -77,163 +92,52 @@ } // retrieve boot information: apid_config and apid_config_scope -func (s *simpleSnapShotManager) downloadBootSnapshot() { +func (s *apidSnapshotManager) downloadBootSnapshot() { if atomic.SwapInt32(s.isDownloading, 1) == int32(1) { log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!") } defer atomic.StoreInt32(s.isDownloading, int32(0)) - // has been closed - if atomic.LoadInt32(s.isClosed) == int32(1) { - log.Warn("snapShotManager: downloadBootSnapshot called on closed snapShotManager") - return - } - log.Debug("download Snapshot for boot data") scopes := []string{apidInfo.ClusterID} snapshot := &common.Snapshot{} - err := s.downloadSnapshot(true, scopes, snapshot) - if err != nil { - // this may happen during shutdown - if _, ok := err.(quitSignalError); ok { - log.Warn("downloadBootSnapshot failed due to shutdown: " + err.Error()) - } - return - } - - // has been closed - if atomic.LoadInt32(s.isClosed) == int32(1) { - log.Error("snapShotManager: processSnapshot called on closed snapShotManager") - return - } + s.downloadSnapshot(true, scopes, snapshot) // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot s.storeBootSnapshot(snapshot) } -func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) { - processSnapshot(snapshot) +func (s *apidSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) { + if err := s.dbMan.processSnapshot(snapshot, false); err != nil { + log.Panic(err) + } } // use the scope IDs from the boot snapshot to get all the data associated with the scopes -func (s *simpleSnapShotManager) downloadDataSnapshot() { +func (s *apidSnapshotManager) downloadDataSnapshot() error { if atomic.SwapInt32(s.isDownloading, 1) == int32(1) { log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!") } defer atomic.StoreInt32(s.isDownloading, int32(0)) - // has been closed - if atomic.LoadInt32(s.isClosed) == int32(1) { - log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager") - return - } - log.Debug("download Snapshot for data scopes") - scopes := findScopesForId(apidInfo.ClusterID) + scopes, err := s.dbMan.findScopesForId(apidInfo.ClusterID) + if err != nil { + return err + } scopes = append(scopes, apidInfo.ClusterID) snapshot := &common.Snapshot{} - err := s.downloadSnapshot(false, scopes, snapshot) - if err != nil { - // this may happen during shutdown - if _, ok := err.(quitSignalError); ok { - log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error()) - } - return - } - s.storeDataSnapshot(snapshot) -} - -func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) { - knownTables = extractTablesFromSnapshot(snapshot) - - _, err := dataService.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - - processSnapshot(snapshot) - log.Info("Emitting Snapshot to plugins") - - select { - case <-time.After(pluginTimeout): - log.Panic("Timeout. Plugins failed to respond to snapshot.") - case <-events.Emit(ApigeeSyncEventSelector, snapshot): - // the new snapshot has been processed - // if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency - } - -} - -func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) { - - tables = make(map[string]bool) - - log.Debug("Extracting table names from snapshot") - //if this panic ever fires, it's a bug - db, err := dataService.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - return extractTablesFromDB(db) -} - -func extractTablesFromDB(db apid.DB) (tables map[string]bool) { - - tables = make(map[string]bool) - - log.Debug("Extracting table names from existing DB") - rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") - defer rows.Close() - - if err != nil { - log.Panicf("Error reading current set of tables: %v", err) - } - - for rows.Next() { - var table string - if err := rows.Scan(&table); err != nil { - log.Panicf("Error reading current set of tables: %v", err) - } - log.Debugf("Table %s found in existing db", table) - - tables[table] = true - } - return tables -} - -// Skip Downloading snapshot if there is already a snapshot available from previous run -func (s *simpleSnapShotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot { - log.Infof("Starting on local snapshot: %s", snapshotName) - - // ensure DB version will be accessible on behalf of dependant plugins - db, err := dataService.DBVersion(snapshotName) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - - knownTables = extractTablesFromDB(db) - snapshot := &common.Snapshot{ - SnapshotInfo: snapshotName, - } - processSnapshot(snapshot) - - // allow plugins (including this one) to start immediately on existing database - // Note: this MUST have no tables as that is used as an indicator - return snapshot + s.downloadSnapshot(false, scopes, snapshot) + return s.startOnDataSnapshot(snapshot.SnapshotInfo) } // a blocking method // will keep retrying with backoff until success -func (s *simpleSnapShotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error { - // if closed - if atomic.LoadInt32(s.isClosed) == int32(1) { - log.Warn("Trying to download snapshot with a closed snapShotManager") - return quitSignalError{} - } +func (s *apidSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) { log.Debug("downloadSnapshot") @@ -254,12 +158,11 @@ //pollWithBackoff only accepts function that accept a single quit channel //to accommodate functions which need more parameters, wrap them in closures - attemptDownload := getAttemptDownloadClosure(isBoot, snapshot, uri) + attemptDownload := s.getAttemptDownloadClosure(isBoot, snapshot, uri) pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError) - return nil } -func getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error { +func (s *apidSnapshotManager) getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error { return func(_ chan bool) error { var tid string @@ -268,19 +171,16 @@ // should never happen, but if it does, it's unrecoverable anyway log.Panicf("Snapshotserver comm error: %v", err) } - addHeaders(req) - - var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error + addHeaders(req, s.tokenMan.getBearerToken()) if config.GetString(configSnapshotProtocol) != "sqlite" { log.Panic("Only currently supported snashot protocol is sqlite") } req.Header.Set("Accept", "application/transicator+sqlite") - processSnapshotResponse = processSnapshotServerFileResponse // Issue the request to the snapshot server - r, err := httpclient.Do(req) + r, err := s.client.Do(req) if err != nil { log.Errorf("Snapshotserver comm error: %v", err) return err @@ -288,22 +188,28 @@ defer r.Body.Close() - if r.StatusCode != 200 { + switch r.StatusCode { + case http.StatusOK: + break + case http.StatusUnauthorized: + s.tokenMan.invalidateToken() + fallthrough + default: body, _ := ioutil.ReadAll(r.Body) log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body)) - return expected200Error{} + return expected200Error } // Bootstrap scope is a special case, that can occur only once. The tid is // hardcoded to "bootstrap" to ensure there can be no clash of tid between // bootstrap and subsequent data scopes. if isBoot { - tid = "bootstrap" + tid = bootstrapSnapshotName } else { - tid = r.Header.Get("Transicator-Snapshot-TXID") + tid = r.Header.Get(headerSnapshotNumber) } // Decode the Snapshot server response - err = processSnapshotResponse(tid, r.Body, snapshot) + err = processSnapshotServerFileResponse(tid, r.Body, snapshot) if err != nil { log.Errorf("Snapshot server response Data not parsable: %v", err) return err @@ -315,11 +221,22 @@ func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error { dbPath := data.DBPath("common/" + dbId) + dbDir := dbPath[0 : len(dbPath)-lengthSqliteFileName] log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath) + // if other bootstrap snapshot exists, delete the old file + if dbId == bootstrapSnapshotName { + if _, err := os.Stat(dbDir); !os.IsNotExist(err) { + if err = os.RemoveAll(dbDir); err != nil { + log.Errorf("Failed to delete old bootstrap snapshot; %v", err) + return err + } + } + } + //this path includes the sqlite3 file name. why does mkdir all stop at parent?? log.Infof("Creating directory with mkdirall %s", dbPath) - err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700) + err := os.MkdirAll(dbDir, 0700) if err != nil { log.Errorf("Error creating db path %s", err) } @@ -343,10 +260,11 @@ } func handleSnapshotServerError(err error) { - log.Debugf("Error connecting to snapshot server: %v", err) + log.Errorf("Error connecting to snapshot server: %v", err) } type offlineSnapshotManager struct { + dbMan DbManager } func (o *offlineSnapshotManager) close() <-chan bool { @@ -355,33 +273,28 @@ return c } -func (o *offlineSnapshotManager) downloadBootSnapshot() {} - -func (o *offlineSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {} - -func (o *offlineSnapshotManager) downloadDataSnapshot() {} - -func (o *offlineSnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {} - -func (o *offlineSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error { - return nil +func (o *offlineSnapshotManager) downloadBootSnapshot() { + log.Panic("downloadBootSnapshot called for offlineSnapshotManager") } -func (o *offlineSnapshotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot { - log.Infof("Starting on local snapshot: %s", snapshotName) - // ensure DB version will be accessible on behalf of dependant plugins - db, err := dataService.DBVersion(snapshotName) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } +func (o *offlineSnapshotManager) downloadDataSnapshot() error { + return fmt.Errorf("downloadDataSnapshot called for offlineSnapshotManager") +} - knownTables = extractTablesFromDB(db) +func (o *offlineSnapshotManager) startOnDataSnapshot(snapshotName string) error { + log.Infof("Processing snapshot: %s", snapshotName) snapshot := &common.Snapshot{ SnapshotInfo: snapshotName, } - processSnapshot(snapshot) - - // allow plugins (including this one) to start immediately on existing database - // Note: this MUST have no tables as that is used as an indicator - return snapshot + if err := o.dbMan.processSnapshot(snapshot, true); err != nil { + return err + } + log.Info("Emitting Snapshot to plugins") + select { + case <-time.After(pluginTimeout): + return fmt.Errorf("timeout, plugins failed to respond to snapshot") + case <-eventService.Emit(ApigeeSyncEventSelector, snapshot): + // the new snapshot has been processed + } + return nil }
diff --git a/snapshot_test.go b/snapshot_test.go new file mode 100644 index 0000000..8d99563 --- /dev/null +++ b/snapshot_test.go
@@ -0,0 +1,146 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apidApigeeSync + +import ( + "github.com/apid/apid-core" + "github.com/apid/apid-core/api" + "github.com/apigee-labs/transicator/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "net/http" + "net/http/httptest" + "strconv" + "time" +) + +var _ = Describe("Snapshot Manager", func() { + testCount := 0 + var dummyDbMan *dummyDbManager + BeforeEach(func() { + testCount++ + dummyDbMan = &dummyDbManager{} + }) + + Context("offlineSnapshotManager", func() { + var testSnapMan *offlineSnapshotManager + BeforeEach(func() { + testSnapMan = &offlineSnapshotManager{ + dbMan: dummyDbMan, + } + }) + AfterEach(func() { + <-testSnapMan.close() + }) + + It("should have error if download called", func() { + Expect(testSnapMan.downloadDataSnapshot()).ToNot(Succeed()) + Expect(func() { testSnapMan.downloadBootSnapshot() }).To(Panic()) + }) + + It("startOnDataSnapshot should emit events", func() { + called := false + eventService.ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + if _, ok := event.(*common.Snapshot); ok { + called = true + } + }) + snapshotId := "test_snapshot_" + strconv.Itoa(testCount) + Expect(testSnapMan.startOnDataSnapshot(snapshotId)).Should(Succeed()) + Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(snapshotId)) + Expect(called).Should(BeTrue()) + }) + }) + + Context("apidSnapshotManager", func() { + var testSnapMan *apidSnapshotManager + var dummyTokenMan *dummyTokenManager + var testServer *httptest.Server + var testRouter apid.Router + var testMock *MockServer + BeforeEach(func() { + dummyTokenMan = &dummyTokenManager{ + invalidateChan: make(chan bool, 1), + } + client := &http.Client{} + testSnapMan = createSnapShotManager(dummyDbMan, dummyTokenMan, client) + + // create a new API service to have a new router for testing + testRouter = api.CreateService().Router() + testServer = httptest.NewServer(testRouter) + // set up mock server + mockParms := MockParms{ + ReliableAPI: true, + ClusterID: config.GetString(configApidClusterId), + TokenKey: config.GetString(configConsumerKey), + TokenSecret: config.GetString(configConsumerSecret), + Scope: "", + Organization: "att", + Environment: "prod", + } + apidInfo.ClusterID = expectedClusterId + apidInfo.InstanceID = expectedInstanceId + testMock = Mock(mockParms, testRouter) + config.Set(configProxyServerBaseURI, testServer.URL) + config.Set(configSnapServerBaseURI, testServer.URL) + config.Set(configChangeServerBaseURI, testServer.URL) + config.Set(configPollInterval, 1*time.Millisecond) + + initialBackoffInterval = time.Millisecond + testMock.oauthToken = "test_token_" + strconv.Itoa(testCount) + dummyTokenMan.token = testMock.oauthToken + }) + + AfterEach(func() { + <-testSnapMan.close() + }) + + It("downloadBootSnapshot happy path", func() { + testMock.normalAuthCheck() + testSnapMan.downloadBootSnapshot() + Expect(dummyDbMan.isDataSnapshot).Should(BeFalse()) + Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(bootstrapSnapshotName)) + }) + + It("downloadBootSnapshot should retry for auth failure", func() { + testMock.forceAuthFailOnce() + testSnapMan.downloadBootSnapshot() + Expect(dummyDbMan.isDataSnapshot).Should(BeFalse()) + Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(bootstrapSnapshotName)) + Expect(<-dummyTokenMan.invalidateChan).Should(BeTrue()) + }) + + It("downloadDataSnapshot happy path", func() { + testMock.params.Scope = "test_scope_" + strconv.Itoa(testCount) + dummyDbMan.scopes = []string{testMock.params.Scope} + testMock.normalAuthCheck() + testSnapMan.downloadDataSnapshot() + Expect(dummyDbMan.isDataSnapshot).Should(BeTrue()) + Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(testMock.snapshotID)) + }) + + It("downloadDataSnapshot should retry for auth failure", func() { + testMock.params.Scope = "test_scope_" + strconv.Itoa(testCount) + dummyDbMan.scopes = []string{testMock.params.Scope} + testMock.forceAuthFailOnce() + testSnapMan.downloadDataSnapshot() + Expect(dummyDbMan.isDataSnapshot).Should(BeTrue()) + Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(testMock.snapshotID)) + Expect(<-dummyTokenMan.invalidateChan).Should(BeTrue()) + }) + + }) + +})
diff --git a/test_mock_test.go b/test_mock_test.go index de2f673..9ba4e19 100644 --- a/test_mock_test.go +++ b/test_mock_test.go
@@ -14,10 +14,115 @@ package apidApigeeSync import ( + "github.com/apid/apid-core" "github.com/apigee-labs/transicator/common" - "net/url" + "math/rand" + "net/http" + "strconv" ) +type mockService struct { + config apid.ConfigService + log apid.LogService + api apid.APIService + data apid.DataService + events apid.EventsService +} + +func (s *mockService) API() apid.APIService { + return s.api +} + +func (s *mockService) Config() apid.ConfigService { + return s.config +} + +func (s *mockService) Data() apid.DataService { + return s.data +} + +func (s *mockService) Events() apid.EventsService { + return s.events +} + +func (s *mockService) Log() apid.LogService { + return s.log +} + +type mockApi struct { + handleMap map[string]http.HandlerFunc +} + +func (m *mockApi) Listen() error { + return nil +} +func (m *mockApi) Handle(path string, handler http.Handler) apid.Route { + return nil +} +func (m *mockApi) HandleFunc(path string, handlerFunc http.HandlerFunc) apid.Route { + m.handleMap[path] = handlerFunc + return apid.API().HandleFunc(path+strconv.Itoa(rand.Int()), handlerFunc) +} +func (m *mockApi) Vars(r *http.Request) map[string]string { + return nil +} + +func (m *mockApi) Router() apid.Router { + return nil +} + +type mockData struct { +} + +func (m *mockData) DB() (apid.DB, error) { + return nil, nil +} + +func (m *mockData) DBForID(id string) (apid.DB, error) { + return nil, nil +} + +func (m *mockData) DBVersion(version string) (apid.DB, error) { + return nil, nil +} +func (m *mockData) DBVersionForID(id, version string) (apid.DB, error) { + return nil, nil +} + +func (m *mockData) ReleaseDB(version string) {} +func (m *mockData) ReleaseCommonDB() {} +func (m *mockData) ReleaseDBForID(id, version string) {} + +type mockEvent struct { + listenerMap map[apid.EventSelector]apid.EventHandlerFunc +} + +func (e *mockEvent) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event { + return nil +} + +func (e *mockEvent) EmitWithCallback(selector apid.EventSelector, event apid.Event, handler apid.EventHandlerFunc) { + +} + +func (e *mockEvent) Listen(selector apid.EventSelector, handler apid.EventHandler) { + +} + +func (e *mockEvent) ListenFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) { + +} + +func (e *mockEvent) ListenOnceFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) { + e.listenerMap[selector] = handler +} + +func (e *mockEvent) StopListening(selector apid.EventSelector, handler apid.EventHandler) { + +} + +func (e *mockEvent) Close() {} + type dummyChangeManager struct { pollChangeWithBackoffChan chan bool } @@ -34,43 +139,34 @@ type dummyTokenManager struct { invalidateChan chan bool + token string + tokenReadyChan chan bool } func (t *dummyTokenManager) getTokenReadyChannel() <-chan bool { - return nil + return t.tokenReadyChan } func (t *dummyTokenManager) getBearerToken() string { - return "" + return t.token } -func (t *dummyTokenManager) invalidateToken() error { +func (t *dummyTokenManager) invalidateToken() { log.Debug("invalidateToken called") - testMock.passAuthCheck() t.invalidateChan <- true - return nil -} - -func (t *dummyTokenManager) getToken() *OauthToken { - return nil } func (t *dummyTokenManager) close() { return } -func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error { - return func(chan bool) error { - return nil - } -} - func (t *dummyTokenManager) start() { } type dummySnapshotManager struct { downloadCalledChan chan bool + startCalledChan chan bool } func (s *dummySnapshotManager) close() <-chan bool { @@ -80,28 +176,60 @@ } func (s *dummySnapshotManager) downloadBootSnapshot() { - + s.downloadCalledChan <- false } -func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) { - -} - -func (s *dummySnapshotManager) downloadDataSnapshot() { - log.Debug("dummySnapshotManager.downloadDataSnapshot() called") +func (s *dummySnapshotManager) downloadDataSnapshot() error { s.downloadCalledChan <- true -} - -func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) { - -} - -func (s *dummySnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error { return nil } -func (s *dummySnapshotManager) startOnLocalSnapshot(snapshot string) *common.Snapshot { - return &common.Snapshot{ - SnapshotInfo: snapshot, - } +func (s *dummySnapshotManager) startOnDataSnapshot(snapshot string) error { + s.startCalledChan <- true + return nil +} + +type dummyDbManager struct { + lastSequence string + knownTables map[string]bool + scopes []string + snapshot *common.Snapshot + isDataSnapshot bool + lastSeqUpdated chan string +} + +func (d *dummyDbManager) initDB() error { + return nil +} +func (d *dummyDbManager) setDB(db apid.DB) { + +} +func (d *dummyDbManager) getLastSequence() (lastSequence string) { + return d.lastSequence +} +func (d *dummyDbManager) findScopesForId(configId string) (scopes []string, err error) { + return d.scopes, nil +} +func (d *dummyDbManager) updateLastSequence(lastSequence string) error { + d.lastSeqUpdated <- lastSequence + return nil +} +func (d *dummyDbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) { + return apidInstanceInfo{ + InstanceID: "", + InstanceName: "", + ClusterID: "", + LastSnapshot: "", + }, nil +} +func (d *dummyDbManager) processChangeList(changes *common.ChangeList) error { + return nil +} +func (d *dummyDbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error { + d.snapshot = snapshot + d.isDataSnapshot = isDataSnapshot + return nil +} +func (d *dummyDbManager) getKnowTables() map[string]bool { + return d.knownTables }
diff --git a/token.go b/token.go index 95d4b06..c355a61 100644 --- a/token.go +++ b/token.go
@@ -17,7 +17,6 @@ import ( "bytes" "encoding/json" - "errors" "github.com/apid/apid-core/util" "io/ioutil" "net/http" @@ -35,15 +34,13 @@ Usage: man := createTokenManager() bearer := man.getBearerToken() - // will automatically update config(configBearerToken) for other modules - // optionally, when done... - man.close() + will automatically update config(configBearerToken) for other modules */ -func createSimpleTokenManager() *simpleTokenManager { +func createApidTokenManager(isNewInstance bool) *apidTokenManager { isClosedInt := int32(0) - t := &simpleTokenManager{ + t := &apidTokenManager{ quitPollingForToken: make(chan bool, 1), closed: make(chan bool), getTokenChan: make(chan bool), @@ -52,11 +49,12 @@ invalidateDone: make(chan bool), tokenUpdatedChan: make(chan bool, 1), isClosed: &isClosedInt, + isNewInstance: isNewInstance, } return t } -type simpleTokenManager struct { +type apidTokenManager struct { token *OauthToken isClosed *int32 quitPollingForToken chan bool @@ -67,19 +65,20 @@ returnTokenChan chan *OauthToken invalidateDone chan bool tokenUpdatedChan chan bool + isNewInstance bool } -func (t *simpleTokenManager) start() { +func (t *apidTokenManager) start() { t.retrieveNewToken() t.refreshTimer = time.After(t.token.refreshIn()) go t.maintainToken() } -func (t *simpleTokenManager) getBearerToken() string { +func (t *apidTokenManager) getBearerToken() string { return t.getToken().AccessToken } -func (t *simpleTokenManager) maintainToken() { +func (t *apidTokenManager) maintainToken() { for { select { case <-t.closed: @@ -100,19 +99,13 @@ } // will block until valid -func (t *simpleTokenManager) invalidateToken() error { - //has been closed - if atomic.LoadInt32(t.isClosed) == int32(1) { - log.Debug("TokenManager: invalidateToken() called on closed tokenManager") - return errors.New("invalidateToken() called on closed tokenManager") - } +func (t *apidTokenManager) invalidateToken() { log.Debug("invalidating token") t.invalidateTokenChan <- true <-t.invalidateDone - return nil } -func (t *simpleTokenManager) getToken() *OauthToken { +func (t *apidTokenManager) getToken() *OauthToken { //has been closed if atomic.LoadInt32(t.isClosed) == int32(1) { log.Debug("TokenManager: getToken() called on closed tokenManager") @@ -126,7 +119,7 @@ * blocking close() of tokenMan */ -func (t *simpleTokenManager) close() { +func (t *apidTokenManager) close() { //has been closed if atomic.SwapInt32(t.isClosed, 1) == int32(1) { log.Panic("TokenManager: close() has been called before!") @@ -141,7 +134,7 @@ } // don't call externally. will block until success. -func (t *simpleTokenManager) retrieveNewToken() { +func (t *apidTokenManager) retrieveNewToken() { log.Debug("Getting OAuth token...") uriString := config.GetString(configProxyServerBaseURI) @@ -151,10 +144,10 @@ } uri.Path = path.Join(uri.Path, "/accesstoken") - pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) }) + pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : %v", err) }) } -func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error { +func (t *apidTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error { return func(_ chan bool) error { form := url.Values{} form.Set("grant_type", "client_credentials") @@ -168,9 +161,9 @@ req.Header.Set("status", "ONLINE") req.Header.Set("plugin_details", apidPluginDetails) - if newInstanceID { + if t.isNewInstance { req.Header.Set("created_at_apid", time.Now().Format(time.RFC3339)) - newInstanceID = false + t.isNewInstance = false } else { req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) } @@ -194,7 +187,7 @@ if resp.StatusCode != 200 { log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body)) - return expected200Error{} + return expected200Error } var token OauthToken @@ -212,22 +205,11 @@ } log.Debugf("Got new token: %#v", token) - - /* - if newInstanceID { - newInstanceID = false - err = updateApidInstanceInfo() - if err != nil { - log.Errorf("unable to unmarshal update apid instance info : %v", string(body), err) - return err - - } - } - */ t.token = &token config.Set(configBearerToken, token.AccessToken) //don't block on the buffered channel. that means there is already a signal to serve new token + //TODO: This assumes apid-gateway is 1-1 mapping. Make use of generic long-polling provided by apid-core select { case t.tokenUpdatedChan <- true: default: @@ -238,7 +220,7 @@ } } -func (t *simpleTokenManager) getTokenReadyChannel() <-chan bool { +func (t *apidTokenManager) getTokenReadyChannel() <-chan bool { return t.tokenUpdatedChan }
diff --git a/token_test.go b/token_test.go index 1dfaabb..fa9379c 100644 --- a/token_test.go +++ b/token_test.go
@@ -94,7 +94,7 @@ w.Write(body) })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createSimpleTokenManager() + testedTokenManager := createApidTokenManager(false) testedTokenManager.start() token := testedTokenManager.getToken() @@ -123,7 +123,7 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createSimpleTokenManager() + testedTokenManager := createApidTokenManager(false) testedTokenManager.start() token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) @@ -163,7 +163,7 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createSimpleTokenManager() + testedTokenManager := createApidTokenManager(false) testedTokenManager.start() testedTokenManager.getToken() @@ -179,8 +179,6 @@ finished := make(chan bool, 1) count := 0 - newInstanceID = true - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { count++ @@ -202,7 +200,7 @@ })) config.Set(configProxyServerBaseURI, ts.URL) - testedTokenManager := createSimpleTokenManager() + testedTokenManager := createApidTokenManager(true) testedTokenManager.start() testedTokenManager.getToken() testedTokenManager.invalidateToken()
diff --git a/util.go b/util.go new file mode 100644 index 0000000..7d59214 --- /dev/null +++ b/util.go
@@ -0,0 +1,172 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package apidApigeeSync + +import ( + "fmt" + "math" + "math/rand" + "net/http" + "time" +) + +const ( + httpTimeout = time.Minute + pluginTimeout = time.Minute + maxIdleConnsPerHost = 10 + defaultInitial time.Duration = 200 * time.Millisecond + defaultMax time.Duration = 10 * time.Second + defaultFactor float64 = 2 +) + +var ( + initialBackoffInterval = defaultInitial +) + +var ( + expected200Error = fmt.Errorf("did not recieve OK response") + quitSignalError = fmt.Errorf("signal to quit encountered") + authFailError = fmt.Errorf("authorization failed") +) + +type Backoff struct { + attempt int + initial, max time.Duration + jitter bool + backoffStrategy func() time.Duration +} + +type ExponentialBackoff struct { + Backoff + factor float64 +} + +func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff { + backoff := &ExponentialBackoff{} + + if initial <= 0 { + initial = defaultInitial + } + if max <= 0 { + max = defaultMax + } + + if factor <= 0 { + factor = defaultFactor + } + + backoff.initial = initial + backoff.max = max + backoff.attempt = 0 + backoff.factor = factor + backoff.jitter = jitter + backoff.backoffStrategy = backoff.exponentialBackoffStrategy + + return backoff +} + +func (b *Backoff) Duration() time.Duration { + d := b.backoffStrategy() + b.attempt++ + return d +} + +func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration { + + initial := float64(b.Backoff.initial) + attempt := float64(b.Backoff.attempt) + duration := initial * math.Pow(b.factor, attempt) + + if duration > math.MaxInt64 { + return b.max + } + dur := time.Duration(duration) + + if b.jitter { + duration = rand.Float64()*(duration-initial) + initial + } + + if dur > b.max { + return b.max + } + + log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond)) + return dur +} + +func (b *Backoff) Reset() { + b.attempt = 0 +} + +func (b *Backoff) Attempt() int { + return b.attempt +} + +/* + * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy + * for retrying on errors + */ +func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) { + + backoff := NewExponentialBackoff(initialBackoffInterval, config.GetDuration(configPollInterval), 2, true) + + //initialize the retry channel to start first attempt immediately + retry := time.After(0 * time.Millisecond) + + for { + select { + case <-quit: + log.Info("Quit signal recieved. Returning") + return + case <-retry: + start := time.Now() + + err := toExecute(quit) + if err == nil || err == quitSignalError { + return + } + + end := time.Now() + //error encountered, since we would have returned above otherwise + handleError(err) + + /* TODO keep this around? Imagine an immediately erroring service, + * causing many sequential requests which could pollute logs + */ + //only backoff if the request took less than one second + if end.After(start.Add(time.Second)) { + backoff.Reset() + retry = time.After(0 * time.Millisecond) + } else { + retry = time.After(backoff.Duration()) + } + } + } +} + +func addHeaders(req *http.Request, token string) { + req.Header.Set("Authorization", "Bearer "+token) + req.Header.Set("apid_instance_id", apidInfo.InstanceID) + req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) + req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) +} + +type changeServerError struct { + Code string `json:"code"` +} + +func (a changeServerError) Error() string { + return a.Code +}
diff --git a/backoff_test.go b/util_test.go similarity index 100% rename from backoff_test.go rename to util_test.go