Issue 69568832 refactor code, fix bugs, add tests (#76)
* [ISSUE-69568832] refactor
* [ISSUE-69568832] refactor code, fix bugs, add tests
* [ISSUE-69568832] fix docker tests
* [ISSUE-69568832] format code
* [ISSUE-69568832] address comments
diff --git a/api.go b/api.go
index 73399d7..1fceebb 100644
--- a/api.go
+++ b/api.go
@@ -24,32 +24,45 @@
const tokenEndpoint = "/accesstoken"
-func InitAPI(services apid.Services) {
- services.API().HandleFunc(tokenEndpoint, getAccessToken).Methods("GET")
+const (
+ // long-polling timeout from http header
+ parBlock = "block"
+ // long-polling tag used for comparision
+ // if tag fails to match, new token is returned immediately
+ parTag = "If-None-Match"
+)
+
+type ApiManager struct {
+ tokenMan tokenManager
+ endpoint string
}
-func getAccessToken(w http.ResponseWriter, r *http.Request) {
- b := r.URL.Query().Get("block")
+func (a *ApiManager) InitAPI(api apid.APIService) {
+ api.HandleFunc(a.endpoint, a.getAccessToken).Methods("GET")
+}
+
+func (a *ApiManager) getAccessToken(w http.ResponseWriter, r *http.Request) {
+ b := r.URL.Query().Get(parBlock)
var timeout int
if b != "" {
var err error
timeout, err = strconv.Atoi(b)
- if err != nil {
+ if err != nil || timeout < 0 {
writeError(w, http.StatusBadRequest, "bad block value, must be number of seconds")
return
}
}
log.Debugf("api timeout: %d", timeout)
- ifNoneMatch := r.Header.Get("If-None-Match")
-
- if apidTokenManager.getBearerToken() != ifNoneMatch {
- w.Write([]byte(apidTokenManager.getBearerToken()))
+ ifNoneMatch := r.Header.Get(parTag)
+ log.Debugf("ifNoneMatch: %s", ifNoneMatch)
+ if a.tokenMan.getBearerToken() != ifNoneMatch {
+ w.Write([]byte(a.tokenMan.getBearerToken()))
return
}
select {
- case <-apidTokenManager.getTokenReadyChannel():
- w.Write([]byte(apidTokenManager.getBearerToken()))
+ case <-a.tokenMan.getTokenReadyChannel():
+ w.Write([]byte(a.tokenMan.getBearerToken()))
case <-time.After(time.Duration(timeout) * time.Second):
w.WriteHeader(http.StatusNotModified)
}
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 0000000..d46ac4c
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,125 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apidApigeeSync
+
+import (
+ "fmt"
+ "github.com/apid/apid-core"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "strconv"
+ "time"
+)
+
+const (
+ apiTestUrl = "http://127.0.0.1:9000"
+)
+
+var _ = Describe("API Manager", func() {
+ testCount := 0
+ var testApiMan *ApiManager
+ var dummyTokenMan *dummyTokenManager
+ var client *http.Client
+ BeforeEach(func() {
+ testCount++
+ dummyTokenMan = &dummyTokenManager{
+ token: fmt.Sprintf("test_token_%d", testCount),
+ tokenReadyChan: make(chan bool, 1),
+ }
+ testApiMan = &ApiManager{
+ endpoint: tokenEndpoint + strconv.Itoa(testCount),
+ tokenMan: dummyTokenMan,
+ }
+ testApiMan.InitAPI(apid.API())
+ time.Sleep(100 * time.Millisecond)
+ client = &http.Client{}
+ })
+
+ clientGet := func(path string, pars map[string][]string, header map[string][]string) (int, []byte) {
+ uri, err := url.Parse(apiTestUrl + path)
+ Expect(err).Should(Succeed())
+ query := url.Values(pars)
+ uri.RawQuery = query.Encode()
+ httpReq, err := http.NewRequest("GET", uri.String(), nil)
+ httpReq.Header = http.Header(header)
+ Expect(err).Should(Succeed())
+ res, err := client.Do(httpReq)
+ Expect(err).Should(Succeed())
+ defer res.Body.Close()
+ responseBody, err := ioutil.ReadAll(res.Body)
+ Expect(err).Should(Succeed())
+ return res.StatusCode, responseBody
+ }
+
+ It("should get token without long-polling", func() {
+ code, res := clientGet(testApiMan.endpoint, nil, nil)
+ Expect(code).Should(Equal(http.StatusOK))
+ Expect(string(res)).Should(Equal(dummyTokenMan.token))
+ })
+
+ It("should get bad request for invalid timeout", func() {
+ code, _ := clientGet(testApiMan.endpoint, map[string][]string{
+ parBlock: {"invalid"},
+ }, map[string][]string{
+ parTag: {dummyTokenMan.getBearerToken()},
+ })
+ Expect(code).Should(Equal(http.StatusBadRequest))
+
+ code, _ = clientGet(testApiMan.endpoint, map[string][]string{
+ parBlock: {"-1"},
+ }, map[string][]string{
+ parTag: {dummyTokenMan.getBearerToken()},
+ })
+ Expect(code).Should(Equal(http.StatusBadRequest))
+ })
+
+ It("should get token immediately if mismatch", func() {
+ code, res := clientGet(testApiMan.endpoint, map[string][]string{
+ parBlock: {"10"},
+ }, map[string][]string{
+ parTag: {"mismatch"},
+ })
+ Expect(code).Should(Equal(http.StatusOK))
+ Expect(string(res)).Should(Equal(dummyTokenMan.token))
+ }, 3)
+
+ It("should get StatusNotModified if timeout", func() {
+ code, _ := clientGet(testApiMan.endpoint, map[string][]string{
+ parBlock: {"1"},
+ }, map[string][]string{
+ parTag: {dummyTokenMan.getBearerToken()},
+ })
+ Expect(code).Should(Equal(http.StatusNotModified))
+ }, 3)
+
+ It("should do long-polling", func() {
+ go func() {
+ time.Sleep(1)
+ dummyTokenMan.token = "new_token"
+ dummyTokenMan.tokenReadyChan <- true
+ }()
+ code, res := clientGet(testApiMan.endpoint, map[string][]string{
+ parBlock: {"10"},
+ }, map[string][]string{
+ parTag: {dummyTokenMan.getBearerToken()},
+ })
+ Expect(code).Should(Equal(http.StatusOK))
+ Expect(string(res)).Should(Equal(dummyTokenMan.getBearerToken()))
+ }, 3)
+
+})
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 5055f68..47c5498 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -19,72 +19,70 @@
. "github.com/onsi/gomega"
"io/ioutil"
- "net/http/httptest"
"os"
"testing"
"time"
"github.com/apid/apid-core"
-
+ "github.com/apid/apid-core/events"
"github.com/apid/apid-core/factory"
)
-var (
- tmpDir string
- testServer *httptest.Server
- testRouter apid.Router
- testMock *MockServer
- wipeDBAferTest bool
-)
-
const dummyConfigValue string = "placeholder"
const expectedClusterId = "bootstrap"
+var tmpDir string
+
var _ = BeforeSuite(func() {
- wipeDBAferTest = true
-})
-
-var _ = BeforeEach(func() {
apid.Initialize(factory.DefaultServicesFactory())
-
- config = apid.Config()
dataService = apid.Data()
- events = apid.Events()
-
+ config = apid.Config()
+ apiService = apid.API()
+ go apiService.Listen()
+ //dataService = apid.Data()
+ log = apid.Log().ForModule("apigeeSync")
var err error
- tmpDir, err = ioutil.TempDir("", "api_test")
+ tmpDir, err = ioutil.TempDir("", "apid_test")
Expect(err).NotTo(HaveOccurred())
- config.Set("local_storage_path", tmpDir)
-
+ config.Set(configLocalStoragePath, tmpDir)
config.Set(configProxyServerBaseURI, dummyConfigValue)
config.Set(configSnapServerBaseURI, dummyConfigValue)
config.Set(configChangeServerBaseURI, dummyConfigValue)
config.Set(configSnapshotProtocol, "sqlite")
config.Set(configPollInterval, 10*time.Millisecond)
config.Set(configDiagnosticMode, false)
- config.Set(configName, "testhost")
- config.Set(configApidClusterId, expectedClusterId)
config.Set(configConsumerKey, "XXXXXXX")
config.Set(configConsumerSecret, "YYYYYYY")
-
- block = "0"
- log = apid.Log().ForModule("apigeeSync")
+ config.Set(configApidInstanceID, "YYYYYYY")
}, 3)
+var _ = BeforeEach(func() {
+ eventService = events.CreateService()
+ config.Set(configName, "testhost")
+ config.Set(configApidClusterId, expectedClusterId)
+ apidInfo.ClusterID = expectedClusterId
+ apidInfo.InstanceID = "YYYYYYY"
+ apidInfo.LastSnapshot = ""
+ apidInfo.IsNewInstance = true
+})
+
var _ = AfterEach(func() {
- apid.Events().Close()
- lastSequence = ""
+ cleanCommonDb()
+ eventService.Close()
})
var _ = AfterSuite(func() {
- apid.Events().Close()
- if testServer != nil {
- testServer.Close()
- }
- os.RemoveAll(tmpDir)
+ Expect(os.RemoveAll(tmpDir)).Should(Succeed())
})
func TestApigeeSync(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "ApigeeSync Suite")
}
+
+func cleanCommonDb() {
+ db, err := dataService.DB()
+ Expect(err).Should(Succeed())
+ _, err = db.Exec(`DROP TABLE IF EXISTS APID;`)
+ Expect(err).Should(Succeed())
+}
diff --git a/apigee_sync.go b/apigee_sync.go
deleted file mode 100644
index 8a23079..0000000
--- a/apigee_sync.go
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
- "github.com/apid/apid-core"
- "net/http"
- "time"
-)
-
-const (
- httpTimeout = time.Minute
- pluginTimeout = time.Minute
- maxIdleConnsPerHost = 10
-)
-
-var knownTables = make(map[string]bool)
-
-/*
- * Start from existing snapshot if possible
- * If an existing snapshot does not exist, use the apid scope to fetch
- * all data scopes, then get a snapshot for those data scopes
- *
- * Then, poll for changes
- */
-func bootstrap() {
- if isOfflineMode && apidInfo.LastSnapshot == "" {
- log.Panic("Diagnostic mode requires existent snapshot info in default DB.")
- }
-
- if apidInfo.LastSnapshot != "" {
- snapshot := apidSnapshotManager.startOnLocalSnapshot(apidInfo.LastSnapshot)
- events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
- apidChangeManager.pollChangeWithBackoff()
- })
-
- log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
- return
- }
-
- apidSnapshotManager.downloadBootSnapshot()
- apidSnapshotManager.downloadDataSnapshot()
-
- apidChangeManager.pollChangeWithBackoff()
-
-}
-
-/*
- * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
- * for retrying on errors
- */
-func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
-
- backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true)
-
- //inintialize the retry channel to start first attempt immediately
- retry := time.After(0 * time.Millisecond)
-
- for {
- select {
- case <-quit:
- log.Info("Quit signal recieved. Returning")
- return
- case <-retry:
- start := time.Now()
-
- err := toExecute(quit)
- if err == nil {
- return
- }
-
- if _, ok := err.(quitSignalError); ok {
- return
- }
-
- end := time.Now()
- //error encountered, since we would have returned above otherwise
- handleError(err)
-
- /* TODO keep this around? Imagine an immediately erroring service,
- * causing many sequential requests which could pollute logs
- */
- //only backoff if the request took less than one second
- if end.After(start.Add(time.Second)) {
- backoff.Reset()
- retry = time.After(0 * time.Millisecond)
- } else {
- retry = time.After(backoff.Duration())
- }
- }
- }
-}
-
-func addHeaders(req *http.Request) {
- req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
- req.Header.Set("apid_instance_id", apidInfo.InstanceID)
- req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
- req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
-}
-
-type changeServerError struct {
- Code string `json:"code"`
-}
-
-type quitSignalError struct {
-}
-
-type expected200Error struct {
-}
-
-type authFailError struct {
-}
-
-func (an expected200Error) Error() string {
- return "Did not recieve OK response"
-}
-
-func (a quitSignalError) Error() string {
- return "Signal to quit encountered"
-}
-
-func (a changeServerError) Error() string {
- return a.Code
-}
-
-func (a authFailError) Error() string {
- return "Authorization failed"
-}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
deleted file mode 100644
index f7144a1..0000000
--- a/apigee_sync_test.go
+++ /dev/null
@@ -1,452 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
- "github.com/apid/apid-core"
- "github.com/apid/apid-core/util"
- "github.com/apigee-labs/transicator/common"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "net/http"
- "net/http/httptest"
-)
-
-var _ = Describe("Sync", func() {
- Context("offline mode", func() {
- var (
- testInstanceID = util.GenerateUUID()
- testInstanceName = "offline-instance-name"
- testClusterID = "offline-cluster-id"
- testLastSnapshot = "offline-last-snapshot"
- testChangeMan *dummyChangeManager
- )
-
- var _ = BeforeEach(func() {
- config.Set(configDiagnosticMode, true)
- config.Set(configApidClusterId, testClusterID)
- _initPlugin(apid.AllServices())
- apidSnapshotManager = &dummySnapshotManager{}
- testChangeMan = &dummyChangeManager{
- pollChangeWithBackoffChan: make(chan bool, 1),
- }
- apidChangeManager = testChangeMan
- apidTokenManager = &dummyTokenManager{}
- apidInfo = apidInstanceInfo{
- InstanceID: testInstanceID,
- InstanceName: testInstanceName,
- ClusterID: testClusterID,
- LastSnapshot: testLastSnapshot,
- }
- updateApidInstanceInfo()
-
- })
-
- var _ = AfterEach(func() {
- config.Set(configDiagnosticMode, false)
- if wipeDBAferTest {
- db, err := dataService.DB()
- Expect(err).NotTo(HaveOccurred())
- tx, err := db.Begin()
- _, err = tx.Exec("DELETE FROM APID")
- Expect(err).NotTo(HaveOccurred())
- err = tx.Commit()
- Expect(err).NotTo(HaveOccurred())
-
- }
- wipeDBAferTest = true
- newInstanceID = true
- isOfflineMode = false
- })
-
- It("offline mode should bootstrap from local DB", func(done Done) {
-
- Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
- if s, ok := event.(*common.Snapshot); ok {
- // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
- // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
- Expect(s.SnapshotInfo).Should(Equal(testLastSnapshot))
- Expect(s.Tables).To(BeNil())
- close(done)
- }
- })
- pie := apid.PluginsInitializedEvent{
- Description: "plugins initialized",
- }
- pie.Plugins = append(pie.Plugins, pluginData)
- postInitPlugins(pie)
-
- }, 3)
-
- })
-
- Context("online mode", func() {
- var _ = BeforeEach(func() {
- _initPlugin(apid.AllServices())
- createManagers()
- })
-
- var _ = AfterEach(func() {
- if wipeDBAferTest {
- db, err := dataService.DB()
- Expect(err).NotTo(HaveOccurred())
- tx, err := db.Begin()
- _, err = tx.Exec("DELETE FROM APID")
- Expect(err).NotTo(HaveOccurred())
- err = tx.Commit()
- Expect(err).NotTo(HaveOccurred())
- }
- wipeDBAferTest = true
- })
-
- const expectedDataScopeId1 = "dataScope1"
- const expectedDataScopeId2 = "dataScope2"
-
- var initializeContext = func() {
- testRouter = apid.API().Router()
- testServer = httptest.NewServer(testRouter)
-
- // set up mock server
- mockParms := MockParms{
- ReliableAPI: false,
- ClusterID: config.GetString(configApidClusterId),
- TokenKey: config.GetString(configConsumerKey),
- TokenSecret: config.GetString(configConsumerSecret),
- Scope: "ert452",
- Organization: "att",
- Environment: "prod",
- }
- testMock = Mock(mockParms, testRouter)
-
- config.Set(configProxyServerBaseURI, testServer.URL)
- config.Set(configSnapServerBaseURI, testServer.URL)
- config.Set(configChangeServerBaseURI, testServer.URL)
- }
-
- var restoreContext = func() {
-
- testServer.Close()
-
- config.Set(configProxyServerBaseURI, dummyConfigValue)
- config.Set(configSnapServerBaseURI, dummyConfigValue)
- config.Set(configChangeServerBaseURI, dummyConfigValue)
-
- }
-
- It("should succesfully bootstrap from clean slate", func(done Done) {
- log.Info("Starting sync tests...")
- var closeDone <-chan bool
- initializeContext()
- // do not wipe DB after. Lets use it
- wipeDBAferTest = false
- var lastSnapshot *common.Snapshot
-
- expectedSnapshotTables := common.ChangeList{
- Changes: []common.Change{common.Change{Table: "kms_company"},
- common.Change{Table: "edgex_apid_cluster"},
- common.Change{Table: "edgex_data_scope"},
- common.Change{Table: "kms_app_credential"},
- common.Change{Table: "kms_app_credential_apiproduct_mapper"},
- common.Change{Table: "kms_developer"},
- common.Change{Table: "kms_company_developer"},
- common.Change{Table: "kms_api_product"},
- common.Change{Table: "kms_app"}},
- }
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- if s, ok := event.(*common.Snapshot); ok {
-
- Expect(16).To(Equal(len(knownTables)))
- Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
-
- lastSnapshot = s
-
- db, _ := dataService.DBVersion(s.SnapshotInfo)
- var rowCount int
- var id string
-
- err := db.Ping()
- Expect(err).NotTo(HaveOccurred())
- numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;")
- if err != nil {
- Fail("Failed to get correct DB")
- }
- Expect(true).To(Equal(numApidClusters.Next()))
- numApidClusters.Scan(&rowCount)
- Expect(1).To(Equal(rowCount))
- numApidClusters.Close()
- apidClusters, err := db.Query("select id from edgex_apid_cluster;")
- Expect(err).NotTo(HaveOccurred())
- apidClusters.Next()
- apidClusters.Scan(&id)
- Expect(id).To(Equal(expectedClusterId))
- apidClusters.Close()
-
- numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;")
- Expect(err).NotTo(HaveOccurred())
- Expect(true).To(Equal(numDataScopes.Next()))
- numDataScopes.Scan(&rowCount)
- Expect(2).To(Equal(rowCount))
- numDataScopes.Close()
- dataScopes, err := db.Query("select id from edgex_data_scope;")
- Expect(err).NotTo(HaveOccurred())
- dataScopes.Next()
- dataScopes.Scan(&id)
- dataScopes.Next()
-
- if id == expectedDataScopeId1 {
- dataScopes.Scan(&id)
- Expect(id).To(Equal(expectedDataScopeId2))
- } else {
- dataScopes.Scan(&id)
- Expect(id).To(Equal(expectedDataScopeId1))
- }
- dataScopes.Close()
-
- } else if cl, ok := event.(*common.ChangeList); ok {
- closeDone = apidChangeManager.close()
- // ensure that snapshot switched DB versions
- Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
- expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
- Expect(getDB() == expectedDB).Should(BeTrue())
-
- Expect(cl.Changes).To(HaveLen(6))
-
- var tables []string
- for _, c := range cl.Changes {
- tables = append(tables, c.Table)
- Expect(c.NewRow).ToNot(BeNil())
-
- var tenantID string
- c.NewRow.Get("tenant_id", &tenantID)
- Expect(tenantID).To(Equal("ert452"))
- }
-
- Expect(tables).To(ContainElement("kms_app_credential"))
- Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper"))
- Expect(tables).To(ContainElement("kms_developer"))
- Expect(tables).To(ContainElement("kms_company_developer"))
- Expect(tables).To(ContainElement("kms_api_product"))
- Expect(tables).To(ContainElement("kms_app"))
-
- go func() {
- // when close done, all handlers for the first changeList have been executed
- <-closeDone
- defer GinkgoRecover()
- // allow other handler to execute to insert last_sequence
- var seq string
- err = getDB().
- QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;").
- Scan(&seq)
- Expect(err).NotTo(HaveOccurred())
- //}
- Expect(seq).To(Equal(cl.LastSequence))
-
- restoreContext()
- close(done)
- }()
-
- }
- })
- pie := apid.PluginsInitializedEvent{
- Description: "plugins initialized",
- }
- pie.Plugins = append(pie.Plugins, pluginData)
- postInitPlugins(pie)
- }, 5)
-
- It("should bootstrap from local DB if present", func(done Done) {
-
- var closeDone <-chan bool
-
- initializeContext()
- expectedTables := common.ChangeList{
- Changes: []common.Change{common.Change{Table: "kms_company"},
- common.Change{Table: "edgex_apid_cluster"},
- common.Change{Table: "edgex_data_scope"}},
- }
- Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
- if s, ok := event.(*common.Snapshot); ok {
- // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
- // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
- closeDone = apidChangeManager.close()
- go func() {
- // when close done, all handlers for the first snapshot have been executed
- <-closeDone
- //verify that the knownTables array has been properly populated from existing DB
- Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
-
- Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
- Expect(s.Tables).To(BeNil())
-
- restoreContext()
- close(done)
- }()
-
- }
- })
- pie := apid.PluginsInitializedEvent{
- Description: "plugins initialized",
- }
- pie.Plugins = append(pie.Plugins, pluginData)
- postInitPlugins(pie)
-
- }, 3)
-
- It("should detect apid_cluster_id change in config yaml", func() {
- Expect(apidInfo).ToNot(BeNil())
- Expect(apidInfo.ClusterID).To(Equal("bootstrap"))
- Expect(apidInfo.InstanceID).ToNot(BeEmpty())
- previousInstanceId := apidInfo.InstanceID
-
- config.Set(configApidClusterId, "new value")
- apidInfo, err := getApidInstanceInfo()
- Expect(err).NotTo(HaveOccurred())
- Expect(apidInfo.LastSnapshot).To(BeEmpty())
- Expect(apidInfo.InstanceID).ToNot(BeEmpty())
- Expect(apidInfo.InstanceID).ToNot(Equal(previousInstanceId))
- Expect(apidInfo.ClusterID).To(Equal("new value"))
- })
-
- It("should correctly identify non-proper subsets with respect to maps", func() {
-
- //test b proper subset of a
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "b"}},
- )).To(BeFalse())
-
- //test a == b
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
- )).To(BeFalse())
-
- //test b superset of a
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
- )).To(BeTrue())
-
- //test b not subset of a
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{common.Change{Table: "c"}},
- )).To(BeTrue())
-
- //test a empty
- Expect(changesHaveNewTables(map[string]bool{},
- []common.Change{common.Change{Table: "a"}},
- )).To(BeTrue())
-
- //test b empty
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
- []common.Change{},
- )).To(BeFalse())
-
- //test b nil
- Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse())
-
- //test a nil
- Expect(changesHaveNewTables(nil,
- []common.Change{common.Change{Table: "a"}},
- )).To(BeTrue())
- }, 3)
-
- // todo: disabled for now -
- // there is precondition I haven't been able to track down that breaks this test on occasion
- XIt("should process a new snapshot when change server requires it", func(done Done) {
- oldSnap := apidInfo.LastSnapshot
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- defer GinkgoRecover()
-
- if s, ok := event.(*common.Snapshot); ok {
- Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
- close(done)
- }
- })
- testMock.forceNewSnapshot()
- })
-
- It("Verify the Sequence Number Logic works as expected", func() {
- Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
- Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
- Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
- Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
- Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
- Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
- }, 3)
-
- /*
- * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
- */
- It("Should be able to handle duplicate snapshot during bootstrap", func() {
- initializeContext()
- apidTokenManager = createSimpleTokenManager()
- apidTokenManager.start()
- apidSnapshotManager = createSnapShotManager()
- //events.Listen(ApigeeSyncEventSelector, &handler{})
-
- scopes := []string{apidInfo.ClusterID}
- snapshot := &common.Snapshot{}
- apidSnapshotManager.downloadSnapshot(true, scopes, snapshot)
- apidSnapshotManager.storeBootSnapshot(snapshot)
- apidSnapshotManager.storeDataSnapshot(snapshot)
- restoreContext()
- <-apidSnapshotManager.close()
- apidTokenManager.close()
- }, 3)
-
- It("Reuse http.Client connection for multiple concurrent requests", func() {
- var tr *http.Transport
- server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- }))
- tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
- tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
-
- var rspcnt int = 0
- ch := make(chan *http.Response)
- client := &http.Client{Transport: tr}
- for i := 0; i < 2*maxIdleConnsPerHost; i++ {
- go func(client *http.Client) {
- req, err := http.NewRequest("GET", server.URL, nil)
- resp, err := client.Do(req)
- if err != nil {
- Fail("Unable to process Client request")
- }
- ch <- resp
- resp.Body.Close()
-
- }(client)
- }
- for {
- select {
- case resp := <-ch:
- Expect(resp.StatusCode).To(Equal(http.StatusOK))
- if rspcnt >= 2*maxIdleConnsPerHost-1 {
- return
- }
- rspcnt++
- default:
- }
- }
-
- }, 3)
-
- })
-})
diff --git a/backoff.go b/backoff.go
deleted file mode 100644
index bad8077..0000000
--- a/backoff.go
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
- "math"
- "math/rand"
- "time"
-)
-
-const defaultInitial time.Duration = 200 * time.Millisecond
-const defaultMax time.Duration = 10 * time.Second
-const defaultFactor float64 = 2
-
-type Backoff struct {
- attempt int
- initial, max time.Duration
- jitter bool
- backoffStrategy func() time.Duration
-}
-
-type ExponentialBackoff struct {
- Backoff
- factor float64
-}
-
-func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
- backoff := &ExponentialBackoff{}
-
- if initial <= 0 {
- initial = defaultInitial
- }
- if max <= 0 {
- max = defaultMax
- }
-
- if factor <= 0 {
- factor = defaultFactor
- }
-
- backoff.initial = initial
- backoff.max = max
- backoff.attempt = 0
- backoff.factor = factor
- backoff.jitter = jitter
- backoff.backoffStrategy = backoff.exponentialBackoffStrategy
-
- return backoff
-}
-
-func (b *Backoff) Duration() time.Duration {
- d := b.backoffStrategy()
- b.attempt++
- return d
-}
-
-func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
-
- initial := float64(b.Backoff.initial)
- attempt := float64(b.Backoff.attempt)
- duration := initial * math.Pow(b.factor, attempt)
-
- if duration > math.MaxInt64 {
- return b.max
- }
- dur := time.Duration(duration)
-
- if b.jitter {
- duration = (rand.Float64()*(duration-initial) + initial)
- }
-
- if dur > b.max {
- return b.max
- }
-
- log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
- return dur
-}
-
-func (b *Backoff) Reset() {
- b.attempt = 0
-}
-
-func (b *Backoff) Attempt() int {
- return b.attempt
-}
diff --git a/change_test.go b/change_test.go
index 511eb82..97d5fb3 100644
--- a/change_test.go
+++ b/change_test.go
@@ -16,144 +16,196 @@
import (
"github.com/apid/apid-core"
+ "github.com/apid/apid-core/api"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "net/http"
"net/http/httptest"
- "os"
+ "strconv"
"time"
)
+const (
+ expectedInstanceId = "dummy"
+)
+
var _ = Describe("Change Agent", func() {
Context("Change Agent Unit Tests", func() {
- var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
- initDb(sqlfile, "./mockdb_change.sqlite3")
- file, err := os.Open("./mockdb_change.sqlite3")
- Expect(err).Should(Succeed())
- s := common.Snapshot{}
- err = processSnapshotServerFileResponse(dbId, file, &s)
- Expect(err).Should(Succeed())
- return s
- }
+ Context("utils", func() {
- var initializeContext = func() {
- testRouter = apid.API().Router()
- testServer = httptest.NewServer(testRouter)
+ It("should correctly identify non-proper subsets with respect to maps", func() {
+ //test b proper subset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{{Table: "b"}},
+ )).To(BeFalse())
- // set up mock server
- mockParms := MockParms{
- ReliableAPI: true,
- ClusterID: config.GetString(configApidClusterId),
- TokenKey: config.GetString(configConsumerKey),
- TokenSecret: config.GetString(configConsumerSecret),
- Scope: "ert452",
- Organization: "att",
- Environment: "prod",
- }
- testMock = Mock(mockParms, testRouter)
+ //test a == b
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{{Table: "a"}, {Table: "b"}},
+ )).To(BeFalse())
- config.Set(configProxyServerBaseURI, testServer.URL)
- config.Set(configSnapServerBaseURI, testServer.URL)
- config.Set(configChangeServerBaseURI, testServer.URL)
- config.Set(configPollInterval, 1*time.Millisecond)
- }
+ //test b superset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{{Table: "a"}, {Table: "b"}, {Table: "c"}},
+ )).To(BeTrue())
- var restoreContext = func() {
+ //test b not subset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{{Table: "c"}},
+ )).To(BeTrue())
- testServer.Close()
- config.Set(configProxyServerBaseURI, dummyConfigValue)
- config.Set(configSnapServerBaseURI, dummyConfigValue)
- config.Set(configChangeServerBaseURI, dummyConfigValue)
- config.Set(configPollInterval, 10*time.Millisecond)
- }
+ //test a empty
+ Expect(changesHaveNewTables(map[string]bool{},
+ []common.Change{{Table: "a"}},
+ )).To(BeTrue())
- var _ = BeforeEach(func() {
- _initPlugin(apid.AllServices())
- createManagers()
- event := createTestDb("./sql/init_mock_db.sql", "test_change")
- processSnapshot(&event)
- knownTables = extractTablesFromDB(getDB())
- })
+ //test b empty
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{},
+ )).To(BeFalse())
- var _ = AfterEach(func() {
- restoreContext()
- if wipeDBAferTest {
- db, err := dataService.DB()
- Expect(err).Should(Succeed())
- tx, err := db.Begin()
- _, err = tx.Exec("DELETE FROM APID")
- Expect(err).Should(Succeed())
- err = tx.Commit()
- Expect(err).Should(Succeed())
- }
- wipeDBAferTest = true
- })
+ //test b nil
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse())
- It("test change agent with authorization failure", func() {
- log.Debug("test change agent with authorization failure")
- testTokenManager := &dummyTokenManager{make(chan bool)}
- apidTokenManager = testTokenManager
- apidTokenManager.start()
- apidSnapshotManager = &dummySnapshotManager{}
- initializeContext()
- testMock.forceAuthFail()
- wipeDBAferTest = true
- apidChangeManager.pollChangeWithBackoff()
- // auth check fails
- <-testTokenManager.invalidateChan
- log.Debug("closing")
- <-apidChangeManager.close()
- })
-
- It("test change agent with too old snapshot", func() {
- log.Debug("test change agent with too old snapshot")
- testTokenManager := &dummyTokenManager{make(chan bool)}
- apidTokenManager = testTokenManager
- apidTokenManager.start()
- testSnapshotManager := &dummySnapshotManager{make(chan bool)}
- apidSnapshotManager = testSnapshotManager
- initializeContext()
-
- testMock.passAuthCheck()
- testMock.forceNewSnapshot()
- wipeDBAferTest = true
- apidChangeManager.pollChangeWithBackoff()
- <-testSnapshotManager.downloadCalledChan
- log.Debug("closing")
- <-apidChangeManager.close()
- })
-
- It("change agent should retry with authorization failure", func(done Done) {
- log.Debug("change agent should retry with authorization failure")
- testTokenManager := &dummyTokenManager{make(chan bool)}
- apidTokenManager = testTokenManager
- apidTokenManager.start()
- apidSnapshotManager = &dummySnapshotManager{}
- initializeContext()
- testMock.forceAuthFail()
- testMock.forceNoSnapshot()
- wipeDBAferTest = true
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
- if _, ok := event.(*common.ChangeList); ok {
- closeDone := apidChangeManager.close()
- log.Debug("closing")
- go func() {
- // when close done, all handlers for the first snapshot have been executed
- <-closeDone
- close(done)
- }()
-
- }
+ //test a nil
+ Expect(changesHaveNewTables(nil,
+ []common.Change{{Table: "a"}},
+ )).To(BeTrue())
})
- apidChangeManager.pollChangeWithBackoff()
- // auth check fails
- <-testTokenManager.invalidateChan
- }, 2)
+ It("Compare Sequence Number", func() {
+ Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
+ Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
+ Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
+ Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
+ Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
+ Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
+ })
+ })
+
+ Context("changeManager", func() {
+ testCount := 0
+ var testChangeMan *pollChangeManager
+ var dummyDbMan *dummyDbManager
+ var dummySnapMan *dummySnapshotManager
+ var dummyTokenMan *dummyTokenManager
+ var testServer *httptest.Server
+ var testRouter apid.Router
+ var testMock *MockServer
+ BeforeEach(func() {
+ testCount++
+ dummyDbMan = &dummyDbManager{
+ knownTables: map[string]bool{
+ "_transicator_metadata": true,
+ "_transicator_tables": true,
+ "attributes": true,
+ "edgex_apid_cluster": true,
+ "edgex_data_scope": true,
+ "kms_api_product": true,
+ "kms_app": true,
+ "kms_app_credential": true,
+ "kms_app_credential_apiproduct_mapper": true,
+ "kms_company": true,
+ "kms_company_developer": true,
+ "kms_deployment": true,
+ "kms_developer": true,
+ "kms_organization": true,
+ },
+ scopes: []string{"43aef41d"},
+ lastSeqUpdated: make(chan string, 1),
+ }
+ dummySnapMan = &dummySnapshotManager{
+ downloadCalledChan: make(chan bool, 1),
+ }
+ dummyTokenMan = &dummyTokenManager{
+ invalidateChan: make(chan bool, 1),
+ }
+ client := &http.Client{}
+ testChangeMan = createChangeManager(dummyDbMan, dummySnapMan, dummyTokenMan, client)
+ testChangeMan.block = 0
+
+ // create a new API service to have a new router for testing
+ testRouter = api.CreateService().Router()
+ testServer = httptest.NewServer(testRouter)
+ // set up mock server
+ mockParms := MockParms{
+ ReliableAPI: true,
+ ClusterID: config.GetString(configApidClusterId),
+ TokenKey: config.GetString(configConsumerKey),
+ TokenSecret: config.GetString(configConsumerSecret),
+ Scope: "",
+ Organization: "att",
+ Environment: "prod",
+ }
+ apidInfo.ClusterID = expectedClusterId
+ apidInfo.InstanceID = expectedInstanceId
+ testMock = Mock(mockParms, testRouter)
+ config.Set(configProxyServerBaseURI, testServer.URL)
+ config.Set(configSnapServerBaseURI, testServer.URL)
+ config.Set(configChangeServerBaseURI, testServer.URL)
+ config.Set(configPollInterval, 1*time.Millisecond)
+
+ initialBackoffInterval = time.Millisecond
+ testMock.oauthToken = "test_token_" + strconv.Itoa(testCount)
+ dummyTokenMan.token = testMock.oauthToken
+
+ })
+
+ AfterEach(func() {
+ testServer.Close()
+ <-testChangeMan.close()
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
+ config.Set(configPollInterval, 10*time.Millisecond)
+ })
+
+ It("test change agent with authorization failure", func() {
+ log.Debug("test change agent with authorization failure")
+ testMock.forceAuthFailOnce()
+ testChangeMan.pollChangeWithBackoff()
+ // auth check fails
+ <-dummyTokenMan.invalidateChan
+ log.Debug("closing")
+ })
+
+ It("test change agent with too old snapshot", func() {
+ log.Debug("test change agent with too old snapshot")
+ testMock.passAuthCheck()
+ testMock.forceNewSnapshot()
+ testChangeMan.pollChangeWithBackoff()
+ <-dummySnapMan.downloadCalledChan
+ log.Debug("closing")
+ })
+
+ It("change agent should retry with authorization failure", func() {
+ log.Debug("change agent should retry with authorization failure")
+ testMock.forceAuthFailOnce()
+ testMock.forceNoSnapshot()
+ called := false
+ eventService.ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+ if _, ok := event.(*common.ChangeList); ok {
+ called = true
+ }
+ })
+ testChangeMan.pollChangeWithBackoff()
+ <-dummyTokenMan.invalidateChan
+ Expect(<-dummyDbMan.lastSeqUpdated).Should(Equal(testMock.lastSequenceID()))
+ Expect(called).Should(BeTrue())
+ }, 3)
+
+ })
+
+ Context("offline change manager", func() {
+ It("offline change manager should have no effect", func() {
+ o := &offlineChangeManager{}
+ o.pollChangeWithBackoff()
+ <-o.close()
+ })
+ })
})
})
diff --git a/changes.go b/changes.go
index b78a1d1..6afa827 100644
--- a/changes.go
+++ b/changes.go
@@ -16,34 +16,44 @@
import (
"encoding/json"
+ "fmt"
"github.com/apigee-labs/transicator/common"
"io/ioutil"
"net/http"
"net/url"
"path"
"sort"
+ "strconv"
"sync/atomic"
"time"
)
-var lastSequence string
-var block string = "45"
-
type pollChangeManager struct {
// 0 for not closed, 1 for closed
isClosed *int32
// 0 for pollChangeWithBackoff() not launched, 1 for launched
- isLaunched *int32
- quitChan chan bool
+ isLaunched *int32
+ quitChan chan bool
+ block int
+ lastSequence string
+ dbMan DbManager
+ snapMan snapshotManager
+ tokenMan tokenManager
+ client *http.Client
}
-func createChangeManager() *pollChangeManager {
+func createChangeManager(dbMan DbManager, snapMan snapshotManager, tokenMan tokenManager, client *http.Client) *pollChangeManager {
isClosedInt := int32(0)
isLaunchedInt := int32(0)
return &pollChangeManager{
isClosed: &isClosedInt,
quitChan: make(chan bool),
isLaunched: &isLaunchedInt,
+ block: 45,
+ dbMan: dbMan,
+ snapMan: snapMan,
+ tokenMan: tokenMan,
+ client: client,
}
}
@@ -56,7 +66,7 @@
finishChan := make(chan bool, 1)
//has been closed
if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
- log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
+ log.Warn("pollChangeManager: close() called on a closed pollChangeManager!")
go func() {
log.Debug("change manager closed")
finishChan <- false
@@ -65,11 +75,10 @@
}
// not launched
if atomic.LoadInt32(c.isLaunched) == int32(0) {
- log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
+ log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched!")
go func() {
- c.quitChan <- true
- apidTokenManager.close()
- <-apidSnapshotManager.close()
+ c.tokenMan.close()
+ <-c.snapMan.close()
log.Debug("change manager closed")
finishChan <- false
}()
@@ -79,8 +88,8 @@
log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
go func() {
c.quitChan <- true
- apidTokenManager.close()
- <-apidSnapshotManager.close()
+ c.tokenMan.close()
+ <-c.snapMan.close()
log.Debug("change manager closed")
finishChan <- true
}()
@@ -120,43 +129,147 @@
* Check to see if we have lastSequence already saved in the DB,
* in which case, it has to be used to prevent re-reading same data
*/
- lastSequence = getLastSequence()
-
+ c.lastSequence = c.dbMan.getLastSequence()
for {
select {
case <-c.quitChan:
log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager")
- return quitSignalError{}
+ return quitSignalError
default:
- err := c.getChanges(changesUri)
+ scopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID)
if err != nil {
- if _, ok := err.(quitSignalError); ok {
- log.Debug("pollChangeAgent: consuming the quit signal")
- <-c.quitChan
- }
+ return err
+ }
+ r, err := c.getChanges(scopes, changesUri)
+ if err != nil {
+ return err
+ }
+ cl, err := c.parseChangeResp(r)
+ if err != nil {
+ return err
+ }
+ if err = c.emitChangeList(scopes, cl); err != nil {
return err
}
}
}
}
-//TODO refactor this method more, split it up
-/* Make a single request to the changeserver to get a changelist */
-func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
- // if closed
- if atomic.LoadInt32(c.isClosed) == int32(1) {
- return quitSignalError{}
+func (c *pollChangeManager) parseChangeResp(r *http.Response) (*common.ChangeList, error) {
+ var err error
+ defer r.Body.Close()
+
+ if r.StatusCode != http.StatusOK {
+ log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
+ switch r.StatusCode {
+ case http.StatusUnauthorized:
+ c.tokenMan.invalidateToken()
+ return nil, authFailError
+
+ case http.StatusNotModified:
+ return nil, nil
+ case http.StatusBadRequest:
+ var apiErr changeServerError
+ var b []byte
+ b, err = ioutil.ReadAll(r.Body)
+ if err != nil {
+ log.Errorf("Unable to read response body: %v", err)
+ return nil, err
+ }
+ err = json.Unmarshal(b, &apiErr)
+ if err != nil {
+ log.Errorf("JSON Response Data not parsable: %s", string(b))
+ return nil, err
+ }
+ if apiErr.Code == "SNAPSHOT_TOO_OLD" {
+ log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
+ err = apiErr
+ }
+ return nil, err
+ default:
+ log.Errorf("Unknown response code from change server: %v", r.Status)
+ return nil, fmt.Errorf("unknown response code from change server: %v", r.Status)
+ }
}
+
+ resp := &common.ChangeList{}
+ err = json.NewDecoder(r.Body).Decode(resp)
+ if err != nil {
+ log.Errorf("JSON Response Data not parsable: %v", err)
+ return nil, err
+ }
+ return resp, nil
+}
+
+func (c *pollChangeManager) emitChangeList(scopes []string, cl *common.ChangeList) error {
+ var err error
+ /*
+ * If the lastSequence is already newer or the same than what we got via
+ * cl.LastSequence, Ignore it.
+ */
+ if c.lastSequence != "" &&
+ getChangeStatus(c.lastSequence, cl.LastSequence) != 1 {
+ return nil
+ }
+
+ if changesRequireDDLSync(c.dbMan.getKnowTables(), cl) {
+ return changeServerError{
+ Code: "DDL changes detected; must get new snapshot",
+ }
+ }
+
+ /* If valid data present, Emit to plugins */
+ if len(cl.Changes) > 0 {
+ if err = c.dbMan.processChangeList(cl); err != nil {
+ log.Errorf("Error in processChangeList: %v", err)
+ return err
+ }
+ /*
+ * Check to see if there was any change in scope. If found, handle it
+ * by getting a new snapshot
+ */
+ newScopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID)
+ if err != nil {
+ return err
+ }
+ cs := scopeChanged(newScopes, scopes)
+ if cs != nil {
+ return cs
+ }
+ select {
+ case <-time.After(httpTimeout):
+ log.Panic("Timeout. Plugins failed to respond to changes.")
+ case <-eventService.Emit(ApigeeSyncEventSelector, cl):
+ }
+ } else if c.lastSequence == "" { // emit the first changelist anyway
+ select {
+ case <-time.After(httpTimeout):
+ log.Panic("Timeout. Plugins failed to respond to changes.")
+ case <-eventService.Emit(ApigeeSyncEventSelector, cl):
+ }
+ } else {
+ log.Debugf("No Changes detected")
+ }
+
+ err = c.dbMan.updateLastSequence(cl.LastSequence)
+ if err != nil {
+ log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
+ }
+ c.lastSequence = cl.LastSequence
+ return nil
+}
+
+/* Make a single request to the changeserver to get a changelist */
+func (c *pollChangeManager) getChanges(scopes []string, changesUri *url.URL) (*http.Response, error) {
log.Debug("polling...")
/* Find the scopes associated with the config id */
- scopes := findScopesForId(apidInfo.ClusterID)
v := url.Values{}
- blockValue := block
+ blockValue := strconv.Itoa(c.block)
/* Sequence added to the query if available */
- if lastSequence != "" {
- v.Add("since", lastSequence)
+ if c.lastSequence != "" {
+ v.Add("since", c.lastSequence)
} else {
blockValue = "0"
}
@@ -178,115 +291,16 @@
/* If error, break the loop, and retry after interval */
req, err := http.NewRequest("GET", uri, nil)
- addHeaders(req)
- r, err := httpclient.Do(req)
+ addHeaders(req, c.tokenMan.getBearerToken())
+ r, err := c.client.Do(req)
if err != nil {
log.Errorf("change agent comm error: %s", err)
- // if closed
- if atomic.LoadInt32(c.isClosed) == int32(1) {
- return quitSignalError{}
- }
- return err
+ return nil, err
}
- defer r.Body.Close()
-
- // has been closed
- if atomic.LoadInt32(c.isClosed) == int32(1) {
- log.Debugf("getChanges: changeManager has been closed")
- return quitSignalError{}
- }
-
- if r.StatusCode != http.StatusOK {
- log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
- switch r.StatusCode {
- case http.StatusUnauthorized:
- err = apidTokenManager.invalidateToken()
- if err != nil {
- return err
- }
- return authFailError{}
-
- case http.StatusNotModified:
- return nil
-
- case http.StatusBadRequest:
- var apiErr changeServerError
- var b []byte
- b, err = ioutil.ReadAll(r.Body)
- if err != nil {
- log.Errorf("Unable to read response body: %v", err)
- return err
- }
- err = json.Unmarshal(b, &apiErr)
- if err != nil {
- log.Errorf("JSON Response Data not parsable: %s", string(b))
- return err
- }
- if apiErr.Code == "SNAPSHOT_TOO_OLD" {
- log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
- err = apiErr
- }
- return err
- }
- return nil
- }
-
- var resp common.ChangeList
- err = json.NewDecoder(r.Body).Decode(&resp)
- if err != nil {
- log.Errorf("JSON Response Data not parsable: %v", err)
- return err
- }
-
- /*
- * If the lastSequence is already newer or the same than what we got via
- * resp.LastSequence, Ignore it.
- */
- if lastSequence != "" &&
- getChangeStatus(lastSequence, resp.LastSequence) != 1 {
- return nil
- }
-
- if changesRequireDDLSync(resp) {
- return changeServerError{
- Code: "DDL changes detected; must get new snapshot",
- }
- }
-
- /* If valid data present, Emit to plugins */
- if len(resp.Changes) > 0 {
- processChangeList(&resp)
- select {
- case <-time.After(httpTimeout):
- log.Panic("Timeout. Plugins failed to respond to changes.")
- case <-events.Emit(ApigeeSyncEventSelector, &resp):
- }
- } else if lastSequence == "" {
- select {
- case <-time.After(httpTimeout):
- log.Panic("Timeout. Plugins failed to respond to changes.")
- case <-events.Emit(ApigeeSyncEventSelector, &resp):
- }
- } else {
- log.Debugf("No Changes detected for Scopes: %s", scopes)
- }
-
- updateSequence(resp.LastSequence)
-
- /*
- * Check to see if there was any change in scope. If found, handle it
- * by getting a new snapshot
- */
- newScopes := findScopesForId(apidInfo.ClusterID)
- cs := scopeChanged(newScopes, scopes)
- if cs != nil {
- return cs
- }
-
- return nil
+ return r, nil
}
-func changesRequireDDLSync(changes common.ChangeList) bool {
+func changesRequireDDLSync(knownTables map[string]bool, changes *common.ChangeList) bool {
return changesHaveNewTables(knownTables, changes.Changes)
}
@@ -296,10 +310,12 @@
log.Debugf("handleChangeServerError: changeManager has been closed")
return
}
- if c, ok := err.(changeServerError); ok {
- log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
- apidSnapshotManager.downloadDataSnapshot()
- } else {
+
+ switch e := err.(type) {
+ case changeServerError:
+ log.Debugf("%s. Fetch a new snapshot to sync...", e.Code)
+ c.snapMan.downloadDataSnapshot()
+ default:
log.Debugf("Error connecting to changeserver: %v", err)
}
}
@@ -310,7 +326,7 @@
func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
//nil maps should not be passed in. Making the distinction between nil map and empty map
- if a == nil {
+ if len(a) == 0 {
log.Warn("Nil map passed to function changesHaveNewTables, may be bug")
return true
}
@@ -332,24 +348,15 @@
func getChangeStatus(lastSeq string, currSeq string) int {
seqPrev, err := common.ParseSequence(lastSeq)
if err != nil {
- log.Panic("Unable to parse previous sequence string")
+ log.Panicf("Unable to parse previous sequence string: %v", err)
}
seqCurr, err := common.ParseSequence(currSeq)
if err != nil {
- log.Panic("Unable to parse current sequence string")
+ log.Panicf("Unable to parse current sequence string: %v", err)
}
return seqCurr.Compare(seqPrev)
}
-func updateSequence(seq string) {
- lastSequence = seq
- err := updateLastSequence(seq)
- if err != nil {
- log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
- }
-
-}
-
/*
* Returns nil if the two arrays have matching contents
*/
diff --git a/cmd/mockServer/main.go b/cmd/mockServer/main.go
index 486da41..8cfa1ef 100644
--- a/cmd/mockServer/main.go
+++ b/cmd/mockServer/main.go
@@ -19,7 +19,6 @@
"os"
-
"github.com/apid/apid-core"
"github.com/apid/apid-core/factory"
"github.com/apid/apidApigeeSync"
@@ -53,16 +52,16 @@
router := apid.API().Router()
params := apidApigeeSync.MockParms{
- ReliableAPI: *reliable,
- ClusterID: "cluster",
- TokenKey: "key",
- TokenSecret: "secret",
- Scope: "scope",
- Organization: "org",
- Environment: "test",
- NumDevelopers: *numDevs,
- NumDeployments: *numDeps,
- BundleURI: *bundleURI,
+ ReliableAPI: *reliable,
+ ClusterID: "cluster",
+ TokenKey: "key",
+ TokenSecret: "secret",
+ Scope: "scope",
+ Organization: "org",
+ Environment: "test",
+ NumDevelopers: *numDevs,
+ NumDeployments: *numDeps,
+ BundleURI: *bundleURI,
}
log.Printf("Params: %#v\n", params)
diff --git a/data.go b/data.go
index c77fc10..be8dd34 100644
--- a/data.go
+++ b/data.go
@@ -28,27 +28,36 @@
)
var (
- unsafeDB apid.DB
- dbMux sync.RWMutex
+ dbMux sync.RWMutex
)
-type dataApidCluster struct {
- ID, Name, OrgAppName, CreatedBy, UpdatedBy, Description string
- Updated, Created string
-}
-
-type dataDataScope struct {
- ID, ClusterID, Scope, Org, Env, CreatedBy, UpdatedBy string
- Updated, Created string
-}
-
/*
This plugin uses 2 databases:
1. The default DB is used for APID table.
2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE
(Currently, the snapshot never changes, but this is future-proof)
*/
-func initDB(db apid.DB) error {
+
+func creatDbManager() *dbManager {
+ return &dbManager{
+ DbMux: &sync.RWMutex{},
+ knownTables: make(map[string]bool),
+ }
+}
+
+type dbManager struct {
+ Db apid.DB
+ DbMux *sync.RWMutex
+ dbVersion string
+ knownTables map[string]bool
+}
+
+// idempotent call to initialize default DB
+func (dbMan *dbManager) initDB() error {
+ db, err := dataService.DB()
+ if err != nil {
+ return err
+ }
tx, err := db.Begin()
if err != nil {
log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
@@ -75,24 +84,22 @@
return nil
}
-func getDB() apid.DB {
+func (dbMan *dbManager) getDB() apid.DB {
dbMux.RLock()
- db := unsafeDB
- dbMux.RUnlock()
- return db
+ defer dbMux.RUnlock()
+ return dbMan.Db
}
-func setDB(db apid.DB) {
+func (dbMan *dbManager) setDB(db apid.DB) {
dbMux.Lock()
- unsafeDB = db
- dbMux.Unlock()
+ defer dbMux.Unlock()
+ dbMan.Db = db
}
//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
-
+func (dbMan *dbManager) insert(tableName string, rows []common.Row, txn apid.Tx) error {
if len(rows) == 0 {
- return false
+ return fmt.Errorf("no rows")
}
var orderedColumns []string
@@ -101,12 +108,12 @@
}
sort.Strings(orderedColumns)
- sql := buildInsertSql(tableName, orderedColumns, rows)
+ sql := dbMan.buildInsertSql(tableName, orderedColumns, rows)
prep, err := txn.Prepare(sql)
if err != nil {
- log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err)
- return false
+ log.Errorf("INSERT Fail to prepare statement %s error=%v", sql, err)
+ return err
}
defer prep.Close()
@@ -123,15 +130,15 @@
_, err = prep.Exec(values...)
if err != nil {
- log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err)
- return false
+ log.Errorf("INSERT Fail %s values=%v error=%v", sql, values, err)
+ return err
}
- log.Debugf("INSERT Success [%s] values=%v", sql, values)
+ log.Debugf("INSERT Success %s values=%v", sql, values)
- return true
+ return nil
}
-func getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
+func (dbMan *dbManager) getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
var values = make([]interface{}, len(pkeys))
for i, pkey := range pkeys {
if row[pkey] == nil {
@@ -143,52 +150,46 @@
return values
}
-func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
- pkeys, err := getPkeysForTable(tableName)
+func (dbMan *dbManager) delete(tableName string, rows []common.Row, txn apid.Tx) error {
+ pkeys, err := dbMan.getPkeysForTable(tableName)
sort.Strings(pkeys)
if len(pkeys) == 0 || err != nil {
- log.Errorf("DELETE No primary keys found for table. %s", tableName)
- return false
+ return fmt.Errorf("DELETE No primary keys found for table. %s", tableName)
}
if len(rows) == 0 {
- log.Errorf("No rows found for table.", tableName)
- return false
+ return fmt.Errorf("no rows found for table %s", tableName)
}
- sql := buildDeleteSql(tableName, rows[0], pkeys)
+ sql := dbMan.buildDeleteSql(tableName, rows[0], pkeys)
prep, err := txn.Prepare(sql)
if err != nil {
- log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
- return false
+ return fmt.Errorf("DELETE Fail to prep statement %s error=%v", sql, err)
}
defer prep.Close()
for _, row := range rows {
- values := getValueListFromKeys(row, pkeys)
+ values := dbMan.getValueListFromKeys(row, pkeys)
// delete prepared statement from existing template statement
res, err := txn.Stmt(prep).Exec(values...)
if err != nil {
- log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
- return false
+ return fmt.Errorf("DELETE Fail %s values=%v error=%v", sql, values, err)
}
affected, err := res.RowsAffected()
if err == nil && affected != 0 {
- log.Debugf("DELETE Success [%s] values=%v", sql, values)
+ log.Debugf("DELETE Success %s values=%v", sql, values)
} else if err == nil && affected == 0 {
- log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
- return false
+ return fmt.Errorf("entry not found %s values=%v, nothing to delete", sql, values)
} else {
- log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
- return false
+ return fmt.Errorf("DELETE Failed %s values=%v error=%v", sql, values, err)
}
}
- return true
+ return nil
}
// Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
-func buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
+func (dbMan *dbManager) buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
var wherePlaceholders []string
i := 1
@@ -210,14 +211,13 @@
}
-func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
- pkeys, err := getPkeysForTable(tableName)
+func (dbMan *dbManager) update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) error {
+ pkeys, err := dbMan.getPkeysForTable(tableName)
if len(pkeys) == 0 || err != nil {
- log.Errorf("UPDATE No primary keys found for table.", tableName)
- return false
+ return fmt.Errorf("UPDATE No primary keys found for table: %v, %v", tableName, err)
}
if len(oldRows) == 0 || len(newRows) == 0 {
- return false
+ return fmt.Errorf("UPDATE No old or new rows, table: %v, %v, %v", tableName, oldRows, newRows)
}
var orderedColumns []string
@@ -229,11 +229,10 @@
sort.Strings(orderedColumns)
//build update statement, use arbitrary row as template
- sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
+ sql := dbMan.buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
prep, err := txn.Prepare(sql)
if err != nil {
- log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
- return false
+ return fmt.Errorf("UPDATE Fail to prep statement %s error=%v", sql, err)
}
defer prep.Close()
@@ -265,25 +264,23 @@
res, err := txn.Stmt(prep).Exec(values...)
if err != nil {
- log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
- return false
+ return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err)
}
numRowsAffected, err := res.RowsAffected()
if err != nil {
- log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
- return false
+ return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err)
}
//delete this once we figure out why tests are failing/not updating
log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
- log.Debugf("UPDATE Success [%s] values=%v", sql, values)
+ log.Debugf("UPDATE Success %s values=%v", sql, values)
}
- return true
+ return nil
}
-func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
+func (dbMan *dbManager) buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
if row == nil {
return ""
}
@@ -311,7 +308,7 @@
}
//precondition: rows.length > 1000, max number of entities for sqlite
-func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
+func (dbMan *dbManager) buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
if len(rows) == 0 {
return ""
}
@@ -343,13 +340,13 @@
return sql
}
-func getPkeysForTable(tableName string) ([]string, error) {
- db := getDB()
+func (dbMan *dbManager) getPkeysForTable(tableName string) ([]string, error) {
+ db := dbMan.getDB()
normalizedTableName := normalizeTableName(tableName)
sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
rows, err := db.Query(sql, normalizedTableName)
if err != nil {
- log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err)
+ log.Errorf("Failed %s values=%s Error: %v", sql, normalizedTableName, err)
return nil, err
}
var columnNames []string
@@ -358,13 +355,15 @@
var value string
err := rows.Scan(&value)
if err != nil {
- log.Fatal(err)
+ log.Errorf("failed to scan column names: %v", err)
+ return nil, err
}
columnNames = append(columnNames, value)
}
err = rows.Err()
if err != nil {
- log.Fatal(err)
+ log.Errorf("failed to scan column names: %v", err)
+ return nil, err
}
return columnNames, nil
}
@@ -377,13 +376,11 @@
* For the given apidConfigId, this function will retrieve all the distinch scopes
* associated with it. Distinct, because scope is already a collection of the tenants.
*/
-func findScopesForId(configId string) (scopes []string) {
+func (dbMan *dbManager) findScopesForId(configId string) (scopes []string, err error) {
log.Debugf("findScopesForId: %s", configId)
-
var scope sql.NullString
- db := getDB()
-
+ db := dbMan.getDB()
query := `
SELECT scope FROM edgex_data_scope WHERE apid_cluster_id = $1
UNION
@@ -391,7 +388,6 @@
UNION
SELECT env_scope FROM edgex_data_scope WHERE apid_cluster_id = $3
`
-
rows, err := db.Query(query, configId, configId, configId)
if err != nil {
log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
@@ -416,9 +412,9 @@
/*
* Retrieve SnapshotInfo for the given apidConfigId from apid_config table
*/
-func getLastSequence() (lastSequence string) {
+func (dbMan *dbManager) getLastSequence() (lastSequence string) {
- err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
+ err := dbMan.getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
if err != nil && err != sql.ErrNoRows {
log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
return
@@ -432,11 +428,11 @@
* Persist the last change Id each time a change has been successfully
* processed by the plugin(s)
*/
-func updateLastSequence(lastSequence string) error {
+func (dbMan *dbManager) updateLastSequence(lastSequence string) error {
log.Debugf("updateLastSequence: %s", lastSequence)
- tx, err := getDB().Begin()
+ tx, err := dbMan.getDB().Begin()
if err != nil {
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
@@ -454,10 +450,9 @@
return err
}
-func getApidInstanceInfo() (info apidInstanceInfo, err error) {
+func (dbMan *dbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
info.InstanceName = config.GetString(configName)
info.ClusterID = config.GetString(configApidClusterId)
-
var savedClusterId string
// always use default database for this
@@ -481,7 +476,7 @@
} else {
// first start - no row, generate a UUID and store it
err = nil
- newInstanceID = true
+ info.IsNewInstance = true
info.InstanceID = util.GenerateUUID()
log.Debugf("Inserting new apid instance id %s", info.InstanceID)
@@ -489,13 +484,14 @@
info.InstanceID, info.ClusterID, "")
}
} else if savedClusterId != info.ClusterID {
- log.Debug("Detected apid cluster id change in config. Apid will start clean")
+ log.Warnf("Detected apid cluster id change in config. %v v.s. %v Apid will start clean.",
+ savedClusterId, info.ClusterID)
err = nil
- newInstanceID = true
+ info.IsNewInstance = true
info.InstanceID = util.GenerateUUID()
- _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
- info.InstanceID, info.ClusterID, "")
+ _, err = tx.Exec("DELETE FROM APID;")
+
info.LastSnapshot = ""
}
if err = tx.Commit(); err != nil {
@@ -504,8 +500,8 @@
return
}
-func updateApidInstanceInfo() error {
-
+func (dbMan *dbManager) updateApidInstanceInfo(instanceId, clusterId, lastSnap string) error {
+ log.Debugf("updateApidInstanceInfo: %v, %v, %v", instanceId, clusterId, lastSnap)
// always use default database for this
db, err := dataService.DB()
if err != nil {
@@ -521,7 +517,7 @@
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
VALUES (?,?,?)`,
- apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
+ instanceId, clusterId, lastSnap)
if err != nil {
log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
return err
@@ -536,3 +532,131 @@
return err
}
+
+func (dbMan *dbManager) extractTables() (map[string]bool, error) {
+ tables := make(map[string]bool)
+ db := dbMan.getDB()
+ rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
+ if err != nil {
+ return nil, err
+ }
+ defer rows.Close()
+
+ for rows.Next() {
+ var table sql.NullString
+ if err := rows.Scan(&table); err != nil {
+ return nil, err
+ }
+ log.Debugf("Table %v found in existing db", table)
+ if table.Valid {
+ tables[table.String] = true
+ }
+ }
+ log.Debugf("Extracting table names from existing DB %v", tables)
+ return tables, nil
+}
+
+func (dbMan *dbManager) getKnowTables() map[string]bool {
+ return dbMan.knownTables
+}
+
+func (dbMan *dbManager) processChangeList(changes *common.ChangeList) error {
+
+ tx, err := dbMan.getDB().Begin()
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
+
+ for _, change := range changes.Changes {
+ if change.Table == LISTENER_TABLE_APID_CLUSTER {
+ return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
+ }
+ switch change.Operation {
+ case common.Insert:
+ err = dbMan.insert(change.Table, []common.Row{change.NewRow}, tx)
+ case common.Update:
+ if change.Table == LISTENER_TABLE_DATA_SCOPE {
+ return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
+ }
+ err = dbMan.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
+ case common.Delete:
+ err = dbMan.delete(change.Table, []common.Row{change.OldRow}, tx)
+ }
+ if err != nil {
+ return err
+ }
+
+ }
+
+ if err = tx.Commit(); err != nil {
+ return fmt.Errorf("Commit error in processChangeList: %v", err)
+ }
+ return nil
+}
+
+func (dbMan *dbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
+
+ var prevDb string
+ if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
+ log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
+ apidInfo.LastSnapshot, snapshot.SnapshotInfo)
+ prevDb = apidInfo.LastSnapshot
+ } else {
+ log.Debugf("Process snapshot for version {%s}",
+ snapshot.SnapshotInfo)
+ }
+ db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ return fmt.Errorf("unable to access database: %v", err)
+ }
+
+ var numApidClusters int
+ tx, err := db.Begin()
+ if err != nil {
+ return fmt.Errorf("unable to open DB txn: {%v}", err.Error())
+ }
+ defer tx.Rollback()
+ err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
+ if err != nil {
+ return fmt.Errorf("unable to read database: {%s}", err.Error())
+ }
+
+ if numApidClusters != 1 {
+ return fmt.Errorf("illegal state for apid_cluster, must be a single row")
+ }
+
+ _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+ if err != nil && err.Error() != "duplicate column name: last_sequence" {
+ return fmt.Errorf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
+ }
+
+ if err = tx.Commit(); err != nil {
+ return fmt.Errorf("error when commit in processSqliteSnapshot: %v", err)
+ }
+
+ //update apid instance info
+ apidInfo.LastSnapshot = snapshot.SnapshotInfo
+ err = dbMan.updateApidInstanceInfo(apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
+ if err != nil {
+ log.Errorf("Unable to update instance info: %v", err)
+ return fmt.Errorf("unable to update instance info: %v", err)
+ }
+
+ dbMan.setDB(db)
+ if isDataSnapshot {
+ dbMan.knownTables, err = dbMan.extractTables()
+ if err != nil {
+ return fmt.Errorf("unable to extract tables: %v", err)
+ }
+ }
+ log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+
+ // Releases the DB, when the Connection reference count reaches 0.
+ if prevDb != "" {
+ dataService.ReleaseDB(prevDb)
+ }
+ return nil
+}
diff --git a/data_test.go b/data_test.go
index 691e887..abad465 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,32 +15,54 @@
package apidApigeeSync
import (
+ "database/sql"
"github.com/apid/apid-core"
- "github.com/apid/apid-core/data"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "io/ioutil"
"sort"
"strconv"
)
var _ = Describe("data access tests", func() {
- testCount := 1
-
- var _ = BeforeEach(func() {
+ testCount := 0
+ var testDbMan *dbManager
+ var dbVersion string
+ BeforeEach(func() {
+ var testDir string
+ testDir, err := ioutil.TempDir(tmpDir, "data_test")
+ config.Set(configLocalStoragePath, testDir)
+ Expect(err).NotTo(HaveOccurred())
+ testDbMan = creatDbManager()
testCount++
- db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount))
+ dbVersion = "data_test_" + strconv.Itoa(testCount)
+ db, err := dataService.DBVersion(dbVersion)
Expect(err).Should(Succeed())
- initDB(db)
- createBootstrapTables(db)
- setDB(db)
+ testDbMan.setDB(db)
})
- var _ = AfterEach(func() {
- data.Delete(data.VersionedDBID("common", "data_test_"+strconv.Itoa(testCount)))
+ AfterEach(func() {
+ config.Set(configLocalStoragePath, tmpDir)
})
- Context("Update processing", func() {
+ It("check scope changes", func() {
+ newScopes := []string{"foo"}
+ scopes := []string{"bar"}
+ Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+ newScopes = []string{"foo", "bar"}
+ scopes = []string{"bar"}
+ Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+ newScopes = []string{"foo"}
+ scopes = []string{"bar", "foo"}
+ Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+ newScopes = []string{"foo", "bar"}
+ scopes = []string{"bar", "foo"}
+ Expect(scopeChanged(newScopes, scopes)).To(BeNil())
+
+ })
+
+ Context("build Sql", func() {
It("unit test buildUpdateSql with single primary key", func() {
testRow := common.Row{
"id": {
@@ -66,7 +88,7 @@
}
sort.Strings(orderedColumns)
- result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
+ result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" +
" WHERE id=$6").To(Equal(result))
})
@@ -99,403 +121,11 @@
}
sort.Strings(orderedColumns)
- result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
+ result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" +
" WHERE id1=$7 AND id2=$8").To(Equal(result))
})
- It("test update with composite primary key", func() {
- event := &common.ChangeList{}
-
- //this needs to match what is actually in the DB
- oldRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "A product for testing Greg",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- newRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "new description",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: oldRow,
- Operation: 1,
- },
- }
- //insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
- var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //create update event
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: oldRow,
- NewRow: newRow,
- Operation: 2,
- },
- }
-
- //do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- })
-
- It("update should succeed if newrow modifies the primary key", func() {
- event := &common.ChangeList{}
-
- //this needs to match what is actually in the DB
- oldRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "A product for testing Greg",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- newRow := common.Row{
- "id": {
- Value: "new_id",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "new description",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: oldRow,
- Operation: 1,
- },
- }
- //insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
- var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //create update event
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: oldRow,
- NewRow: newRow,
- Operation: 2,
- },
- }
-
- //do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
- })
-
- It("update should succeed if newrow contains fewer fields than oldrow", func() {
- event := &common.ChangeList{}
-
- oldRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "A product for testing Greg",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- newRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "new description",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: oldRow,
- NewRow: newRow,
- Operation: 2,
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: oldRow,
- Operation: 1,
- },
- }
- //insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
- var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //create update event
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: oldRow,
- NewRow: newRow,
- Operation: 2,
- },
- }
-
- //do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
- })
-
- It("update should succeed if oldrow contains fewer fields than newrow", func() {
- event := &common.ChangeList{}
-
- oldRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "A product for testing Greg",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- newRow := common.Row{
- "id": {
- Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
- },
- "api_resources": {
- Value: "{/**}",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "43aef41d",
- },
- "description": {
- Value: "new description",
- },
- "created_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "updated_at": {
- Value: "2017-03-01 22:50:41.75+00:00",
- },
- "_change_selector": {
- Value: "43aef41d",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: oldRow,
- NewRow: newRow,
- Operation: 2,
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: oldRow,
- Operation: 1,
- },
- }
- //insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
- var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //create update event
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: oldRow,
- NewRow: newRow,
- Operation: 2,
- },
- }
-
- //do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
- })
- })
-
- Context("Insert processing", func() {
It("Properly constructs insert sql for one row", func() {
newRow := common.Row{
"id": {
@@ -531,7 +161,7 @@
sort.Strings(orderedColumns)
expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)"
- Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
+ Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
})
It("Properly constructs insert sql for multiple rows", func() {
@@ -595,275 +225,11 @@
sort.Strings(orderedColumns)
expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)"
- Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
+ Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
})
- It("Properly executes insert for a single rows", func() {
- event := &common.ChangeList{}
-
- newRow1 := common.Row{
- "id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: newRow1,
- Operation: 1,
- },
- }
-
- Expect(true).To(Equal(processChangeList(event)))
- var nRows int
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- })
-
- It("Properly executed insert for multiple rows", func() {
- event := &common.ChangeList{}
-
- newRow1 := common.Row{
- "id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
- newRow2 := common.Row{
- "id": {
- Value: "b",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: newRow1,
- Operation: 1,
- },
- {
- Table: "kms.api_product",
- NewRow: newRow2,
- Operation: 1,
- },
- }
-
- Expect(true).To(Equal(processChangeList(event)))
- var nRows int
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(2))
- })
-
- It("Fails to execute if row does not match existing table schema", func() {
- event := &common.ChangeList{}
-
- newRow1 := common.Row{
- "not_and_id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: newRow1,
- Operation: 1,
- },
- }
-
- ok := processChangeList(event)
- Expect(false).To(Equal(ok))
-
- var nRows int
- //assert that no extraneous rows were added
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(0))
- })
-
- It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
- event := &common.ChangeList{}
- newRow1 := common.Row{
- "id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
-
- newRow2 := common.Row{
- "not_and_id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
-
- event.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: newRow1,
- Operation: 1,
- },
- {
- Table: "kms.api_product",
- NewRow: newRow2,
- Operation: 1,
- },
- }
-
- ok := processChangeList(event)
- Expect(false).To(Equal(ok))
- })
- })
-
- Context("Delete processing", func() {
It("Properly constructs sql prepare for Delete", func() {
+ createBootstrapTables(testDbMan.getDB())
row := common.Row{
"id": {
Value: "new_id",
@@ -891,324 +257,1233 @@
},
}
- pkeys, err := getPkeysForTable("kms_api_product")
+ pkeys, err := testDbMan.getPkeysForTable("kms_api_product")
Expect(err).Should(Succeed())
- sql := buildDeleteSql("kms_api_product", row, pkeys)
+ sql := testDbMan.buildDeleteSql("kms_api_product", row, pkeys)
Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
})
+ })
- It("Verify execute single insert & single delete works", func() {
- event1 := &common.ChangeList{}
- event2 := &common.ChangeList{}
-
- Row1 := common.Row{
- "id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
-
- event1.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: Row1,
- Operation: 1,
- },
- }
- event2.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: Row1,
- Operation: 3,
- },
- }
-
- Expect(true).To(Equal(processChangeList(event1)))
- var nRows int
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
-
- Expect(true).To(Equal(processChangeList(event2)))
-
- // validate delete
- err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(0))
-
- // delete again should fail - coz entry will not exist
- Expect(false).To(Equal(processChangeList(event2)))
+ Context("Process Changelist", func() {
+ BeforeEach(func() {
+ createBootstrapTables(testDbMan.getDB())
})
- It("verify multiple insert and single delete works", func() {
- event1 := &common.ChangeList{}
- event2 := &common.ChangeList{}
+ Context("Update processing", func() {
+ It("test update with composite primary key", func() {
+ event := &common.ChangeList{}
- Row1 := common.Row{
- "id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
+ //this needs to match what is actually in the DB
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
- Row2 := common.Row{
- "id": {
- Value: "b",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
+ newRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
- event1.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: Row1,
- Operation: 1,
- },
- {
- Table: "kms.api_product",
- NewRow: Row2,
- Operation: 1,
- },
- }
- event2.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: Row1,
- Operation: 3,
- },
- }
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
- Expect(true).To(Equal(processChangeList(event1)))
- var nRows int
- //verify first row
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
- //verify second row
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
+ //do the update
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(2))
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
- Expect(true).To(Equal(processChangeList(event2)))
+ })
- //verify second row still exists
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
+ It("update should succeed if newrow modifies the primary key", func() {
+ event := &common.ChangeList{}
- // validate delete
- err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
+ //this needs to match what is actually in the DB
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
- // delete again should fail - coz entry will not exist
- Expect(false).To(Equal(processChangeList(event2)))
- }, 3)
+ newRow := common.Row{
+ "id": {
+ Value: "new_id",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
- It("verify single insert and multiple delete fails", func() {
- event1 := &common.ChangeList{}
- event2 := &common.ChangeList{}
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
- Row1 := common.Row{
- "id": {
- Value: "a",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
- Row2 := common.Row{
- "id": {
- Value: "b",
- },
- "api_resources": {
- Value: "r",
- },
- "environments": {
- Value: "{test}",
- },
- "tenant_id": {
- Value: "t",
- },
- "description": {
- Value: "d",
- },
- "created_at": {
- Value: "c",
- },
- "updated_at": {
- Value: "u",
- },
- "_change_selector": {
- Value: "cs",
- },
- }
+ //do the update
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
- event1.Changes = []common.Change{
- {
- Table: "kms.api_product",
- NewRow: Row1,
- Operation: 1,
- },
- }
- event2.Changes = []common.Change{
- {
- Table: "kms.api_product",
- OldRow: Row1,
- Operation: 3,
- },
- {
- Table: "kms.api_product",
- OldRow: Row2,
- Operation: 3,
- },
- }
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+ })
- Expect(true).To(Equal(processChangeList(event1)))
- var nRows int
- //verify insert
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
- "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
- "and _change_selector='cs'").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
+ It("update should succeed if newrow contains fewer fields than oldrow", func() {
+ event := &common.ChangeList{}
- //assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(1))
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
- Expect(false).To(Equal(processChangeList(event2)))
+ newRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
- }, 3)
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ //do the update
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+ })
+
+ It("update should succeed if oldrow contains fewer fields than newrow", func() {
+ event := &common.ChangeList{}
+
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ newRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ //do the update
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+ })
+ })
+
+ Context("Insert processing", func() {
+ It("Properly executes insert for a single rows", func() {
+ event := &common.ChangeList{}
+
+ newRow1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ })
+
+ It("Properly executed insert for multiple rows", func() {
+ event := &common.ChangeList{}
+
+ newRow1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+ newRow2 := common.Row{
+ "id": {
+ Value: "b",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ {
+ Table: "kms.api_product",
+ NewRow: newRow2,
+ Operation: 1,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(2))
+ })
+
+ It("Fails to execute if row does not match existing table schema", func() {
+ event := &common.ChangeList{}
+
+ newRow1 := common.Row{
+ "not_and_id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed())
+
+ var nRows int
+ //assert that no extraneous rows were added
+ err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(0))
+ })
+
+ It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
+ event := &common.ChangeList{}
+ newRow1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ newRow2 := common.Row{
+ "not_and_id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ {
+ Table: "kms.api_product",
+ NewRow: newRow2,
+ Operation: 1,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed())
+ })
+ })
+
+ Context("Delete processing", func() {
+
+ It("Verify execute single insert & single delete works", func() {
+ event1 := &common.ChangeList{}
+ event2 := &common.ChangeList{}
+
+ Row1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event1.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: Row1,
+ Operation: 1,
+ },
+ }
+ event2.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: Row1,
+ Operation: 3,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ Expect(testDbMan.processChangeList(event2)).Should(Succeed())
+
+ // validate delete
+ err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(0))
+
+ // delete again should fail - coz entry will not exist
+ Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+ })
+
+ It("verify multiple insert and single delete works", func() {
+ event1 := &common.ChangeList{}
+ event2 := &common.ChangeList{}
+
+ Row1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ Row2 := common.Row{
+ "id": {
+ Value: "b",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event1.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: Row1,
+ Operation: 1,
+ },
+ {
+ Table: "kms.api_product",
+ NewRow: Row2,
+ Operation: 1,
+ },
+ }
+ event2.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: Row1,
+ Operation: 3,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+ var nRows int
+ //verify first row
+ err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //verify second row
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(2))
+
+ Expect(testDbMan.processChangeList(event2)).Should(Succeed())
+
+ //verify second row still exists
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ // validate delete
+ err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ // delete again should fail - coz entry will not exist
+ Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+ }, 3)
+
+ It("verify single insert and multiple delete fails", func() {
+ event1 := &common.ChangeList{}
+ event2 := &common.ChangeList{}
+
+ Row1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ Row2 := common.Row{
+ "id": {
+ Value: "b",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event1.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: Row1,
+ Operation: 1,
+ },
+ }
+ event2.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: Row1,
+ Operation: 3,
+ },
+ {
+ Table: "kms.api_product",
+ OldRow: Row2,
+ Operation: 3,
+ },
+ }
+
+ Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+ var nRows int
+ //verify insert
+ err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+
+ }, 3)
+ })
+
+ Context("ApigeeSync change event", func() {
+
+ Context(LISTENER_TABLE_APID_CLUSTER, func() {
+
+ It("should not change LISTENER_TABLE_APID_CLUSTER", func() {
+ event := common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_APID_CLUSTER,
+ },
+ },
+ }
+ Expect(testDbMan.processChangeList(&event)).NotTo(Succeed())
+
+ event = common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Update,
+ Table: LISTENER_TABLE_APID_CLUSTER,
+ },
+ },
+ }
+
+ Expect(testDbMan.processChangeList(&event)).NotTo(Succeed())
+ })
+
+ })
+
+ Context("data scopes", func() {
+
+ It("insert event should add", func() {
+ event := common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s1"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
+ },
+ },
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "j"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s2"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
+ },
+ },
+ },
+ }
+
+ testDbMan.processChangeList(&event)
+
+ count := 0
+ id := sql.NullString{}
+ rows, err := testDbMan.getDB().Query(`
+ SELECT scope FROM EDGEX_DATA_SCOPE`)
+ Expect(err).NotTo(HaveOccurred())
+ defer rows.Close()
+ for rows.Next() {
+ count++
+ Expect(rows.Scan(&id)).Should(Succeed())
+ Expect(id.String).Should(Equal("s" + strconv.Itoa(count)))
+ }
+
+ Expect(count).To(Equal(2))
+ })
+
+ It("delete event should delete", func() {
+ event := common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
+ },
+ },
+ },
+ }
+
+ testDbMan.processChangeList(&event)
+
+ event = common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Delete,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ OldRow: event.Changes[0].NewRow,
+ },
+ },
+ }
+
+ testDbMan.processChangeList(&event)
+
+ var nRows int
+ err := testDbMan.getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(BeZero())
+ })
+
+ It("update event should panic for data scopes table", func() {
+ event := common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Update,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ },
+ },
+ }
+
+ Expect(testDbMan.processChangeList(&event)).ToNot(Succeed())
+ })
+
+ })
+ })
+
})
+
+ Context("Process Snapshot", func() {
+ initTestDb := func(sqlFile string, dbMan *dbManager) common.Snapshot {
+ stmts, err := ioutil.ReadFile(sqlFile)
+ Expect(err).Should(Succeed())
+ Expect(testDbMan.getDB().Exec(string(stmts))).ShouldNot(BeNil())
+ Expect(testDbMan.initDB()).Should(Succeed())
+ return common.Snapshot{
+ SnapshotInfo: dbVersion,
+ }
+ }
+
+ AfterEach(func() {
+
+ })
+
+ It("should fail if more than one apid_cluster rows", func() {
+ event := initTestDb("./sql/init_listener_test_duplicate_apids.sql", testDbMan)
+ Expect(testDbMan.processSnapshot(&event, true)).ToNot(Succeed())
+ })
+
+ It("should process a valid Snapshot", func() {
+ config.Set(configApidClusterId, "a")
+ apidInfo.ClusterID = "a"
+ event := initTestDb("./sql/init_listener_test_valid_snapshot.sql", testDbMan)
+ Expect(testDbMan.processSnapshot(&event, true)).Should(Succeed())
+
+ info, err := testDbMan.getApidInstanceInfo()
+ Expect(err).Should(Succeed())
+ Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
+ Expect(info.IsNewInstance).To(BeFalse())
+ Expect(dataService.DBVersion(event.SnapshotInfo)).Should(Equal(testDbMan.getDB()))
+
+ // apid Cluster
+ id := &sql.NullString{}
+ Expect(testDbMan.getDB().QueryRow(`SELECT id FROM EDGEX_APID_CLUSTER`).
+ Scan(id)).Should(Succeed())
+ Expect(id.Valid).Should(BeTrue())
+ Expect(id.String).To(Equal("i"))
+
+ // Data Scope
+ env := &sql.NullString{}
+ count := 0
+ rows, err := testDbMan.getDB().Query(`SELECT env FROM EDGEX_DATA_SCOPE`)
+ Expect(err).Should(Succeed())
+ defer rows.Close()
+ for rows.Next() {
+ count++
+ rows.Scan(&env)
+ Expect(env.Valid).Should(BeTrue())
+ Expect(env.String).To(Equal("e" + strconv.Itoa(count)))
+ }
+ Expect(count).To(Equal(3))
+
+ //find scopes for Id
+ scopes, err := testDbMan.findScopesForId("a")
+ Expect(err).Should(Succeed())
+ Expect(len(scopes)).To(Equal(6))
+ expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
+ sort.Strings(scopes)
+ sort.Strings(expectedScopes)
+ Expect(scopes).Should(Equal(expectedScopes))
+ })
+
+ It("should detect clusterid change", func() {
+ Expect(testDbMan.initDB()).Should(Succeed())
+ testDbMan.updateApidInstanceInfo("a", "b", "c")
+ config.Set(configApidClusterId, "d")
+
+ info, err := testDbMan.getApidInstanceInfo()
+ Expect(err).Should(Succeed())
+ Expect(info.LastSnapshot).To(BeZero())
+ Expect(info.IsNewInstance).To(BeTrue())
+ })
+ })
+
})
func createBootstrapTables(db apid.DB) {
tx, err := db.Begin()
Expect(err).To(Succeed())
//all tests in this file operate on the api_product table. Create the necessary tables for this here
- tx.Exec("CREATE TABLE _transicator_tables " +
- "(tableName varchar not null, columnName varchar not null, " +
- "typid integer, primaryKey bool);")
- tx.Exec("DELETE from _transicator_tables")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
- tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
- tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
- "api_resources text,approval_type text,scopes text,proxies text, environments text," +
- "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
- "primary key (id,tenant_id,created_at,updated_at));")
- tx.Exec("DELETE from kms_api_product")
- err = tx.Commit()
+ _, err = tx.Exec(`CREATE TABLE _transicator_tables
+ (tableName varchar not null, columnName varchar not null, typid integer, primaryKey bool);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0);
+ CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at));
+ `)
Expect(err).To(Succeed())
+ _, err = tx.Exec(`
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org_scope',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env_scope',1043,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+ INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+ CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,org_scope text,
+ env_scope text,created blob,created_by text,updated blob,updated_by text,_change_selector text,
+ primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+ `)
+ Expect(err).To(Succeed())
+
+ Expect(tx.Commit()).To(Succeed())
}
diff --git a/dockertests/apid_config.yaml b/dockertests/apid_config.yaml
new file mode 100644
index 0000000..e4cd751
--- /dev/null
+++ b/dockertests/apid_config.yaml
@@ -0,0 +1,9 @@
+apigeesync_instance_name: SQLLITAPID
+apigeesync_snapshot_server_base: http://localhost:9001/
+apigeesync_change_server_base: http://localhost:9000/
+apigeesync_snapshot_proto: sqlite
+log_level: Debug
+apigeesync_consumer_key: 33f39JNLosF1mDOXJoCfbauchVzPrGrl
+apigeesync_consumer_secret: LAolGShAx6H3vfNF
+apigeesync_cluster_id: 4c6bb536-0d64-43ca-abae-17c08f1a7e58
+local_storage_path: /Users/haoming/go/src/github.com/apid/apidApigeeSync/tmp/sqlite
diff --git a/dockertests/dockerSetup.sh b/dockertests/dockerSetup.sh
index 8df2b5d..6ded35d 100755
--- a/dockertests/dockerSetup.sh
+++ b/dockertests/dockerSetup.sh
@@ -25,7 +25,6 @@
TEST_PG_BASE=postgres://postgres:changeme@$DOCKER_IP:5432
TEST_PG_URL=postgres://postgres:changeme@$DOCKER_IP:5432/edgex
echo ${TEST_PG_URL}
-
export APIGEE_SYNC_DOCKER_PG_URL=${TEST_PG_URL}
export APIGEE_SYNC_DOCKER_IP=${DOCKER_IP}
@@ -47,8 +46,13 @@
ssname=apidSync_test_ss
csname=apidSync_test_cs
+# setup docker network
+docker network rm apidApigeeSync-docker-test || true
+docker network create apidApigeeSync-docker-test
+DOCKER_PG_URL=postgres://postgres:changeme@$pgname:5432/edgex
+echo $DOCKER_PG_URL
# run PG
-docker run --name ${pgname} -p 5432:5432 -d -e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres
+docker run --name ${pgname} -p 5432:5432 --network=apidApigeeSync-docker-test -d -e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres
# Wait for PG to be up -- it takes a few seconds
while `true`
@@ -67,8 +71,8 @@
psql -f ${WORK_DIR}/dockertests/user-setup.sql ${TEST_PG_URL}
# run SS and CS
-docker run --name ${ssname} -d -p 9001:9001 apigeelabs/transicator-snapshot -p 9001 -u ${TEST_PG_URL}
-docker run --name ${csname} -d -p 9000:9000 apigeelabs/transicator-changeserver -p 9000 -u ${TEST_PG_URL} -s testslot
+docker run --name ${ssname} -d -p 9001:9001 --network=apidApigeeSync-docker-test apigeelabs/transicator-snapshot -p 9001 -u ${DOCKER_PG_URL}
+docker run --name ${csname} -d -p 9000:9000 --network=apidApigeeSync-docker-test apigeelabs/transicator-changeserver -p 9000 -u ${DOCKER_PG_URL} -s testslot
# Wait for SS to be up
while `true`
diff --git a/dockertests/docker_test.go b/dockertests/docker_test.go
index e8fc25d..631e17a 100644
--- a/dockertests/docker_test.go
+++ b/dockertests/docker_test.go
@@ -18,6 +18,7 @@
"encoding/json"
"github.com/apid/apid-core"
"github.com/apid/apid-core/factory"
+ "github.com/apid/apidApigeeSync"
_ "github.com/apid/apidApigeeSync"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
@@ -77,7 +78,7 @@
config.Set(configProxyServerBaseURI, testServer.URL)
// init plugin
- apid.RegisterPlugin(initPlugin)
+ apid.RegisterPlugin(initPlugin, apidApigeeSync.PluginData)
apid.InitializePlugins("dockerTest")
<-initDone
@@ -263,6 +264,8 @@
updated: t,
updatedBy: testInitUser,
changeSelector: clusterId,
+ orgScope: "org1",
+ envScope: "env1",
}
bf := bundleConfigData{
@@ -335,7 +338,7 @@
type newTableHandler struct {
targetTablename string
done Done
- verifyFunc func (string, apid.DB)
+ verifyFunc func(string, apid.DB)
}
func (n *newTableHandler) Handle(event apid.Event) {
diff --git a/dockertests/management_pg.go b/dockertests/management_pg.go
index bbe28e9..77bdf67 100644
--- a/dockertests/management_pg.go
+++ b/dockertests/management_pg.go
@@ -90,9 +90,11 @@
created_by,
updated,
updated_by,
- _change_selector
+ _change_selector,
+ org_scope,
+ env_scope
)
- VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)`)
+ VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`)
if err != nil {
return err
}
@@ -108,6 +110,8 @@
ds.updated,
ds.updatedBy,
ds.changeSelector,
+ ds.orgScope,
+ ds.envScope,
)
return err
diff --git a/dockertests/master-schema.sql b/dockertests/master-schema.sql
index 756ff62..00f60d0 100644
--- a/dockertests/master-schema.sql
+++ b/dockertests/master-schema.sql
@@ -45,6 +45,8 @@
updated timestamp without time zone,
updated_by text,
_change_selector text,
+ org_scope character varying(36) NOT NULL,
+ env_scope character varying(36) NOT NULL,
CONSTRAINT data_scope_pkey PRIMARY KEY (id),
CONSTRAINT data_scope_apid_cluster_id_fk FOREIGN KEY (apid_cluster_id)
REFERENCES apid_cluster (id)
diff --git a/dockertests/pg_table_data.go b/dockertests/pg_table_data.go
index 0a1aecd..854e587 100644
--- a/dockertests/pg_table_data.go
+++ b/dockertests/pg_table_data.go
@@ -42,6 +42,8 @@
updated time.Time
updatedBy string
changeSelector string
+ orgScope string
+ envScope string
}
/* FOREIGN KEY (data_scope_id)
diff --git a/init.go b/init.go
index b7f470e..f381ca4 100644
--- a/init.go
+++ b/init.go
@@ -15,7 +15,6 @@
package apidApigeeSync
import (
- "encoding/json"
"fmt"
"net/http"
"os"
@@ -39,7 +38,8 @@
// special value - set by ApigeeSync, not taken from configuration
configApidInstanceID = "apigeesync_apid_instance_id"
// This will not be needed once we have plugin handling tokens.
- configBearerToken = "apigeesync_bearer_token"
+ configBearerToken = "apigeesync_bearer_token"
+ configLocalStoragePath = "local_storage_path"
)
const (
@@ -48,17 +48,12 @@
var (
/* All set during plugin initialization */
- log apid.LogService
- config apid.ConfigService
- dataService apid.DataService
- events apid.EventsService
- apidInfo apidInstanceInfo
- newInstanceID bool
- apidTokenManager tokenManager
- apidChangeManager changeManager
- apidSnapshotManager snapShotManager
- httpclient *http.Client
- isOfflineMode bool
+ log apid.LogService
+ config apid.ConfigService
+ dataService apid.DataService
+ eventService apid.EventsService
+ apiService apid.APIService
+ apidInfo apidInstanceInfo
/* Set during post plugin initialization
* set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -68,6 +63,7 @@
type apidInstanceInfo struct {
InstanceID, InstanceName, ClusterID, LastSnapshot string
+ IsNewInstance bool
}
type pluginDetail struct {
@@ -76,7 +72,7 @@
}
func init() {
- apid.RegisterPlugin(initPlugin, pluginData)
+ apid.RegisterPlugin(initPlugin, PluginData)
}
func initConfigDefaults() {
@@ -93,59 +89,7 @@
log.Debugf("Using %s as display name", config.GetString(configName))
}
-func initVariables() error {
-
- var tr *http.Transport
-
- tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
- tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
-
- httpclient = &http.Client{
- Transport: tr,
- Timeout: httpTimeout,
- CheckRedirect: func(req *http.Request, _ []*http.Request) error {
- req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
- return nil
- },
- }
-
- // set up default database
- db, err := dataService.DB()
- if err != nil {
- return fmt.Errorf("Unable to access DB: %v", err)
- }
- err = initDB(db)
- if err != nil {
- return fmt.Errorf("Unable to access DB: %v", err)
- }
- setDB(db)
-
- apidInfo, err = getApidInstanceInfo()
- if err != nil {
- return fmt.Errorf("Unable to get apid instance info: %v", err)
- }
-
- if config.IsSet(configApidInstanceID) {
- log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID)
- }
- config.Set(configApidInstanceID, apidInfo.InstanceID)
-
- return nil
-}
-
-func createManagers() {
- if isOfflineMode {
- apidSnapshotManager = &offlineSnapshotManager{}
- apidChangeManager = &offlineChangeManager{}
- } else {
- apidSnapshotManager = createSnapShotManager()
- apidChangeManager = createChangeManager()
- }
-
- apidTokenManager = createSimpleTokenManager()
-}
-
-func checkForRequiredValues() error {
+func checkForRequiredValues(isOfflineMode bool) error {
required := []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret}
if !isOfflineMode {
required = append(required, configSnapServerBaseURI, configChangeServerBaseURI)
@@ -153,12 +97,12 @@
// check for required values
for _, key := range required {
if !config.IsSet(key) {
- return fmt.Errorf("Missing required config value: %s", key)
+ return fmt.Errorf("missing required config value: %s", key)
}
}
proto := config.GetString(configSnapshotProtocol)
if proto != "sqlite" {
- return fmt.Errorf("Illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol)
+ return fmt.Errorf("illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol)
}
return nil
@@ -168,90 +112,98 @@
log = logger
}
-/* initialization */
-func _initPlugin(services apid.Services) error {
- log.Debug("start init")
+func initManagers(isOfflineMode bool) (*listenerManager, *ApiManager, error) {
+ // check for forward proxy
+ var tr *http.Transport
+ tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
+ tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
- config = services.Config()
- initConfigDefaults()
-
- if config.GetBool(configDiagnosticMode) {
- log.Warn("Diagnostic mode: will not download changelist and snapshots!")
- isOfflineMode = true
- }
-
- err := checkForRequiredValues()
+ apidDbManager := creatDbManager()
+ db, err := dataService.DB()
if err != nil {
- return err
+ return nil, nil, fmt.Errorf("unable to access DB: %v", err)
}
-
- err = initVariables()
+ apidDbManager.setDB(db)
+ err = apidDbManager.initDB()
if err != nil {
- return err
+ return nil, nil, fmt.Errorf("unable to access DB: %v", err)
}
- return nil
+ apidInfo, err = apidDbManager.getApidInstanceInfo()
+ if err != nil {
+ return nil, nil, fmt.Errorf("unable to get apid instance info: %v", err)
+ }
+
+ if config.IsSet(configApidInstanceID) {
+ log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID)
+ }
+ config.Set(configApidInstanceID, apidInfo.InstanceID)
+
+ apidTokenManager := createApidTokenManager(apidInfo.IsNewInstance)
+ var snapMan snapshotManager
+ var apidChangeManager changeManager
+
+ if isOfflineMode {
+ snapMan = &offlineSnapshotManager{
+ dbMan: apidDbManager,
+ }
+ apidChangeManager = &offlineChangeManager{}
+ } else {
+ httpClient := &http.Client{
+ Transport: tr,
+ Timeout: httpTimeout,
+ CheckRedirect: func(req *http.Request, _ []*http.Request) error {
+ req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
+ return nil
+ },
+ }
+ snapMan = createSnapShotManager(apidDbManager, apidTokenManager, httpClient)
+ apidChangeManager = createChangeManager(apidDbManager, snapMan, apidTokenManager, httpClient)
+ }
+
+ listenerMan := &listenerManager{
+ changeMan: apidChangeManager,
+ snapMan: snapMan,
+ tokenMan: apidTokenManager,
+ isOfflineMode: isOfflineMode,
+ }
+
+ apiMan := &ApiManager{
+ endpoint: tokenEndpoint,
+ tokenMan: apidTokenManager,
+ }
+ return listenerMan, apiMan, nil
}
func initPlugin(services apid.Services) (apid.PluginData, error) {
SetLogger(services.Log().ForModule("apigeeSync"))
dataService = services.Data()
- events = services.Events()
+ eventService = services.Events()
+ apiService = services.API()
+ log.Debug("start init")
+ config = services.Config()
+ initConfigDefaults()
- err := _initPlugin(services)
+ isOfflineMode := false
+ if config.GetBool(configDiagnosticMode) {
+ log.Warn("Diagnostic mode: will not download changelist and snapshots!")
+ isOfflineMode = true
+ }
+
+ err := checkForRequiredValues(isOfflineMode)
if err != nil {
- return pluginData, err
+ return PluginData, err
}
+ if err != nil {
+ return PluginData, err
+ }
+ listenerMan, apiMan, err := initManagers(isOfflineMode)
+ if err != nil {
+ return PluginData, err
+ }
+ listenerMan.init()
+ apiMan.InitAPI(apiService)
- createManagers()
-
- /* This callback function will get called once all the plugins are
- * initialized (not just this plugin). This is needed because,
- * downloadSnapshots/changes etc have to begin to be processed only
- * after all the plugins are initialized
- */
- events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
-
- InitAPI(services)
log.Debug("end init")
-
- return pluginData, nil
-}
-
-// Plugins have all initialized, gather their info and start the ApigeeSync downloads
-func postInitPlugins(event apid.Event) {
- var plinfoDetails []pluginDetail
- if pie, ok := event.(apid.PluginsInitializedEvent); ok {
- /*
- * Store the plugin details in the heap. Needed during
- * Bearer token generation request.
- */
- for _, plugin := range pie.Plugins {
- name := plugin.Name
- version := plugin.Version
- if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok {
- inf := pluginDetail{
- Name: name,
- SchemaVersion: schemaVersion}
- plinfoDetails = append(plinfoDetails, inf)
- log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion)
- }
- }
- if plinfoDetails == nil {
- log.Panicf("No Plugins registered!")
- }
-
- pgInfo, err := json.Marshal(plinfoDetails)
- if err != nil {
- log.Panicf("Unable to marshal plugin data: %v", err)
- }
- apidPluginDetails = string(pgInfo[:])
-
- log.Debug("start post plugin init")
-
- apidTokenManager.start()
- go bootstrap()
-
- log.Debug("Done post plugin init")
- }
+ return PluginData, nil
}
diff --git a/init_test.go b/init_test.go
index 03e5450..bdd9fc7 100644
--- a/init_test.go
+++ b/init_test.go
@@ -18,36 +18,78 @@
"github.com/apid/apid-core"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "net/http"
+ "strconv"
)
var _ = Describe("init", func() {
- var _ = BeforeEach(func() {
- _initPlugin(apid.AllServices())
+ testCount := 0
+ BeforeEach(func() {
+ testCount++
})
Context("Apid Instance display name", func() {
+ AfterEach(func() {
+ apiService = apid.API()
+ })
- It("should be hostname by default", func() {
- log.Info("Starting init tests...")
+ It("init should register listener", func() {
+ me := &mockEvent{
+ listenerMap: make(map[apid.EventSelector]apid.EventHandlerFunc),
+ }
+ ma := &mockApi{
+ handleMap: make(map[string]http.HandlerFunc),
+ }
+ ms := &mockService{
+ config: apid.Config(),
+ log: apid.Log(),
+ api: ma,
+ data: apid.Data(),
+ events: me,
+ }
+ testname := "test_" + strconv.Itoa(testCount)
+ ms.config.Set(configName, testname)
+ pd, err := initPlugin(ms)
+ Expect(err).Should(Succeed())
+ Expect(apidInfo.InstanceName).To(Equal(testname))
+ Expect(me.listenerMap[apid.SystemEventsSelector]).ToNot(BeNil())
+ Expect(ma.handleMap[tokenEndpoint]).ToNot(BeNil())
+ Expect(pd).Should(Equal(PluginData))
+ Expect(apidInfo.IsNewInstance).Should(BeTrue())
+ })
- initConfigDefaults()
- Expect(apidInfo.InstanceName).To(Equal("testhost"))
- }, 3)
+ It("create managers for normal mode", func() {
+ listenerMan, apiMan, err := initManagers(false)
+ Expect(err).Should(Succeed())
+ Expect(listenerMan).ToNot(BeNil())
+ Expect(listenerMan.tokenMan).ToNot(BeNil())
+ snapMan, ok := listenerMan.snapMan.(*apidSnapshotManager)
+ Expect(ok).Should(BeTrue())
+ Expect(snapMan.tokenMan).ToNot(BeNil())
+ Expect(snapMan.dbMan).ToNot(BeNil())
+ changeMan, ok := listenerMan.changeMan.(*pollChangeManager)
+ Expect(ok).Should(BeTrue())
+ Expect(changeMan.tokenMan).ToNot(BeNil())
+ Expect(changeMan.dbMan).ToNot(BeNil())
+ Expect(changeMan.snapMan).ToNot(BeNil())
+ Expect(apiMan).ToNot(BeNil())
+ Expect(apiMan.tokenMan).ToNot(BeNil())
+ })
- It("accept display name from config", func() {
- config.Set(configName, "aa01")
- initConfigDefaults()
- var apidInfoLatest apidInstanceInfo
- apidInfoLatest, _ = getApidInstanceInfo()
- Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
- Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
- }, 3)
+ It("create managers for diagnostic mode", func() {
+ config.Set(configDiagnosticMode, true)
+ listenerMan, apiMan, err := initManagers(true)
+ Expect(err).Should(Succeed())
+ Expect(listenerMan).ToNot(BeNil())
+ Expect(listenerMan.tokenMan).ToNot(BeNil())
+ snapMan, ok := listenerMan.snapMan.(*offlineSnapshotManager)
+ Expect(ok).Should(BeTrue())
+ Expect(snapMan.dbMan).ToNot(BeNil())
+ _, ok = listenerMan.changeMan.(*offlineChangeManager)
+ Expect(ok).Should(BeTrue())
+ Expect(apiMan).ToNot(BeNil())
+ Expect(apiMan.tokenMan).ToNot(BeNil())
+ })
})
-
- It("should put apigeesync_apid_instance_id value in config", func() {
- instanceID := config.GetString(configApidInstanceID)
- Expect(instanceID).NotTo(BeEmpty())
- Expect(instanceID).To(Equal(apidInfo.InstanceID))
- })
})
diff --git a/listener.go b/listener.go
index 94befd6..7f46b34 100644
--- a/listener.go
+++ b/listener.go
@@ -15,9 +15,8 @@
package apidApigeeSync
import (
- "errors"
+ "encoding/json"
"github.com/apid/apid-core"
- "github.com/apigee-labs/transicator/common"
)
const (
@@ -25,107 +24,86 @@
LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
)
-func processSnapshot(snapshot *common.Snapshot) {
+type listenerManager struct {
+ changeMan changeManager
+ snapMan snapshotManager
+ tokenMan tokenManager
+ isOfflineMode bool
+}
- var prevDb string
- if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
- log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
- apidInfo.LastSnapshot, snapshot.SnapshotInfo)
- prevDb = apidInfo.LastSnapshot
- } else {
- log.Debugf("Process snapshot for version {%s}",
- snapshot.SnapshotInfo)
- }
- db, err := dataService.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
+func (l *listenerManager) init() {
+ /* This callback function will get called once all the plugins are
+ * initialized (not just this plugin). This is needed because,
+ * downloadSnapshots/changes etc have to begin to be processed only
+ * after all the plugins are initialized
+ */
+ eventService.ListenOnceFunc(apid.SystemEventsSelector, l.postInitPlugins)
+}
- processSqliteSnapshot(db)
+// Plugins have all initialized, gather their info and start the ApigeeSync downloads
+func (l *listenerManager) postInitPlugins(event apid.Event) {
+ var plinfoDetails []pluginDetail
+ if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+ /*
+ * Store the plugin details in the heap. Needed during
+ * Bearer token generation request.
+ */
+ for _, plugin := range pie.Plugins {
+ name := plugin.Name
+ version := plugin.Version
+ if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok {
+ inf := pluginDetail{
+ Name: name,
+ SchemaVersion: schemaVersion}
+ plinfoDetails = append(plinfoDetails, inf)
+ log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion)
+ }
+ }
+ if plinfoDetails == nil {
+ log.Panic("No Plugins registered!")
+ }
- //update apid instance info
- apidInfo.LastSnapshot = snapshot.SnapshotInfo
- err = updateApidInstanceInfo()
- if err != nil {
- log.Panicf("Unable to update instance info: %v", err)
- }
+ pgInfo, err := json.Marshal(plinfoDetails)
+ if err != nil {
+ log.Panicf("Unable to marshal plugin data: %v", err)
+ }
+ apidPluginDetails = string(pgInfo[:])
- setDB(db)
- log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+ log.Debug("start post plugin init")
- // Releases the DB, when the Connection reference count reaches 0.
- if prevDb != "" {
- dataService.ReleaseDB(prevDb)
+ l.tokenMan.start()
+ go l.bootstrap(apidInfo.LastSnapshot)
+
+ log.Debug("Done post plugin init")
}
}
-func processSqliteSnapshot(db apid.DB) {
-
- var numApidClusters int
- tx, err := db.Begin()
- if err != nil {
- log.Panicf("Unable to open DB txn: {%v}", err.Error())
- }
- defer tx.Rollback()
- err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
- if err != nil {
- log.Panicf("Unable to read database: {%s}", err.Error())
+/*
+ * Start from existing snapshot if possible
+ * If an existing snapshot does not exist, use the apid scope to fetch
+ * all data scopes, then get a snapshot for those data scopes
+ *
+ * Then, poll for changes
+ */
+func (l *listenerManager) bootstrap(lastSnap string) {
+ if l.isOfflineMode && lastSnap == "" {
+ log.Panic("Diagnostic mode requires existent snapshot info in default DB.")
}
- if numApidClusters != 1 {
- log.Panic("Illegal state for apid_cluster. Must be a single row.")
- }
-
- _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
- if err != nil {
- if err.Error() == "duplicate column name: last_sequence" {
+ if lastSnap != "" {
+ if err := l.snapMan.startOnDataSnapshot(lastSnap); err == nil {
+ log.Infof("Started on local snapshot: %s", lastSnap)
+ l.changeMan.pollChangeWithBackoff()
return
} else {
- log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
- }
- }
- if err = tx.Commit(); err != nil {
- log.Errorf("Error when commit in processSqliteSnapshot: %v", err)
- }
-}
-
-func processChangeList(changes *common.ChangeList) bool {
-
- ok := false
-
- tx, err := getDB().Begin()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- }
- defer tx.Rollback()
-
- log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
-
- for _, change := range changes.Changes {
- if change.Table == LISTENER_TABLE_APID_CLUSTER {
- log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
- }
- switch change.Operation {
- case common.Insert:
- ok = insert(change.Table, []common.Row{change.NewRow}, tx)
- case common.Update:
- if change.Table == LISTENER_TABLE_DATA_SCOPE {
- log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
- }
- ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
- case common.Delete:
- ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
- }
- if !ok {
- err = errors.New("Sql Operation error. Operation rollbacked")
- log.Error("Sql Operation error. Operation rollbacked")
- return false
+ log.Errorf("Failed to bootstrap on local snapshot: %v", err)
+ log.Warn("Will get new snapshots.")
}
}
- if err = tx.Commit(); err != nil {
- log.Errorf("Commit error in processChangeList: %v", err)
- return false
+ l.snapMan.downloadBootSnapshot()
+ if err := l.snapMan.downloadDataSnapshot(); err != nil {
+ log.Panicf("Error downloading data snapshot: %v", err)
}
- return true
+ l.changeMan.pollChangeWithBackoff()
}
diff --git a/listener_test.go b/listener_test.go
index 54974eb..5e8f072 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -15,360 +15,79 @@
package apidApigeeSync
import (
+ "github.com/apid/apid-core"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
-
- "github.com/apid/apid-core"
- "github.com/apigee-labs/transicator/common"
- "os"
- "reflect"
- "sort"
+ "strconv"
)
var _ = Describe("listener", func() {
+ testCount := 0
+ var testListenerMan *listenerManager
+ var dummyChangeMan *dummyChangeManager
+ var dummySnapMan *dummySnapshotManager
+ var dummyTokenMan *dummyTokenManager
- var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
- initDb(sqlfile, "./mockdb.sqlite3")
- file, err := os.Open("./mockdb.sqlite3")
- Expect(err).ShouldNot(HaveOccurred())
-
- s := common.Snapshot{}
- err = processSnapshotServerFileResponse(dbId, file, &s)
- Expect(err).ShouldNot(HaveOccurred())
- return s
- }
-
- var _ = BeforeEach(func() {
- _initPlugin(apid.AllServices())
- })
-
- var _ = AfterEach(func() {
- if wipeDBAferTest {
- db, err := dataService.DB()
- Expect(err).Should(Succeed())
- tx, err := db.Begin()
- Expect(err).Should(Succeed())
- _, err = tx.Exec("DELETE FROM APID")
- Expect(err).Should(Succeed())
- err = tx.Commit()
- Expect(err).Should(Succeed())
+ BeforeEach(func() {
+ testCount++
+ dummySnapMan = &dummySnapshotManager{
+ downloadCalledChan: make(chan bool, 1),
+ startCalledChan: make(chan bool, 1),
}
- wipeDBAferTest = true
+ dummyTokenMan = &dummyTokenManager{
+ invalidateChan: make(chan bool, 1),
+ }
+ dummyChangeMan = &dummyChangeManager{
+ pollChangeWithBackoffChan: make(chan bool, 1),
+ }
+ testListenerMan = &listenerManager{
+ changeMan: dummyChangeMan,
+ snapMan: dummySnapMan,
+ tokenMan: dummyTokenMan,
+ }
})
- Context("ApigeeSync snapshot event", func() {
+ AfterEach(func() {
- It("should fail if more than one apid_cluster rows", func() {
- event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
- Expect(func() { processSnapshot(&event) }).To(Panic())
- }, 3)
-
- It("should fail if more than one apid_cluster rows", func() {
- newScopes := []string{"foo"}
- scopes := []string{"bar"}
- Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
- newScopes = []string{"foo", "bar"}
- scopes = []string{"bar"}
- Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
- newScopes = []string{"foo"}
- scopes = []string{"bar", "foo"}
- Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
- newScopes = []string{"foo", "bar"}
- scopes = []string{"bar", "foo"}
- Expect(scopeChanged(newScopes, scopes)).To(BeNil())
-
- }, 3)
-
- It("should process a valid Snapshot", func() {
-
- event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
-
- processSnapshot(&event)
-
- info, err := getApidInstanceInfo()
- Expect(err).NotTo(HaveOccurred())
-
- Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
-
- db := getDB()
-
- expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
-
- Expect(db == expectedDB).Should(BeTrue())
-
- // apid Cluster
- var dcs []dataApidCluster
-
- rows, err := db.Query(`
- SELECT id, name, description, umbrella_org_app_name,
- created, created_by, updated, updated_by
- FROM EDGEX_APID_CLUSTER`)
- Expect(err).NotTo(HaveOccurred())
- defer rows.Close()
-
- c := dataApidCluster{}
- for rows.Next() {
- rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName,
- &c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy)
- dcs = append(dcs, c)
- }
-
- Expect(len(dcs)).To(Equal(1))
- dc := dcs[0]
-
- Expect(dc.ID).To(Equal("i"))
- Expect(dc.Name).To(Equal("n"))
- Expect(dc.Description).To(Equal("d"))
- Expect(dc.OrgAppName).To(Equal("o"))
- Expect(dc.Created).To(Equal("c"))
- Expect(dc.CreatedBy).To(Equal("c"))
- Expect(dc.Updated).To(Equal("u"))
- Expect(dc.UpdatedBy).To(Equal("u"))
-
- // Data Scope
- var dds []dataDataScope
-
- rows, err = db.Query(`
- SELECT id, apid_cluster_id, scope, org,
- env, created, created_by, updated,
- updated_by
- FROM EDGEX_DATA_SCOPE`)
- Expect(err).NotTo(HaveOccurred())
- defer rows.Close()
-
- d := dataDataScope{}
- for rows.Next() {
- rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
- &d.Env, &d.Created, &d.CreatedBy, &d.Updated,
- &d.UpdatedBy)
- dds = append(dds, d)
- }
-
- Expect(len(dds)).To(Equal(3))
- ds := dds[0]
-
- Expect(ds.ID).To(Equal("i"))
- Expect(ds.Org).To(Equal("o"))
- Expect(ds.Env).To(Equal("e1"))
- Expect(ds.Scope).To(Equal("s1"))
- Expect(ds.Created).To(Equal("c"))
- Expect(ds.CreatedBy).To(Equal("c"))
- Expect(ds.Updated).To(Equal("u"))
- Expect(ds.UpdatedBy).To(Equal("u"))
-
- ds = dds[1]
- Expect(ds.Env).To(Equal("e2"))
- Expect(ds.Scope).To(Equal("s1"))
- ds = dds[2]
- Expect(ds.Env).To(Equal("e3"))
- Expect(ds.Scope).To(Equal("s2"))
-
- scopes := findScopesForId("a")
- Expect(len(scopes)).To(Equal(6))
- expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
- sort.Strings(scopes)
- sort.Strings(expectedScopes)
- Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue())
- }, 3)
})
- Context("ApigeeSync change event", func() {
-
- Context(LISTENER_TABLE_APID_CLUSTER, func() {
-
- It("insert event should panic", func() {
- ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
- processSnapshot(&ssEvent)
-
- //save the last snapshot, so we can restore it at the end of this context
-
- csEvent := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_APID_CLUSTER,
- },
+ It("postInitPlugins, start cleanly", func() {
+ testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount))
+ eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins)
+ eventService.Emit(testEvent, apid.PluginsInitializedEvent{
+ Description: "test",
+ Plugins: []apid.PluginData{
+ {
+ Name: "name",
+ Version: "0.0.1",
+ ExtraData: map[string]interface{}{
+ "schemaVersion": "0.0.1",
},
- }
-
- Expect(func() { processChangeList(&csEvent) }).To(Panic())
- }, 3)
-
- It("update event should panic", func() {
- ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
- processSnapshot(&ssEvent)
-
- event := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Update,
- Table: LISTENER_TABLE_APID_CLUSTER,
- },
- },
- }
-
- Expect(func() { processChangeList(&event) }).To(Panic())
- //restore the last snapshot
- }, 3)
-
+ },
+ },
})
-
- Context(LISTENER_TABLE_DATA_SCOPE, func() {
-
- It("insert event should add", func() {
- ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
- processSnapshot(&ssEvent)
-
- event := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "_change_selector": &common.ColumnVal{Value: "cs"},
- },
- },
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "j"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s2"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "_change_selector": &common.ColumnVal{Value: "cs"},
- },
- },
- },
- }
-
- processChangeList(&event)
-
- var dds []dataDataScope
-
- rows, err := getDB().Query(`
- SELECT id, apid_cluster_id, scope, org,
- env, created, created_by, updated,
- updated_by
- FROM EDGEX_DATA_SCOPE`)
- Expect(err).NotTo(HaveOccurred())
- defer rows.Close()
-
- d := dataDataScope{}
- for rows.Next() {
- rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
- &d.Env, &d.Created, &d.CreatedBy, &d.Updated,
- &d.UpdatedBy)
- dds = append(dds, d)
- }
-
- //three already existing
- Expect(len(dds)).To(Equal(2))
- ds := dds[0]
-
- Expect(ds.ID).To(Equal("i"))
- Expect(ds.Org).To(Equal("o"))
- Expect(ds.Env).To(Equal("e"))
- Expect(ds.Scope).To(Equal("s1"))
- Expect(ds.Created).To(Equal("c"))
- Expect(ds.CreatedBy).To(Equal("c"))
- Expect(ds.Updated).To(Equal("u"))
- Expect(ds.UpdatedBy).To(Equal("u"))
-
- ds = dds[1]
- Expect(ds.Scope).To(Equal("s2"))
-
- scopes := findScopesForId("a")
- Expect(len(scopes)).To(Equal(2))
- Expect(scopes[0]).To(Equal("s1"))
- Expect(scopes[1]).To(Equal("s2"))
-
- }, 3)
-
- It("delete event should delete", func() {
- ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
- processSnapshot(&ssEvent)
- insert := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "_change_selector": &common.ColumnVal{Value: "cs"},
- },
- },
- },
- }
-
- processChangeList(&insert)
-
- delete := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Delete,
- Table: LISTENER_TABLE_DATA_SCOPE,
- OldRow: insert.Changes[0].NewRow,
- },
- },
- }
-
- processChangeList(&delete)
-
- var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
- Expect(err).NotTo(HaveOccurred())
-
- Expect(0).To(Equal(nRows))
- }, 3)
-
- It("update event should panic for data scopes table", func() {
- ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
- processSnapshot(&ssEvent)
-
- event := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Update,
- Table: LISTENER_TABLE_DATA_SCOPE,
- },
- },
- }
-
- Expect(func() { processChangeList(&event) }).To(Panic())
- //restore the last snapshot
- }, 3)
-
- //TODO add tests for update/insert/delete cluster
- })
+ Expect(<-dummySnapMan.downloadCalledChan).Should(BeFalse())
+ Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue())
})
+
+ It("postInitPlugins, start from local db", func() {
+ apidInfo.LastSnapshot = "test_snapshot"
+ testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount))
+ eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins)
+ eventService.Emit(testEvent, apid.PluginsInitializedEvent{
+ Description: "test",
+ Plugins: []apid.PluginData{
+ {
+ Name: "name",
+ Version: "0.0.1",
+ ExtraData: map[string]interface{}{
+ "schemaVersion": "0.0.1",
+ },
+ },
+ },
+ })
+ Expect(<-dummySnapMan.startCalledChan).Should(BeTrue())
+ Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue())
+ })
+
})
diff --git a/managerInterfaces.go b/managerInterfaces.go
index a93e950..978aede 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -15,31 +15,38 @@
package apidApigeeSync
import (
+ "github.com/apid/apid-core"
"github.com/apigee-labs/transicator/common"
- "net/url"
)
type tokenManager interface {
getBearerToken() string
- invalidateToken() error
- getToken() *OauthToken
+ invalidateToken()
close()
- getRetrieveNewTokenClosure(*url.URL) func(chan bool) error
start()
getTokenReadyChannel() <-chan bool
}
-type snapShotManager interface {
+type snapshotManager interface {
close() <-chan bool
downloadBootSnapshot()
- storeBootSnapshot(snapshot *common.Snapshot)
- downloadDataSnapshot()
- storeDataSnapshot(snapshot *common.Snapshot)
- downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error
- startOnLocalSnapshot(snapshot string) *common.Snapshot
+ downloadDataSnapshot() error
+ startOnDataSnapshot(snapshot string) error
}
type changeManager interface {
close() <-chan bool
pollChangeWithBackoff()
}
+
+type DbManager interface {
+ initDB() error
+ setDB(db apid.DB)
+ getLastSequence() (lastSequence string)
+ findScopesForId(configId string) (scopes []string, err error)
+ updateLastSequence(lastSequence string) error
+ getApidInstanceInfo() (info apidInstanceInfo, err error)
+ processChangeList(changes *common.ChangeList) error
+ processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error
+ getKnowTables() map[string]bool
+}
diff --git a/mock_server.go b/mock_server_test.go
similarity index 94%
rename from mock_server.go
rename to mock_server_test.go
index f39adb7..e61b83e 100644
--- a/mock_server.go
+++ b/mock_server_test.go
@@ -90,14 +90,14 @@
changeChannel chan []byte
sequenceID *int64
maxDevID *int64
- deployIDMutex sync.RWMutex
+ deployIDMutex *sync.RWMutex
minDeploymentID *int64
maxDeploymentID *int64
newSnap *int32
authFail *int32
}
-func (m *MockServer) forceAuthFail() {
+func (m *MockServer) forceAuthFailOnce() {
atomic.StoreInt32(m.authFail, 1)
}
@@ -118,11 +118,13 @@
}
func (m *MockServer) lastSequenceID() string {
- return strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
+ num := strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
+ return num + "." + num + "." + num
}
func (m *MockServer) nextSequenceID() string {
- return strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
+ num := strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
+ return num + "." + num + "." + num
}
func (m *MockServer) nextDeveloperID() string {
@@ -180,7 +182,7 @@
m.newSnap = new(int32)
m.authFail = new(int32)
*m.authFail = 0
-
+ m.deployIDMutex = &sync.RWMutex{}
initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
@@ -266,8 +268,11 @@
scopes := q["scope"]
Expect(scopes).To(ContainElement(m.params.ClusterID))
-
- w.Header().Set("Transicator-Snapshot-TXID", util.GenerateUUID())
+ if m.params.Scope != "" {
+ Expect(scopes).To(ContainElement(m.params.Scope))
+ }
+ m.snapshotID = util.GenerateUUID()
+ w.Header().Set(headerSnapshotNumber, m.snapshotID)
if len(scopes) == 1 {
//send bootstrap db
@@ -285,7 +290,7 @@
func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
defer GinkgoRecover()
- val := atomic.SwapInt32(m.newSnap, 0)
+ val := atomic.LoadInt32(m.newSnap)
if val > 0 {
log.Debug("MockServer: force new snapshot")
w.WriteHeader(http.StatusBadRequest)
@@ -311,7 +316,9 @@
//Expect(q.Get("snapshot")).To(Equal(m.snapshotID))
Expect(scopes).To(ContainElement(m.params.ClusterID))
- //Expect(scopes).To(ContainElement(m.params.Scope))
+ if m.params.Scope != "" {
+ Expect(scopes).To(ContainElement(m.params.Scope))
+ }
// todo: the following is just legacy for the existing test in apigeeSync_suite_test
developer := m.createDeveloperWithProductAndApp()
@@ -343,6 +350,7 @@
// force failing auth check
if atomic.LoadInt32(m.authFail) == 1 {
+ atomic.StoreInt32(m.authFail, 0)
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. ")))
return
@@ -356,7 +364,7 @@
// check auth header
auth := req.Header.Get("Authorization")
- expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
+ expectedAuth := m.getBearerToken()
if auth != expectedAuth {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth)))
@@ -366,6 +374,10 @@
}
}
+func (m *MockServer) getBearerToken() string {
+ return fmt.Sprintf("Bearer %s", m.oauthToken)
+}
+
// make a handler unreliable
func (m *MockServer) unreliable(target http.HandlerFunc) http.HandlerFunc {
if m.params.ReliableAPI {
diff --git a/pluginData.go b/pluginData.go
index c565c02..10b2dd0 100644
--- a/pluginData.go
+++ b/pluginData.go
@@ -16,7 +16,7 @@
import "github.com/apid/apid-core"
-var pluginData = apid.PluginData{
+var PluginData = apid.PluginData{
Name: "apidApigeeSync",
Version: "0.0.4",
ExtraData: map[string]interface{}{
diff --git a/snapshot.go b/snapshot.go
index df4d63f..023314d 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -15,12 +15,12 @@
package apidApigeeSync
import (
- "github.com/apid/apid-core"
"github.com/apid/apid-core/data"
"github.com/apigee-labs/transicator/common"
"net/http"
"os"
+ "fmt"
"io"
"io/ioutil"
"net/url"
@@ -29,7 +29,14 @@
"time"
)
-type simpleSnapShotManager struct {
+const bootstrapSnapshotName = "bootstrap"
+const lengthSqliteFileName = 7 // len("/sqlite")
+const (
+ headerSnapshotNumber = "Transicator-Snapshot-TXID"
+)
+
+type apidSnapshotManager struct {
+ *offlineSnapshotManager
// to send quit signal to the downloading thread
quitChan chan bool
// to mark the graceful close of snapshotManager
@@ -38,32 +45,40 @@
isClosed *int32
// make sure close() returns immediately if there's no downloading/processing snapshot
isDownloading *int32
+ tokenMan tokenManager
+ dbMan DbManager
+ client *http.Client
}
-func createSnapShotManager() *simpleSnapShotManager {
+func createSnapShotManager(dbMan DbManager, tokenMan tokenManager, client *http.Client) *apidSnapshotManager {
isClosedInt := int32(0)
isDownloadingInt := int32(0)
- return &simpleSnapShotManager{
+ return &apidSnapshotManager{
+ offlineSnapshotManager: &offlineSnapshotManager{
+ dbMan: dbMan,
+ },
quitChan: make(chan bool, 1),
finishChan: make(chan bool, 1),
isClosed: &isClosedInt,
isDownloading: &isDownloadingInt,
+ dbMan: dbMan,
+ tokenMan: tokenMan,
+ client: client,
}
}
/*
- * thread-safe close of snapShotManager
+ * thread-safe close of snapshotManager
* It marks status as closed immediately, and quits backoff downloading
* use <- close() for blocking close
* should only be called by pollChangeManager, because pollChangeManager is dependent on it
*/
-func (s *simpleSnapShotManager) close() <-chan bool {
+func (s *apidSnapshotManager) close() <-chan bool {
//has been closed before
if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
- log.Error("snapShotManager: close() called on a closed snapShotManager!")
+ log.Warn("snapshotManager: close() called on a closed snapshotManager!")
go func() {
s.finishChan <- false
- log.Debug("change manager closed")
}()
return s.finishChan
}
@@ -77,163 +92,52 @@
}
// retrieve boot information: apid_config and apid_config_scope
-func (s *simpleSnapShotManager) downloadBootSnapshot() {
+func (s *apidSnapshotManager) downloadBootSnapshot() {
if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
}
defer atomic.StoreInt32(s.isDownloading, int32(0))
- // has been closed
- if atomic.LoadInt32(s.isClosed) == int32(1) {
- log.Warn("snapShotManager: downloadBootSnapshot called on closed snapShotManager")
- return
- }
-
log.Debug("download Snapshot for boot data")
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
- err := s.downloadSnapshot(true, scopes, snapshot)
- if err != nil {
- // this may happen during shutdown
- if _, ok := err.(quitSignalError); ok {
- log.Warn("downloadBootSnapshot failed due to shutdown: " + err.Error())
- }
- return
- }
-
- // has been closed
- if atomic.LoadInt32(s.isClosed) == int32(1) {
- log.Error("snapShotManager: processSnapshot called on closed snapShotManager")
- return
- }
+ s.downloadSnapshot(true, scopes, snapshot)
// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
s.storeBootSnapshot(snapshot)
}
-func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
- processSnapshot(snapshot)
+func (s *apidSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+ if err := s.dbMan.processSnapshot(snapshot, false); err != nil {
+ log.Panic(err)
+ }
}
// use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func (s *simpleSnapShotManager) downloadDataSnapshot() {
+func (s *apidSnapshotManager) downloadDataSnapshot() error {
if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
}
defer atomic.StoreInt32(s.isDownloading, int32(0))
- // has been closed
- if atomic.LoadInt32(s.isClosed) == int32(1) {
- log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager")
- return
- }
-
log.Debug("download Snapshot for data scopes")
- scopes := findScopesForId(apidInfo.ClusterID)
+ scopes, err := s.dbMan.findScopesForId(apidInfo.ClusterID)
+ if err != nil {
+ return err
+ }
scopes = append(scopes, apidInfo.ClusterID)
snapshot := &common.Snapshot{}
- err := s.downloadSnapshot(false, scopes, snapshot)
- if err != nil {
- // this may happen during shutdown
- if _, ok := err.(quitSignalError); ok {
- log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error())
- }
- return
- }
- s.storeDataSnapshot(snapshot)
-}
-
-func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
- knownTables = extractTablesFromSnapshot(snapshot)
-
- _, err := dataService.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Database inaccessible: %v", err)
- }
-
- processSnapshot(snapshot)
- log.Info("Emitting Snapshot to plugins")
-
- select {
- case <-time.After(pluginTimeout):
- log.Panic("Timeout. Plugins failed to respond to snapshot.")
- case <-events.Emit(ApigeeSyncEventSelector, snapshot):
- // the new snapshot has been processed
- // if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency
- }
-
-}
-
-func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
-
- tables = make(map[string]bool)
-
- log.Debug("Extracting table names from snapshot")
- //if this panic ever fires, it's a bug
- db, err := dataService.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Database inaccessible: %v", err)
- }
- return extractTablesFromDB(db)
-}
-
-func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
-
- tables = make(map[string]bool)
-
- log.Debug("Extracting table names from existing DB")
- rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
- defer rows.Close()
-
- if err != nil {
- log.Panicf("Error reading current set of tables: %v", err)
- }
-
- for rows.Next() {
- var table string
- if err := rows.Scan(&table); err != nil {
- log.Panicf("Error reading current set of tables: %v", err)
- }
- log.Debugf("Table %s found in existing db", table)
-
- tables[table] = true
- }
- return tables
-}
-
-// Skip Downloading snapshot if there is already a snapshot available from previous run
-func (s *simpleSnapShotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot {
- log.Infof("Starting on local snapshot: %s", snapshotName)
-
- // ensure DB version will be accessible on behalf of dependant plugins
- db, err := dataService.DBVersion(snapshotName)
- if err != nil {
- log.Panicf("Database inaccessible: %v", err)
- }
-
- knownTables = extractTablesFromDB(db)
- snapshot := &common.Snapshot{
- SnapshotInfo: snapshotName,
- }
- processSnapshot(snapshot)
-
- // allow plugins (including this one) to start immediately on existing database
- // Note: this MUST have no tables as that is used as an indicator
- return snapshot
+ s.downloadSnapshot(false, scopes, snapshot)
+ return s.startOnDataSnapshot(snapshot.SnapshotInfo)
}
// a blocking method
// will keep retrying with backoff until success
-func (s *simpleSnapShotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
- // if closed
- if atomic.LoadInt32(s.isClosed) == int32(1) {
- log.Warn("Trying to download snapshot with a closed snapShotManager")
- return quitSignalError{}
- }
+func (s *apidSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) {
log.Debug("downloadSnapshot")
@@ -254,12 +158,11 @@
//pollWithBackoff only accepts function that accept a single quit channel
//to accommodate functions which need more parameters, wrap them in closures
- attemptDownload := getAttemptDownloadClosure(isBoot, snapshot, uri)
+ attemptDownload := s.getAttemptDownloadClosure(isBoot, snapshot, uri)
pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
- return nil
}
-func getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
+func (s *apidSnapshotManager) getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
return func(_ chan bool) error {
var tid string
@@ -268,19 +171,16 @@
// should never happen, but if it does, it's unrecoverable anyway
log.Panicf("Snapshotserver comm error: %v", err)
}
- addHeaders(req)
-
- var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error
+ addHeaders(req, s.tokenMan.getBearerToken())
if config.GetString(configSnapshotProtocol) != "sqlite" {
log.Panic("Only currently supported snashot protocol is sqlite")
}
req.Header.Set("Accept", "application/transicator+sqlite")
- processSnapshotResponse = processSnapshotServerFileResponse
// Issue the request to the snapshot server
- r, err := httpclient.Do(req)
+ r, err := s.client.Do(req)
if err != nil {
log.Errorf("Snapshotserver comm error: %v", err)
return err
@@ -288,22 +188,28 @@
defer r.Body.Close()
- if r.StatusCode != 200 {
+ switch r.StatusCode {
+ case http.StatusOK:
+ break
+ case http.StatusUnauthorized:
+ s.tokenMan.invalidateToken()
+ fallthrough
+ default:
body, _ := ioutil.ReadAll(r.Body)
log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
- return expected200Error{}
+ return expected200Error
}
// Bootstrap scope is a special case, that can occur only once. The tid is
// hardcoded to "bootstrap" to ensure there can be no clash of tid between
// bootstrap and subsequent data scopes.
if isBoot {
- tid = "bootstrap"
+ tid = bootstrapSnapshotName
} else {
- tid = r.Header.Get("Transicator-Snapshot-TXID")
+ tid = r.Header.Get(headerSnapshotNumber)
}
// Decode the Snapshot server response
- err = processSnapshotResponse(tid, r.Body, snapshot)
+ err = processSnapshotServerFileResponse(tid, r.Body, snapshot)
if err != nil {
log.Errorf("Snapshot server response Data not parsable: %v", err)
return err
@@ -315,11 +221,22 @@
func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error {
dbPath := data.DBPath("common/" + dbId)
+ dbDir := dbPath[0 : len(dbPath)-lengthSqliteFileName]
log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath)
+ // if other bootstrap snapshot exists, delete the old file
+ if dbId == bootstrapSnapshotName {
+ if _, err := os.Stat(dbDir); !os.IsNotExist(err) {
+ if err = os.RemoveAll(dbDir); err != nil {
+ log.Errorf("Failed to delete old bootstrap snapshot; %v", err)
+ return err
+ }
+ }
+ }
+
//this path includes the sqlite3 file name. why does mkdir all stop at parent??
log.Infof("Creating directory with mkdirall %s", dbPath)
- err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700)
+ err := os.MkdirAll(dbDir, 0700)
if err != nil {
log.Errorf("Error creating db path %s", err)
}
@@ -343,10 +260,11 @@
}
func handleSnapshotServerError(err error) {
- log.Debugf("Error connecting to snapshot server: %v", err)
+ log.Errorf("Error connecting to snapshot server: %v", err)
}
type offlineSnapshotManager struct {
+ dbMan DbManager
}
func (o *offlineSnapshotManager) close() <-chan bool {
@@ -355,33 +273,28 @@
return c
}
-func (o *offlineSnapshotManager) downloadBootSnapshot() {}
-
-func (o *offlineSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {}
-
-func (o *offlineSnapshotManager) downloadDataSnapshot() {}
-
-func (o *offlineSnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {}
-
-func (o *offlineSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
- return nil
+func (o *offlineSnapshotManager) downloadBootSnapshot() {
+ log.Panic("downloadBootSnapshot called for offlineSnapshotManager")
}
-func (o *offlineSnapshotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot {
- log.Infof("Starting on local snapshot: %s", snapshotName)
- // ensure DB version will be accessible on behalf of dependant plugins
- db, err := dataService.DBVersion(snapshotName)
- if err != nil {
- log.Panicf("Database inaccessible: %v", err)
- }
+func (o *offlineSnapshotManager) downloadDataSnapshot() error {
+ return fmt.Errorf("downloadDataSnapshot called for offlineSnapshotManager")
+}
- knownTables = extractTablesFromDB(db)
+func (o *offlineSnapshotManager) startOnDataSnapshot(snapshotName string) error {
+ log.Infof("Processing snapshot: %s", snapshotName)
snapshot := &common.Snapshot{
SnapshotInfo: snapshotName,
}
- processSnapshot(snapshot)
-
- // allow plugins (including this one) to start immediately on existing database
- // Note: this MUST have no tables as that is used as an indicator
- return snapshot
+ if err := o.dbMan.processSnapshot(snapshot, true); err != nil {
+ return err
+ }
+ log.Info("Emitting Snapshot to plugins")
+ select {
+ case <-time.After(pluginTimeout):
+ return fmt.Errorf("timeout, plugins failed to respond to snapshot")
+ case <-eventService.Emit(ApigeeSyncEventSelector, snapshot):
+ // the new snapshot has been processed
+ }
+ return nil
}
diff --git a/snapshot_test.go b/snapshot_test.go
new file mode 100644
index 0000000..8d99563
--- /dev/null
+++ b/snapshot_test.go
@@ -0,0 +1,146 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apidApigeeSync
+
+import (
+ "github.com/apid/apid-core"
+ "github.com/apid/apid-core/api"
+ "github.com/apigee-labs/transicator/common"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "net/http"
+ "net/http/httptest"
+ "strconv"
+ "time"
+)
+
+var _ = Describe("Snapshot Manager", func() {
+ testCount := 0
+ var dummyDbMan *dummyDbManager
+ BeforeEach(func() {
+ testCount++
+ dummyDbMan = &dummyDbManager{}
+ })
+
+ Context("offlineSnapshotManager", func() {
+ var testSnapMan *offlineSnapshotManager
+ BeforeEach(func() {
+ testSnapMan = &offlineSnapshotManager{
+ dbMan: dummyDbMan,
+ }
+ })
+ AfterEach(func() {
+ <-testSnapMan.close()
+ })
+
+ It("should have error if download called", func() {
+ Expect(testSnapMan.downloadDataSnapshot()).ToNot(Succeed())
+ Expect(func() { testSnapMan.downloadBootSnapshot() }).To(Panic())
+ })
+
+ It("startOnDataSnapshot should emit events", func() {
+ called := false
+ eventService.ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+ if _, ok := event.(*common.Snapshot); ok {
+ called = true
+ }
+ })
+ snapshotId := "test_snapshot_" + strconv.Itoa(testCount)
+ Expect(testSnapMan.startOnDataSnapshot(snapshotId)).Should(Succeed())
+ Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(snapshotId))
+ Expect(called).Should(BeTrue())
+ })
+ })
+
+ Context("apidSnapshotManager", func() {
+ var testSnapMan *apidSnapshotManager
+ var dummyTokenMan *dummyTokenManager
+ var testServer *httptest.Server
+ var testRouter apid.Router
+ var testMock *MockServer
+ BeforeEach(func() {
+ dummyTokenMan = &dummyTokenManager{
+ invalidateChan: make(chan bool, 1),
+ }
+ client := &http.Client{}
+ testSnapMan = createSnapShotManager(dummyDbMan, dummyTokenMan, client)
+
+ // create a new API service to have a new router for testing
+ testRouter = api.CreateService().Router()
+ testServer = httptest.NewServer(testRouter)
+ // set up mock server
+ mockParms := MockParms{
+ ReliableAPI: true,
+ ClusterID: config.GetString(configApidClusterId),
+ TokenKey: config.GetString(configConsumerKey),
+ TokenSecret: config.GetString(configConsumerSecret),
+ Scope: "",
+ Organization: "att",
+ Environment: "prod",
+ }
+ apidInfo.ClusterID = expectedClusterId
+ apidInfo.InstanceID = expectedInstanceId
+ testMock = Mock(mockParms, testRouter)
+ config.Set(configProxyServerBaseURI, testServer.URL)
+ config.Set(configSnapServerBaseURI, testServer.URL)
+ config.Set(configChangeServerBaseURI, testServer.URL)
+ config.Set(configPollInterval, 1*time.Millisecond)
+
+ initialBackoffInterval = time.Millisecond
+ testMock.oauthToken = "test_token_" + strconv.Itoa(testCount)
+ dummyTokenMan.token = testMock.oauthToken
+ })
+
+ AfterEach(func() {
+ <-testSnapMan.close()
+ })
+
+ It("downloadBootSnapshot happy path", func() {
+ testMock.normalAuthCheck()
+ testSnapMan.downloadBootSnapshot()
+ Expect(dummyDbMan.isDataSnapshot).Should(BeFalse())
+ Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(bootstrapSnapshotName))
+ })
+
+ It("downloadBootSnapshot should retry for auth failure", func() {
+ testMock.forceAuthFailOnce()
+ testSnapMan.downloadBootSnapshot()
+ Expect(dummyDbMan.isDataSnapshot).Should(BeFalse())
+ Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(bootstrapSnapshotName))
+ Expect(<-dummyTokenMan.invalidateChan).Should(BeTrue())
+ })
+
+ It("downloadDataSnapshot happy path", func() {
+ testMock.params.Scope = "test_scope_" + strconv.Itoa(testCount)
+ dummyDbMan.scopes = []string{testMock.params.Scope}
+ testMock.normalAuthCheck()
+ testSnapMan.downloadDataSnapshot()
+ Expect(dummyDbMan.isDataSnapshot).Should(BeTrue())
+ Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(testMock.snapshotID))
+ })
+
+ It("downloadDataSnapshot should retry for auth failure", func() {
+ testMock.params.Scope = "test_scope_" + strconv.Itoa(testCount)
+ dummyDbMan.scopes = []string{testMock.params.Scope}
+ testMock.forceAuthFailOnce()
+ testSnapMan.downloadDataSnapshot()
+ Expect(dummyDbMan.isDataSnapshot).Should(BeTrue())
+ Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(testMock.snapshotID))
+ Expect(<-dummyTokenMan.invalidateChan).Should(BeTrue())
+ })
+
+ })
+
+})
diff --git a/test_mock_test.go b/test_mock_test.go
index de2f673..9ba4e19 100644
--- a/test_mock_test.go
+++ b/test_mock_test.go
@@ -14,10 +14,115 @@
package apidApigeeSync
import (
+ "github.com/apid/apid-core"
"github.com/apigee-labs/transicator/common"
- "net/url"
+ "math/rand"
+ "net/http"
+ "strconv"
)
+type mockService struct {
+ config apid.ConfigService
+ log apid.LogService
+ api apid.APIService
+ data apid.DataService
+ events apid.EventsService
+}
+
+func (s *mockService) API() apid.APIService {
+ return s.api
+}
+
+func (s *mockService) Config() apid.ConfigService {
+ return s.config
+}
+
+func (s *mockService) Data() apid.DataService {
+ return s.data
+}
+
+func (s *mockService) Events() apid.EventsService {
+ return s.events
+}
+
+func (s *mockService) Log() apid.LogService {
+ return s.log
+}
+
+type mockApi struct {
+ handleMap map[string]http.HandlerFunc
+}
+
+func (m *mockApi) Listen() error {
+ return nil
+}
+func (m *mockApi) Handle(path string, handler http.Handler) apid.Route {
+ return nil
+}
+func (m *mockApi) HandleFunc(path string, handlerFunc http.HandlerFunc) apid.Route {
+ m.handleMap[path] = handlerFunc
+ return apid.API().HandleFunc(path+strconv.Itoa(rand.Int()), handlerFunc)
+}
+func (m *mockApi) Vars(r *http.Request) map[string]string {
+ return nil
+}
+
+func (m *mockApi) Router() apid.Router {
+ return nil
+}
+
+type mockData struct {
+}
+
+func (m *mockData) DB() (apid.DB, error) {
+ return nil, nil
+}
+
+func (m *mockData) DBForID(id string) (apid.DB, error) {
+ return nil, nil
+}
+
+func (m *mockData) DBVersion(version string) (apid.DB, error) {
+ return nil, nil
+}
+func (m *mockData) DBVersionForID(id, version string) (apid.DB, error) {
+ return nil, nil
+}
+
+func (m *mockData) ReleaseDB(version string) {}
+func (m *mockData) ReleaseCommonDB() {}
+func (m *mockData) ReleaseDBForID(id, version string) {}
+
+type mockEvent struct {
+ listenerMap map[apid.EventSelector]apid.EventHandlerFunc
+}
+
+func (e *mockEvent) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+ return nil
+}
+
+func (e *mockEvent) EmitWithCallback(selector apid.EventSelector, event apid.Event, handler apid.EventHandlerFunc) {
+
+}
+
+func (e *mockEvent) Listen(selector apid.EventSelector, handler apid.EventHandler) {
+
+}
+
+func (e *mockEvent) ListenFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) {
+
+}
+
+func (e *mockEvent) ListenOnceFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) {
+ e.listenerMap[selector] = handler
+}
+
+func (e *mockEvent) StopListening(selector apid.EventSelector, handler apid.EventHandler) {
+
+}
+
+func (e *mockEvent) Close() {}
+
type dummyChangeManager struct {
pollChangeWithBackoffChan chan bool
}
@@ -34,43 +139,34 @@
type dummyTokenManager struct {
invalidateChan chan bool
+ token string
+ tokenReadyChan chan bool
}
func (t *dummyTokenManager) getTokenReadyChannel() <-chan bool {
- return nil
+ return t.tokenReadyChan
}
func (t *dummyTokenManager) getBearerToken() string {
- return ""
+ return t.token
}
-func (t *dummyTokenManager) invalidateToken() error {
+func (t *dummyTokenManager) invalidateToken() {
log.Debug("invalidateToken called")
- testMock.passAuthCheck()
t.invalidateChan <- true
- return nil
-}
-
-func (t *dummyTokenManager) getToken() *OauthToken {
- return nil
}
func (t *dummyTokenManager) close() {
return
}
-func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
- return func(chan bool) error {
- return nil
- }
-}
-
func (t *dummyTokenManager) start() {
}
type dummySnapshotManager struct {
downloadCalledChan chan bool
+ startCalledChan chan bool
}
func (s *dummySnapshotManager) close() <-chan bool {
@@ -80,28 +176,60 @@
}
func (s *dummySnapshotManager) downloadBootSnapshot() {
-
+ s.downloadCalledChan <- false
}
-func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (s *dummySnapshotManager) downloadDataSnapshot() {
- log.Debug("dummySnapshotManager.downloadDataSnapshot() called")
+func (s *dummySnapshotManager) downloadDataSnapshot() error {
s.downloadCalledChan <- true
-}
-
-func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (s *dummySnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
return nil
}
-func (s *dummySnapshotManager) startOnLocalSnapshot(snapshot string) *common.Snapshot {
- return &common.Snapshot{
- SnapshotInfo: snapshot,
- }
+func (s *dummySnapshotManager) startOnDataSnapshot(snapshot string) error {
+ s.startCalledChan <- true
+ return nil
+}
+
+type dummyDbManager struct {
+ lastSequence string
+ knownTables map[string]bool
+ scopes []string
+ snapshot *common.Snapshot
+ isDataSnapshot bool
+ lastSeqUpdated chan string
+}
+
+func (d *dummyDbManager) initDB() error {
+ return nil
+}
+func (d *dummyDbManager) setDB(db apid.DB) {
+
+}
+func (d *dummyDbManager) getLastSequence() (lastSequence string) {
+ return d.lastSequence
+}
+func (d *dummyDbManager) findScopesForId(configId string) (scopes []string, err error) {
+ return d.scopes, nil
+}
+func (d *dummyDbManager) updateLastSequence(lastSequence string) error {
+ d.lastSeqUpdated <- lastSequence
+ return nil
+}
+func (d *dummyDbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
+ return apidInstanceInfo{
+ InstanceID: "",
+ InstanceName: "",
+ ClusterID: "",
+ LastSnapshot: "",
+ }, nil
+}
+func (d *dummyDbManager) processChangeList(changes *common.ChangeList) error {
+ return nil
+}
+func (d *dummyDbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
+ d.snapshot = snapshot
+ d.isDataSnapshot = isDataSnapshot
+ return nil
+}
+func (d *dummyDbManager) getKnowTables() map[string]bool {
+ return d.knownTables
}
diff --git a/token.go b/token.go
index 95d4b06..c355a61 100644
--- a/token.go
+++ b/token.go
@@ -17,7 +17,6 @@
import (
"bytes"
"encoding/json"
- "errors"
"github.com/apid/apid-core/util"
"io/ioutil"
"net/http"
@@ -35,15 +34,13 @@
Usage:
man := createTokenManager()
bearer := man.getBearerToken()
- // will automatically update config(configBearerToken) for other modules
- // optionally, when done...
- man.close()
+ will automatically update config(configBearerToken) for other modules
*/
-func createSimpleTokenManager() *simpleTokenManager {
+func createApidTokenManager(isNewInstance bool) *apidTokenManager {
isClosedInt := int32(0)
- t := &simpleTokenManager{
+ t := &apidTokenManager{
quitPollingForToken: make(chan bool, 1),
closed: make(chan bool),
getTokenChan: make(chan bool),
@@ -52,11 +49,12 @@
invalidateDone: make(chan bool),
tokenUpdatedChan: make(chan bool, 1),
isClosed: &isClosedInt,
+ isNewInstance: isNewInstance,
}
return t
}
-type simpleTokenManager struct {
+type apidTokenManager struct {
token *OauthToken
isClosed *int32
quitPollingForToken chan bool
@@ -67,19 +65,20 @@
returnTokenChan chan *OauthToken
invalidateDone chan bool
tokenUpdatedChan chan bool
+ isNewInstance bool
}
-func (t *simpleTokenManager) start() {
+func (t *apidTokenManager) start() {
t.retrieveNewToken()
t.refreshTimer = time.After(t.token.refreshIn())
go t.maintainToken()
}
-func (t *simpleTokenManager) getBearerToken() string {
+func (t *apidTokenManager) getBearerToken() string {
return t.getToken().AccessToken
}
-func (t *simpleTokenManager) maintainToken() {
+func (t *apidTokenManager) maintainToken() {
for {
select {
case <-t.closed:
@@ -100,19 +99,13 @@
}
// will block until valid
-func (t *simpleTokenManager) invalidateToken() error {
- //has been closed
- if atomic.LoadInt32(t.isClosed) == int32(1) {
- log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
- return errors.New("invalidateToken() called on closed tokenManager")
- }
+func (t *apidTokenManager) invalidateToken() {
log.Debug("invalidating token")
t.invalidateTokenChan <- true
<-t.invalidateDone
- return nil
}
-func (t *simpleTokenManager) getToken() *OauthToken {
+func (t *apidTokenManager) getToken() *OauthToken {
//has been closed
if atomic.LoadInt32(t.isClosed) == int32(1) {
log.Debug("TokenManager: getToken() called on closed tokenManager")
@@ -126,7 +119,7 @@
* blocking close() of tokenMan
*/
-func (t *simpleTokenManager) close() {
+func (t *apidTokenManager) close() {
//has been closed
if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
log.Panic("TokenManager: close() has been called before!")
@@ -141,7 +134,7 @@
}
// don't call externally. will block until success.
-func (t *simpleTokenManager) retrieveNewToken() {
+func (t *apidTokenManager) retrieveNewToken() {
log.Debug("Getting OAuth token...")
uriString := config.GetString(configProxyServerBaseURI)
@@ -151,10 +144,10 @@
}
uri.Path = path.Join(uri.Path, "/accesstoken")
- pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) })
+ pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : %v", err) })
}
-func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+func (t *apidTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
return func(_ chan bool) error {
form := url.Values{}
form.Set("grant_type", "client_credentials")
@@ -168,9 +161,9 @@
req.Header.Set("status", "ONLINE")
req.Header.Set("plugin_details", apidPluginDetails)
- if newInstanceID {
+ if t.isNewInstance {
req.Header.Set("created_at_apid", time.Now().Format(time.RFC3339))
- newInstanceID = false
+ t.isNewInstance = false
} else {
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
}
@@ -194,7 +187,7 @@
if resp.StatusCode != 200 {
log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
- return expected200Error{}
+ return expected200Error
}
var token OauthToken
@@ -212,22 +205,11 @@
}
log.Debugf("Got new token: %#v", token)
-
- /*
- if newInstanceID {
- newInstanceID = false
- err = updateApidInstanceInfo()
- if err != nil {
- log.Errorf("unable to unmarshal update apid instance info : %v", string(body), err)
- return err
-
- }
- }
- */
t.token = &token
config.Set(configBearerToken, token.AccessToken)
//don't block on the buffered channel. that means there is already a signal to serve new token
+ //TODO: This assumes apid-gateway is 1-1 mapping. Make use of generic long-polling provided by apid-core
select {
case t.tokenUpdatedChan <- true:
default:
@@ -238,7 +220,7 @@
}
}
-func (t *simpleTokenManager) getTokenReadyChannel() <-chan bool {
+func (t *apidTokenManager) getTokenReadyChannel() <-chan bool {
return t.tokenUpdatedChan
}
diff --git a/token_test.go b/token_test.go
index 1dfaabb..fa9379c 100644
--- a/token_test.go
+++ b/token_test.go
@@ -94,7 +94,7 @@
w.Write(body)
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createSimpleTokenManager()
+ testedTokenManager := createApidTokenManager(false)
testedTokenManager.start()
token := testedTokenManager.getToken()
@@ -123,7 +123,7 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createSimpleTokenManager()
+ testedTokenManager := createApidTokenManager(false)
testedTokenManager.start()
token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
@@ -163,7 +163,7 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createSimpleTokenManager()
+ testedTokenManager := createApidTokenManager(false)
testedTokenManager.start()
testedTokenManager.getToken()
@@ -179,8 +179,6 @@
finished := make(chan bool, 1)
count := 0
- newInstanceID = true
-
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
count++
@@ -202,7 +200,7 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createSimpleTokenManager()
+ testedTokenManager := createApidTokenManager(true)
testedTokenManager.start()
testedTokenManager.getToken()
testedTokenManager.invalidateToken()
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..7d59214
--- /dev/null
+++ b/util.go
@@ -0,0 +1,172 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apidApigeeSync
+
+import (
+ "fmt"
+ "math"
+ "math/rand"
+ "net/http"
+ "time"
+)
+
+const (
+ httpTimeout = time.Minute
+ pluginTimeout = time.Minute
+ maxIdleConnsPerHost = 10
+ defaultInitial time.Duration = 200 * time.Millisecond
+ defaultMax time.Duration = 10 * time.Second
+ defaultFactor float64 = 2
+)
+
+var (
+ initialBackoffInterval = defaultInitial
+)
+
+var (
+ expected200Error = fmt.Errorf("did not recieve OK response")
+ quitSignalError = fmt.Errorf("signal to quit encountered")
+ authFailError = fmt.Errorf("authorization failed")
+)
+
+type Backoff struct {
+ attempt int
+ initial, max time.Duration
+ jitter bool
+ backoffStrategy func() time.Duration
+}
+
+type ExponentialBackoff struct {
+ Backoff
+ factor float64
+}
+
+func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
+ backoff := &ExponentialBackoff{}
+
+ if initial <= 0 {
+ initial = defaultInitial
+ }
+ if max <= 0 {
+ max = defaultMax
+ }
+
+ if factor <= 0 {
+ factor = defaultFactor
+ }
+
+ backoff.initial = initial
+ backoff.max = max
+ backoff.attempt = 0
+ backoff.factor = factor
+ backoff.jitter = jitter
+ backoff.backoffStrategy = backoff.exponentialBackoffStrategy
+
+ return backoff
+}
+
+func (b *Backoff) Duration() time.Duration {
+ d := b.backoffStrategy()
+ b.attempt++
+ return d
+}
+
+func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
+
+ initial := float64(b.Backoff.initial)
+ attempt := float64(b.Backoff.attempt)
+ duration := initial * math.Pow(b.factor, attempt)
+
+ if duration > math.MaxInt64 {
+ return b.max
+ }
+ dur := time.Duration(duration)
+
+ if b.jitter {
+ duration = rand.Float64()*(duration-initial) + initial
+ }
+
+ if dur > b.max {
+ return b.max
+ }
+
+ log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
+ return dur
+}
+
+func (b *Backoff) Reset() {
+ b.attempt = 0
+}
+
+func (b *Backoff) Attempt() int {
+ return b.attempt
+}
+
+/*
+ * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
+ * for retrying on errors
+ */
+func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
+
+ backoff := NewExponentialBackoff(initialBackoffInterval, config.GetDuration(configPollInterval), 2, true)
+
+ //initialize the retry channel to start first attempt immediately
+ retry := time.After(0 * time.Millisecond)
+
+ for {
+ select {
+ case <-quit:
+ log.Info("Quit signal recieved. Returning")
+ return
+ case <-retry:
+ start := time.Now()
+
+ err := toExecute(quit)
+ if err == nil || err == quitSignalError {
+ return
+ }
+
+ end := time.Now()
+ //error encountered, since we would have returned above otherwise
+ handleError(err)
+
+ /* TODO keep this around? Imagine an immediately erroring service,
+ * causing many sequential requests which could pollute logs
+ */
+ //only backoff if the request took less than one second
+ if end.After(start.Add(time.Second)) {
+ backoff.Reset()
+ retry = time.After(0 * time.Millisecond)
+ } else {
+ retry = time.After(backoff.Duration())
+ }
+ }
+ }
+}
+
+func addHeaders(req *http.Request, token string) {
+ req.Header.Set("Authorization", "Bearer "+token)
+ req.Header.Set("apid_instance_id", apidInfo.InstanceID)
+ req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
+ req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
+}
+
+type changeServerError struct {
+ Code string `json:"code"`
+}
+
+func (a changeServerError) Error() string {
+ return a.Code
+}
diff --git a/backoff_test.go b/util_test.go
similarity index 100%
rename from backoff_test.go
rename to util_test.go