Merge pull request #45 from 30x/genericsqlite
Genericsqlite
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 9e6ed94..dd6cba4 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -24,6 +24,7 @@
)
const dummyConfigValue string = "placeholder"
+const expectedClusterId = "bootstrap"
var _ = BeforeSuite(func() {
wipeDBAferTest = true
@@ -42,11 +43,11 @@
config.Set(configProxyServerBaseURI, dummyConfigValue)
config.Set(configSnapServerBaseURI, dummyConfigValue)
config.Set(configChangeServerBaseURI, dummyConfigValue)
- config.Set(configSnapshotProtocol, "json")
+ config.Set(configSnapshotProtocol, "sqlite")
config.Set(configPollInterval, 10*time.Millisecond)
config.Set(configName, "testhost")
- config.Set(configApidClusterId, "bootstrap")
+ config.Set(configApidClusterId, expectedClusterId)
config.Set(configConsumerKey, "XXXXXXX")
config.Set(configConsumerSecret, "YYYYYYY")
@@ -54,6 +55,7 @@
log = apid.Log()
_initPlugin(apid.AllServices())
+ createManagers()
close(done)
}, 3)
@@ -63,11 +65,6 @@
lastSequence = ""
if wipeDBAferTest {
- _, err := getDB().Exec("DELETE FROM APID_CLUSTER")
- Expect(err).NotTo(HaveOccurred())
- _, err = getDB().Exec("DELETE FROM DATA_SCOPE")
- Expect(err).NotTo(HaveOccurred())
-
db, err := dataService.DB()
Expect(err).NotTo(HaveOccurred())
_, err = db.Exec("DELETE FROM APID")
diff --git a/apigee_sync.go b/apigee_sync.go
index 391355b..6fc1389 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -27,17 +27,17 @@
snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
- changeManager.pollChangeWithBackoff()
+ apidChangeManager.pollChangeWithBackoff()
})
log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
return
}
- snapManager.downloadBootSnapshot()
- snapManager.downloadDataSnapshot()
+ apidSnapshotManager.downloadBootSnapshot()
+ apidSnapshotManager.downloadDataSnapshot()
- changeManager.pollChangeWithBackoff()
+ apidChangeManager.pollChangeWithBackoff()
}
@@ -88,7 +88,7 @@
}
func addHeaders(req *http.Request) {
- req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+ req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
req.Header.Set("apid_instance_id", apidInfo.InstanceID)
req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -104,6 +104,9 @@
type expected200Error struct {
}
+type authFailError struct {
+}
+
func (an expected200Error) Error() string {
return "Did not recieve OK response"
}
@@ -115,3 +118,7 @@
func (a changeServerError) Error() string {
return a.Code
}
+
+func (a authFailError) Error() string {
+ return "Authorization failed"
+}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 4659f9c..22823a4 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -13,6 +13,9 @@
Context("Sync", func() {
+ const expectedDataScopeId1 = "dataScope1"
+ const expectedDataScopeId2 = "dataScope2"
+
var initializeContext = func() {
testRouter = apid.API().Router()
testServer = httptest.NewServer(testRouter)
@@ -53,57 +56,62 @@
var lastSnapshot *common.Snapshot
expectedSnapshotTables := common.ChangeList{
- Changes: []common.Change{common.Change{Table: "kms.company"},
- common.Change{Table: "edgex.apid_cluster"},
- common.Change{Table: "edgex.data_scope"}},
+ Changes: []common.Change{common.Change{Table: "kms_company"},
+ common.Change{Table: "edgex_apid_cluster"},
+ common.Change{Table: "edgex_data_scope"},
+ common.Change{Table: "kms_app_credential"},
+ common.Change{Table: "kms_app_credential_apiproduct_mapper"},
+ common.Change{Table: "kms_developer"},
+ common.Change{Table: "kms_company_developer"},
+ common.Change{Table: "kms_api_product"},
+ common.Change{Table: "kms_app"}},
}
apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
if s, ok := event.(*common.Snapshot); ok {
+ Expect(16).To(Equal(len(knownTables)))
Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
- //add apid_cluster and data_scope since those would present if this were a real scenario
- knownTables["kms.app_credential"] = true
- knownTables["kms.app_credential_apiproduct_mapper"] = true
- knownTables["kms.developer"] = true
- knownTables["kms.company_developer"] = true
- knownTables["kms.api_product"] = true
- knownTables["kms.app"] = true
-
lastSnapshot = s
- for _, t := range s.Tables {
- switch t.Name {
+ db, _ := dataService.DBVersion(s.SnapshotInfo)
+ var rowCount int
+ var id string
- case "edgex.apid_cluster":
- Expect(t.Rows).To(HaveLen(1))
- r := t.Rows[0]
- var id string
- r.Get("id", &id)
- Expect(id).To(Equal("bootstrap"))
+ err := db.Ping()
+ Expect(err).NotTo(HaveOccurred())
+ numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;")
+ if err != nil {
+ Fail("Failed to get correct DB")
+ }
+ Expect(true).To(Equal(numApidClusters.Next()))
+ numApidClusters.Scan(&rowCount)
+ Expect(1).To(Equal(rowCount))
+ apidClusters, _ := db.Query("select id from edgex_apid_cluster;")
+ apidClusters.Next()
+ apidClusters.Scan(&id)
+ Expect(id).To(Equal(expectedClusterId))
- case "edgex.data_scope":
- Expect(t.Rows).To(HaveLen(2))
- r := t.Rows[1] // get the non-cluster row
+ numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;")
+ Expect(true).To(Equal(numDataScopes.Next()))
+ numDataScopes.Scan(&rowCount)
+ Expect(2).To(Equal(rowCount))
+ dataScopes, _ := db.Query("select id from edgex_data_scope;")
+ dataScopes.Next()
+ dataScopes.Scan(&id)
+ dataScopes.Next()
- var id, clusterID, env, org, scope string
- r.Get("id", &id)
- r.Get("apid_cluster_id", &clusterID)
- r.Get("env", &env)
- r.Get("org", &org)
- r.Get("scope", &scope)
-
- Expect(id).To(Equal("ert452"))
- Expect(scope).To(Equal("ert452"))
- Expect(clusterID).To(Equal("bootstrap"))
- Expect(env).To(Equal("prod"))
- Expect(org).To(Equal("att"))
- }
+ if id == expectedDataScopeId1 {
+ dataScopes.Scan(&id)
+ Expect(id).To(Equal(expectedDataScopeId2))
+ } else {
+ dataScopes.Scan(&id)
+ Expect(id).To(Equal(expectedDataScopeId1))
}
} else if cl, ok := event.(*common.ChangeList); ok {
- closeDone = changeManager.close()
+ closeDone = apidChangeManager.close()
// ensure that snapshot switched DB versions
Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
@@ -122,12 +130,12 @@
Expect(tenantID).To(Equal("ert452"))
}
- Expect(tables).To(ContainElement("kms.app_credential"))
- Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
- Expect(tables).To(ContainElement("kms.developer"))
- Expect(tables).To(ContainElement("kms.company_developer"))
- Expect(tables).To(ContainElement("kms.api_product"))
- Expect(tables).To(ContainElement("kms.app"))
+ Expect(tables).To(ContainElement("kms_app_credential"))
+ Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper"))
+ Expect(tables).To(ContainElement("kms_developer"))
+ Expect(tables).To(ContainElement("kms_company_developer"))
+ Expect(tables).To(ContainElement("kms_api_product"))
+ Expect(tables).To(ContainElement("kms_app"))
go func() {
// when close done, all handlers for the first changeList have been executed
@@ -135,10 +143,8 @@
defer GinkgoRecover()
// allow other handler to execute to insert last_sequence
var seq string
- //for seq = ""; seq == ""; {
- // time.Sleep(50 * time.Millisecond)
- err := getDB().
- QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
+ err = getDB().
+ QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;").
Scan(&seq)
Expect(err).NotTo(HaveOccurred())
//}
@@ -163,9 +169,9 @@
initializeContext()
expectedTables := common.ChangeList{
- Changes: []common.Change{common.Change{Table: "kms.company"},
- common.Change{Table: "edgex.apid_cluster"},
- common.Change{Table: "edgex.data_scope"}},
+ Changes: []common.Change{common.Change{Table: "kms_company"},
+ common.Change{Table: "edgex_apid_cluster"},
+ common.Change{Table: "edgex_data_scope"}},
}
Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
@@ -174,7 +180,7 @@
if s, ok := event.(*common.Snapshot); ok {
// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
- closeDone = changeManager.close()
+ closeDone = apidChangeManager.close()
go func() {
// when close done, all handlers for the first snapshot have been executed
<-closeDone
@@ -283,18 +289,19 @@
*/
It("Should be able to handle duplicate snapshot during bootstrap", func() {
initializeContext()
- tokenManager = createTokenManager()
- snapManager = createSnapShotManager()
+ apidTokenManager = createSimpleTokenManager()
+ apidTokenManager.start()
+ apidSnapshotManager = createSnapShotManager()
events.Listen(ApigeeSyncEventSelector, &handler{})
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
- snapManager.downloadSnapshot(scopes, snapshot)
- snapManager.storeBootSnapshot(snapshot)
- snapManager.storeDataSnapshot(snapshot)
+ apidSnapshotManager.downloadSnapshot(scopes, snapshot)
+ apidSnapshotManager.storeBootSnapshot(snapshot)
+ apidSnapshotManager.storeDataSnapshot(snapshot)
restoreContext()
- <-snapManager.close()
- tokenManager.close()
+ <-apidSnapshotManager.close()
+ apidTokenManager.close()
}, 3)
It("Reuse http.Client connection for multiple concurrent requests", func() {
diff --git a/change_test.go b/change_test.go
new file mode 100644
index 0000000..7a69995
--- /dev/null
+++ b/change_test.go
@@ -0,0 +1,202 @@
+package apidApigeeSync
+
+import (
+ "github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
+ . "github.com/onsi/ginkgo"
+ "net/http/httptest"
+ "net/url"
+ "os"
+ "time"
+)
+
+var _ = Describe("Change Agent", func() {
+
+ Context("Change Agent Unit Tests", func() {
+ handler := handler{}
+
+ var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
+ initDb(sqlfile, "./mockdb_change.sqlite3")
+ file, err := os.Open("./mockdb_change.sqlite3")
+ if err != nil {
+ Fail("Failed to open mock db for test")
+ }
+
+ s := common.Snapshot{}
+ err = processSnapshotServerFileResponse(dbId, file, &s)
+ if err != nil {
+ Fail("Error processing test snapshots")
+ }
+ return s
+ }
+
+ BeforeEach(func() {
+ event := createTestDb("./sql/init_mock_db.sql", "test_change")
+ handler.Handle(&event)
+ knownTables = extractTablesFromDB(getDB())
+ })
+
+ var initializeContext = func() {
+ testRouter = apid.API().Router()
+ testServer = httptest.NewServer(testRouter)
+
+ // set up mock server
+ mockParms := MockParms{
+ ReliableAPI: true,
+ ClusterID: config.GetString(configApidClusterId),
+ TokenKey: config.GetString(configConsumerKey),
+ TokenSecret: config.GetString(configConsumerSecret),
+ Scope: "ert452",
+ Organization: "att",
+ Environment: "prod",
+ }
+ testMock = Mock(mockParms, testRouter)
+
+ config.Set(configProxyServerBaseURI, testServer.URL)
+ config.Set(configSnapServerBaseURI, testServer.URL)
+ config.Set(configChangeServerBaseURI, testServer.URL)
+ config.Set(configPollInterval, 1*time.Millisecond)
+ }
+
+ var restoreContext = func() {
+
+ testServer.Close()
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
+ config.Set(configPollInterval, 10*time.Millisecond)
+ }
+
+ It("test change agent with authorization failure", func() {
+ log.Debug("test change agent with authorization failure")
+ testTokenManager := &dummyTokenManager{make(chan bool)}
+ apidTokenManager = testTokenManager
+ apidTokenManager.start()
+ apidSnapshotManager = &dummySnapshotManager{}
+ initializeContext()
+ testMock.forceAuthFail()
+ wipeDBAferTest = true
+ apidChangeManager.pollChangeWithBackoff()
+ // auth check fails
+ <-testTokenManager.invalidateChan
+ log.Debug("closing")
+ <-apidChangeManager.close()
+ restoreContext()
+ })
+
+ It("test change agent with too old snapshot", func() {
+ log.Debug("test change agent with too old snapshot")
+ testTokenManager := &dummyTokenManager{make(chan bool)}
+ apidTokenManager = testTokenManager
+ apidTokenManager.start()
+ testSnapshotManager := &dummySnapshotManager{make(chan bool)}
+ apidSnapshotManager = testSnapshotManager
+ initializeContext()
+
+ testMock.passAuthCheck()
+ testMock.forceNewSnapshot()
+ wipeDBAferTest = true
+ apidChangeManager.pollChangeWithBackoff()
+ <-testSnapshotManager.downloadCalledChan
+ log.Debug("closing")
+ <-apidChangeManager.close()
+ restoreContext()
+ })
+
+ It("change agent should retry with authorization failure", func(done Done) {
+ log.Debug("change agent should retry with authorization failure")
+ testTokenManager := &dummyTokenManager{make(chan bool)}
+ apidTokenManager = testTokenManager
+ apidTokenManager.start()
+ apidSnapshotManager = &dummySnapshotManager{}
+ initializeContext()
+ testMock.forceAuthFail()
+ testMock.forceNoSnapshot()
+ wipeDBAferTest = true
+
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+
+ if _, ok := event.(*common.ChangeList); ok {
+ closeDone := apidChangeManager.close()
+ log.Debug("closing")
+ go func() {
+ // when close done, all handlers for the first snapshot have been executed
+ <-closeDone
+ restoreContext()
+ close(done)
+ }()
+
+ }
+ })
+
+ apidChangeManager.pollChangeWithBackoff()
+ // auth check fails
+ <-testTokenManager.invalidateChan
+ })
+
+ })
+})
+
+type dummyTokenManager struct {
+ invalidateChan chan bool
+}
+
+func (t *dummyTokenManager) getBearerToken() string {
+ return ""
+}
+
+func (t *dummyTokenManager) invalidateToken() error {
+ log.Debug("invalidateToken called")
+ testMock.passAuthCheck()
+ t.invalidateChan <- true
+ return nil
+}
+
+func (t *dummyTokenManager) getToken() *oauthToken {
+ return nil
+}
+
+func (t *dummyTokenManager) close() {
+ return
+}
+
+func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
+ return func(chan bool) error {
+ return nil
+ }
+}
+
+func (t *dummyTokenManager) start() {
+
+}
+
+type dummySnapshotManager struct {
+ downloadCalledChan chan bool
+}
+
+func (s *dummySnapshotManager) close() <-chan bool {
+ closeChan := make(chan bool)
+ close(closeChan)
+ return closeChan
+}
+
+func (s *dummySnapshotManager) downloadBootSnapshot() {
+
+}
+
+func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+
+}
+
+func (s *dummySnapshotManager) downloadDataSnapshot() {
+ log.Debug("dummySnapshotManager.downloadDataSnapshot() called")
+ s.downloadCalledChan <- true
+}
+
+func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
+
+}
+
+func (s *dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+ return nil
+}
diff --git a/changes.go b/changes.go
index e9be041..9e8d170 100644
--- a/changes.go
+++ b/changes.go
@@ -54,8 +54,8 @@
log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
go func() {
c.quitChan <- true
- tokenManager.close()
- <-snapManager.close()
+ apidTokenManager.close()
+ <-apidSnapshotManager.close()
log.Debug("change manager closed")
finishChan <- false
}()
@@ -65,8 +65,8 @@
log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
go func() {
c.quitChan <- true
- tokenManager.close()
- <-snapManager.close()
+ apidTokenManager.close()
+ <-apidSnapshotManager.close()
log.Debug("change manager closed")
finishChan <- true
}()
@@ -183,8 +183,11 @@
log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
switch r.StatusCode {
case http.StatusUnauthorized:
- tokenManager.invalidateToken()
- return nil
+ err = apidTokenManager.invalidateToken()
+ if err != nil {
+ return err
+ }
+ return authFailError{}
case http.StatusNotModified:
return nil
@@ -206,7 +209,7 @@
log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
err = apiErr
}
- return nil
+ return err
}
return nil
}
@@ -271,7 +274,7 @@
}
if c, ok := err.(changeServerError); ok {
log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
- snapManager.downloadDataSnapshot()
+ apidSnapshotManager.downloadDataSnapshot()
} else {
log.Debugf("Error connecting to changeserver: %v", err)
}
@@ -289,7 +292,7 @@
}
for _, change := range changes {
- if !a[change.Table] {
+ if !a[normalizeTableName(change.Table)] {
log.Infof("Unable to find %s table in current known tables", change.Table)
return true
}
diff --git a/cmd/mockServer/main.go b/cmd/mockServer/main.go
index 6773606..bf4de27 100644
--- a/cmd/mockServer/main.go
+++ b/cmd/mockServer/main.go
@@ -5,7 +5,6 @@
"os"
- "time"
"github.com/30x/apid-core"
"github.com/30x/apid-core/factory"
@@ -22,11 +21,8 @@
reliable := f.Bool("reliable", true, "if false, server will often send 500 errors")
numDevs := f.Int("numDevs", 2, "number of developers in snapshot")
- addDevEach := f.Duration("addDevEach", 0*time.Second, "add a developer each duration (default 0s)")
- upDevEach := f.Duration("upDevEach", 0*time.Second, "update a developer each duration (default 0s)")
numDeps := f.Int("numDeps", 2, "number of deployments in snapshot")
- upDepEach := f.Duration("upDepEach", 0*time.Second, "update (replace) a deployment each duration (default 0s)")
f.Parse(os.Args[1:])
@@ -51,10 +47,7 @@
Organization: "org",
Environment: "test",
NumDevelopers: *numDevs,
- AddDeveloperEvery: *addDevEach,
- UpdateDeveloperEvery: *upDevEach,
NumDeployments: *numDeps,
- ReplaceDeploymentEvery: *upDepEach,
BundleURI: *bundleURI,
}
diff --git a/data.go b/data.go
index 4761373..a9b1496 100644
--- a/data.go
+++ b/data.go
@@ -8,6 +8,9 @@
"sync"
"github.com/30x/apid-core"
+ "github.com/apigee-labs/transicator/common"
+ "sort"
+ "strings"
)
var (
@@ -39,30 +42,6 @@
last_snapshot_info text,
PRIMARY KEY (instance_id)
);
- CREATE TABLE IF NOT EXISTS APID_CLUSTER (
- id text,
- name text,
- description text,
- umbrella_org_app_name text,
- created text,
- created_by text,
- updated text,
- updated_by text,
- last_sequence text,
- PRIMARY KEY (id)
- );
- CREATE TABLE IF NOT EXISTS DATA_SCOPE (
- id text,
- apid_cluster_id text,
- scope text,
- org text,
- env text,
- created text,
- created_by text,
- updated text,
- updated_by text,
- PRIMARY KEY (id, apid_cluster_id)
- );
`)
if err != nil {
return err
@@ -85,86 +64,289 @@
dbMux.Unlock()
}
-func insertApidCluster(dac dataApidCluster, txn *sql.Tx) error {
+//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
+func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
- log.Debugf("inserting into APID_CLUSTER: %v", dac)
-
- //replace to accomodate same snapshot txid
- stmt, err := txn.Prepare(`
- REPLACE INTO APID_CLUSTER
- (id, description, name, umbrella_org_app_name,
- created, created_by, updated, updated_by,
- last_sequence)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9);
- `)
- if err != nil {
- log.Errorf("prepare insert into APID_CLUSTER transaction Failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- _, err = stmt.Exec(
- dac.ID, dac.Description, dac.Name, dac.OrgAppName,
- dac.Created, dac.CreatedBy, dac.Updated, dac.UpdatedBy,
- "")
-
- if err != nil {
- log.Errorf("insert APID_CLUSTER failed: %v", err)
+ if len(rows) == 0 {
+ return false
}
- return err
+ var orderedColumns []string
+ for column := range rows[0] {
+ orderedColumns = append(orderedColumns, column)
+ }
+ sort.Strings(orderedColumns)
+
+ sql := buildInsertSql(tableName, orderedColumns, rows)
+
+ prep, err := txn.Prepare(sql)
+ if err != nil {
+ log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err)
+ return false
+ }
+ defer prep.Close()
+
+ var values []interface{}
+
+ for _, row := range rows {
+ for _, columnName := range orderedColumns {
+ //use Value so that stmt exec does not complain about common.ColumnVal being a struct
+ values = append(values, row[columnName].Value)
+ }
+ }
+
+ //create prepared statement from existing template statement
+ _, err = prep.Exec(values...)
+
+ if err != nil {
+ log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err)
+ return false
+ }
+ log.Debugf("INSERT Success [%s] values=%v", sql, values)
+
+ return true
}
-func insertDataScope(ds dataDataScope, txn *sql.Tx) error {
-
- log.Debugf("insert DATA_SCOPE: %v", ds)
-
- //replace to accomodate same snapshot txid
- stmt, err := txn.Prepare(`
- REPLACE INTO DATA_SCOPE
- (id, apid_cluster_id, scope, org,
- env, created, created_by, updated,
- updated_by)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9);
- `)
- if err != nil {
- log.Errorf("insert DATA_SCOPE failed: %v", err)
- return err
+func getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
+ var values = make([]interface{}, len(pkeys))
+ for i, pkey := range pkeys {
+ if row[pkey] == nil {
+ values[i] = nil
+ } else {
+ values[i] = row[pkey].Value
+ }
}
- defer stmt.Close()
-
- _, err = stmt.Exec(
- ds.ID, ds.ClusterID, ds.Scope, ds.Org,
- ds.Env, ds.Created, ds.CreatedBy, ds.Updated,
- ds.UpdatedBy)
-
- if err != nil {
- log.Errorf("insert DATA_SCOPE failed: %v", err)
- return err
- }
-
- return nil
+ return values
}
-func deleteDataScope(ds dataDataScope, txn *sql.Tx) error {
-
- log.Debugf("delete DATA_SCOPE: %v", ds)
-
- stmt, err := txn.Prepare("DELETE FROM DATA_SCOPE WHERE id=$1 and apid_cluster_id=$2")
- if err != nil {
- log.Errorf("update DATA_SCOPE failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- _, err = stmt.Exec(ds.ID, ds.ClusterID)
-
- if err != nil {
- log.Errorf("delete DATA_SCOPE failed: %v", err)
- return err
+func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+ pkeys, err := getPkeysForTable(tableName)
+ sort.Strings(pkeys)
+ if len(pkeys) == 0 || err != nil {
+ log.Errorf("DELETE No primary keys found for table. %s", tableName)
+ return false
}
- return nil
+ if len(rows) == 0 {
+ log.Errorf("No rows found for table.", tableName)
+ return false
+ }
+
+ sql := buildDeleteSql(tableName, rows[0], pkeys)
+ prep, err := txn.Prepare(sql)
+ if err != nil {
+ log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
+ return false
+ }
+ defer prep.Close()
+ for _, row := range rows {
+ values := getValueListFromKeys(row, pkeys)
+ // delete prepared statement from existing template statement
+ res, err := txn.Stmt(prep).Exec(values...)
+ if err != nil {
+ log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
+ return false
+ }
+ affected, err := res.RowsAffected()
+ if err == nil && affected != 0 {
+ log.Debugf("DELETE Success [%s] values=%v", sql, values)
+ } else if err == nil && affected == 0 {
+ log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
+ return false
+ } else {
+ log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
+ return false
+ }
+
+ }
+ return true
+
+}
+
+// Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
+func buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
+
+ var wherePlaceholders []string
+ i := 1
+ if row == nil {
+ return ""
+ }
+ normalizedTableName := normalizeTableName(tableName)
+
+ for _, pk := range pkeys {
+ wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
+ i++
+ }
+
+ sql := "DELETE FROM " + normalizedTableName
+ sql += " WHERE "
+ sql += strings.Join(wherePlaceholders, " AND ")
+
+ return sql
+
+}
+
+func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+ pkeys, err := getPkeysForTable(tableName)
+ if len(pkeys) == 0 || err != nil {
+ log.Errorf("UPDATE No primary keys found for table.", tableName)
+ return false
+ }
+ if len(oldRows) == 0 || len(newRows) == 0 {
+ return false
+ }
+
+ var orderedColumns []string
+
+ //extract sorted orderedColumns
+ for columnName := range newRows[0] {
+ orderedColumns = append(orderedColumns, columnName)
+ }
+ sort.Strings(orderedColumns)
+
+ //build update statement, use arbitrary row as template
+ sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
+ prep, err := txn.Prepare(sql)
+ if err != nil {
+ log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
+ return false
+ }
+ defer prep.Close()
+
+ for i, row := range newRows {
+ var values []interface{}
+
+ for _, columnName := range orderedColumns {
+ //use Value so that stmt exec does not complain about common.ColumnVal being a struct
+ //TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping
+ //TODO right now this will only work when the column type is a string
+ if row[columnName] != nil {
+ values = append(values, row[columnName].Value)
+ } else {
+ values = append(values, nil)
+ }
+ }
+
+ //add values for where clause, use PKs of old row
+ for _, pk := range pkeys {
+ if oldRows[i][pk] != nil {
+ values = append(values, oldRows[i][pk].Value)
+ } else {
+ values = append(values, nil)
+ }
+
+ }
+
+ //create prepared statement from existing template statement
+ res, err := txn.Stmt(prep).Exec(values...)
+
+ if err != nil {
+ log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
+ return false
+ }
+ numRowsAffected, err := res.RowsAffected()
+ if err != nil {
+ log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
+ return false
+ }
+ //delete this once we figure out why tests are failing/not updating
+ log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
+ log.Debugf("UPDATE Success [%s] values=%v", sql, values)
+
+ }
+
+ return true
+
+}
+
+func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
+ if row == nil {
+ return ""
+ }
+ normalizedTableName := normalizeTableName(tableName)
+
+ var setPlaceholders, wherePlaceholders []string
+ i := 1
+
+ for _, columnName := range orderedColumns {
+ setPlaceholders = append(setPlaceholders, fmt.Sprintf("%s=$%v", columnName, i))
+ i++
+ }
+
+ for _, pk := range pkeys {
+ wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
+ i++
+ }
+
+ sql := "UPDATE " + normalizedTableName + " SET "
+ sql += strings.Join(setPlaceholders, ", ")
+ sql += " WHERE "
+ sql += strings.Join(wherePlaceholders, " AND ")
+
+ return sql
+}
+
+//precondition: rows.length > 1000, max number of entities for sqlite
+func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
+ if len(rows) == 0 {
+ return ""
+ }
+ normalizedTableName := normalizeTableName(tableName)
+ var values string = ""
+
+ var i, j int
+ k := 1
+ for i = 0; i < len(rows)-1; i++ {
+ values += "("
+ for j = 0; j < len(orderedColumns)-1; j++ {
+ values += fmt.Sprintf("$%d,", k)
+ k++
+ }
+ values += fmt.Sprintf("$%d),", k)
+ k++
+ }
+ values += "("
+ for j = 0; j < len(orderedColumns)-1; j++ {
+ values += fmt.Sprintf("$%d,", k)
+ k++
+ }
+ values += fmt.Sprintf("$%d)", k)
+
+ sql := "INSERT INTO " + normalizedTableName
+ sql += "(" + strings.Join(orderedColumns, ",") + ") "
+ sql += "VALUES " + values
+
+ return sql
+}
+
+func getPkeysForTable(tableName string) ([]string, error) {
+ db := getDB()
+ normalizedTableName := normalizeTableName(tableName)
+ sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
+ rows, err := db.Query(sql, normalizedTableName)
+ if err != nil {
+ log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err)
+ return nil, err
+ }
+ var columnNames []string
+ defer rows.Close()
+ for rows.Next() {
+ var value string
+ err := rows.Scan(&value)
+ if err != nil {
+ log.Fatal(err)
+ }
+ columnNames = append(columnNames, value)
+ }
+ err = rows.Err()
+ if err != nil {
+ log.Fatal(err)
+ }
+ return columnNames, nil
+}
+
+func normalizeTableName(tableName string) string {
+ return strings.Replace(tableName, ".", "_", 1)
}
/*
@@ -178,9 +360,9 @@
var scope string
db := getDB()
- rows, err := db.Query("select DISTINCT scope from DATA_SCOPE where apid_cluster_id = $1", configId)
+ rows, err := db.Query("select DISTINCT scope from EDGEX_DATA_SCOPE where apid_cluster_id = $1", configId)
if err != nil {
- log.Errorf("Failed to query DATA_SCOPE: %v", err)
+ log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
return
}
defer rows.Close()
@@ -198,9 +380,9 @@
*/
func getLastSequence() (lastSequence string) {
- err := getDB().QueryRow("select last_sequence from APID_CLUSTER LIMIT 1").Scan(&lastSequence)
+ err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
if err != nil && err != sql.ErrNoRows {
- log.Panicf("Failed to query APID_CLUSTER: %v", err)
+ log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
return
}
@@ -216,21 +398,21 @@
log.Debugf("updateLastSequence: %s", lastSequence)
- stmt, err := getDB().Prepare("UPDATE APID_CLUSTER SET last_sequence=$1;")
+ stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
if err != nil {
- log.Errorf("UPDATE APID_CLUSTER Failed: %v", err)
+ log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(lastSequence)
if err != nil {
- log.Errorf("UPDATE APID_CLUSTER Failed: %v", err)
+ log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
- log.Infof("UPDATE APID_CLUSTER Success: %s", lastSequence)
-
+ log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
+ log.Infof("Replication lastSequence=%s", lastSequence)
return nil
}
diff --git a/data_test.go b/data_test.go
new file mode 100644
index 0000000..a26597d
--- /dev/null
+++ b/data_test.go
@@ -0,0 +1,1185 @@
+package apidApigeeSync
+
+import (
+ "github.com/apigee-labs/transicator/common"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "sort"
+)
+
+var _ = Describe("data access tests", func() {
+
+ BeforeEach(func() {
+ db := getDB()
+
+ //all tests in this file operate on the api_product table. Create the necessary tables for this here
+ getDB().Exec("CREATE TABLE _transicator_tables " +
+ "(tableName varchar not null, columnName varchar not null, " +
+ "typid integer, primaryKey bool);")
+ getDB().Exec("DELETE from _transicator_tables")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
+ getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
+
+ getDB().Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+ "api_resources text,approval_type text,scopes text,proxies text, environments text," +
+ "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
+ "primary key (id,tenant_id,created_at,updated_at));")
+ getDB().Exec("DELETE from kms_api_product")
+
+ setDB(db)
+ initDB(db)
+
+ })
+
+ Context("Update processing", func() {
+ It("unit test buildUpdateSql with single primary key", func() {
+ testRow := common.Row{
+ "id": {
+ Value: "ch_api_product_2",
+ },
+ "api_resources": {
+ Value: "{}",
+ },
+ "environments": {
+ Value: "{Env_0, Env_1}",
+ },
+ "tenant_id": {
+ Value: "tenant_id_0",
+ },
+ "_change_selector": {
+ Value: "test_org0",
+ },
+ }
+
+ var orderedColumns []string
+ for column := range testRow {
+ orderedColumns = append(orderedColumns, column)
+ }
+ sort.Strings(orderedColumns)
+
+ result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
+ Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" +
+ " WHERE id=$6").To(Equal(result))
+ })
+
+ It("unit test buildUpdateSql with composite primary key", func() {
+ testRow := common.Row{
+ "id1": {
+ Value: "composite-key-1",
+ },
+ "id2": {
+ Value: "composite-key-2",
+ },
+ "api_resources": {
+ Value: "{}",
+ },
+ "environments": {
+ Value: "{Env_0, Env_1}",
+ },
+ "tenant_id": {
+ Value: "tenant_id_0",
+ },
+ "_change_selector": {
+ Value: "test_org0",
+ },
+ }
+
+ var orderedColumns []string
+ for column := range testRow {
+ orderedColumns = append(orderedColumns, column)
+ }
+ sort.Strings(orderedColumns)
+
+ result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
+ Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" +
+ " WHERE id1=$7 AND id2=$8").To(Equal(result))
+ })
+
+ It("test update with composite primary key", func() {
+ event := &common.ChangeList{}
+
+ //this needs to match what is actually in the DB
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ newRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(true).To(Equal(processChangeList(event)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ //do the update
+ Expect(true).To(Equal(processChangeList(event)))
+ err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ })
+
+ It("update should succeed if newrow modifies the primary key", func() {
+ event := &common.ChangeList{}
+
+ //this needs to match what is actually in the DB
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ newRow := common.Row{
+ "id": {
+ Value: "new_id",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(true).To(Equal(processChangeList(event)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ //do the update
+ Expect(true).To(Equal(processChangeList(event)))
+ err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+ })
+
+ It("update should succeed if newrow contains fewer fields than oldrow", func() {
+ event := &common.ChangeList{}
+
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ newRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(true).To(Equal(processChangeList(event)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ //do the update
+ Expect(true).To(Equal(processChangeList(event)))
+ err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+ })
+
+ It("update should succeed if oldrow contains fewer fields than newrow", func() {
+ event := &common.ChangeList{}
+
+ oldRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "A product for testing Greg",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ newRow := common.Row{
+ "id": {
+ Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: oldRow,
+ Operation: 1,
+ },
+ }
+ //insert and assert success
+ Expect(true).To(Equal(processChangeList(event)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //create update event
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: oldRow,
+ NewRow: newRow,
+ Operation: 2,
+ },
+ }
+
+ //do the update
+ Expect(true).To(Equal(processChangeList(event)))
+ err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+ })
+ })
+
+ Context("Insert processing", func() {
+ It("Properly constructs insert sql for one row", func() {
+ newRow := common.Row{
+ "id": {
+ Value: "new_id",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ var orderedColumns []string
+ for column := range newRow {
+ orderedColumns = append(orderedColumns, column)
+ }
+ sort.Strings(orderedColumns)
+
+ expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)"
+ Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
+ })
+
+ It("Properly constructs insert sql for multiple rows", func() {
+ newRow1 := common.Row{
+ "id": {
+ Value: "1",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+ newRow2 := common.Row{
+ "id": {
+ Value: "2",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ var orderedColumns []string
+ for column := range newRow1 {
+ orderedColumns = append(orderedColumns, column)
+ }
+ sort.Strings(orderedColumns)
+
+ expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)"
+ Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
+ })
+
+ It("Properly executes insert for a single rows", func() {
+ event := &common.ChangeList{}
+
+ newRow1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ }
+
+ Expect(true).To(Equal(processChangeList(event)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ })
+
+ It("Properly executed insert for multiple rows", func() {
+ event := &common.ChangeList{}
+
+ newRow1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+ newRow2 := common.Row{
+ "id": {
+ Value: "b",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ {
+ Table: "kms.api_product",
+ NewRow: newRow2,
+ Operation: 1,
+ },
+ }
+
+ Expect(true).To(Equal(processChangeList(event)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(2))
+ })
+
+ It("Fails to execute if row does not match existing table schema", func() {
+ event := &common.ChangeList{}
+
+ newRow1 := common.Row{
+ "not_and_id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ }
+
+ ok := processChangeList(event)
+ Expect(false).To(Equal(ok))
+
+ var nRows int
+ //assert that no extraneous rows were added
+ err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(0))
+ })
+
+ It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
+ event := &common.ChangeList{}
+ newRow1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ newRow2 := common.Row{
+ "not_and_id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: newRow1,
+ Operation: 1,
+ },
+ {
+ Table: "kms.api_product",
+ NewRow: newRow2,
+ Operation: 1,
+ },
+ }
+
+ ok := processChangeList(event)
+ Expect(false).To(Equal(ok))
+ })
+ })
+
+ Context("Delete processing", func() {
+ It("Properly constructs sql prepare for Delete", func() {
+ row := common.Row{
+ "id": {
+ Value: "new_id",
+ },
+ "api_resources": {
+ Value: "{/**}",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "43aef41d",
+ },
+ "description": {
+ Value: "new description",
+ },
+ "created_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "updated_at": {
+ Value: "2017-03-01 22:50:41.75+00:00",
+ },
+ "_change_selector": {
+ Value: "43aef41d",
+ },
+ }
+
+ pkeys, err := getPkeysForTable("kms_api_product")
+ Expect(err).Should(Succeed())
+ sql := buildDeleteSql("kms_api_product", row, pkeys)
+ Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
+ })
+
+ It("Verify execute single insert & single delete works", func() {
+ event1 := &common.ChangeList{}
+ event2 := &common.ChangeList{}
+
+ Row1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event1.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: Row1,
+ Operation: 1,
+ },
+ }
+ event2.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: Row1,
+ Operation: 3,
+ },
+ }
+
+ Expect(true).To(Equal(processChangeList(event1)))
+ var nRows int
+ err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ Expect(true).To(Equal(processChangeList(event2)))
+
+ // validate delete
+ err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(0))
+
+ // delete again should fail - coz entry will not exist
+ Expect(false).To(Equal(processChangeList(event2)))
+ })
+
+ It("verify multiple insert and single delete works", func() {
+ event1 := &common.ChangeList{}
+ event2 := &common.ChangeList{}
+
+ Row1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ Row2 := common.Row{
+ "id": {
+ Value: "b",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event1.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: Row1,
+ Operation: 1,
+ },
+ {
+ Table: "kms.api_product",
+ NewRow: Row2,
+ Operation: 1,
+ },
+ }
+ event2.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: Row1,
+ Operation: 3,
+ },
+ }
+
+ Expect(true).To(Equal(processChangeList(event1)))
+ var nRows int
+ //verify first row
+ err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //verify second row
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(2))
+
+ Expect(true).To(Equal(processChangeList(event2)))
+
+ //verify second row still exists
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ // validate delete
+ err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ // delete again should fail - coz entry will not exist
+ Expect(false).To(Equal(processChangeList(event2)))
+ }, 3)
+
+ It("verify single insert and multiple delete fails", func() {
+ event1 := &common.ChangeList{}
+ event2 := &common.ChangeList{}
+
+ Row1 := common.Row{
+ "id": {
+ Value: "a",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ Row2 := common.Row{
+ "id": {
+ Value: "b",
+ },
+ "api_resources": {
+ Value: "r",
+ },
+ "environments": {
+ Value: "{test}",
+ },
+ "tenant_id": {
+ Value: "t",
+ },
+ "description": {
+ Value: "d",
+ },
+ "created_at": {
+ Value: "c",
+ },
+ "updated_at": {
+ Value: "u",
+ },
+ "_change_selector": {
+ Value: "cs",
+ },
+ }
+
+ event1.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ NewRow: Row1,
+ Operation: 1,
+ },
+ }
+ event2.Changes = []common.Change{
+ {
+ Table: "kms.api_product",
+ OldRow: Row1,
+ Operation: 3,
+ },
+ {
+ Table: "kms.api_product",
+ OldRow: Row2,
+ Operation: 3,
+ },
+ }
+
+ Expect(true).To(Equal(processChangeList(event1)))
+ var nRows int
+ //verify insert
+ err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+ "and _change_selector='cs'").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ //assert that no extraneous rows were added
+ err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(nRows).To(Equal(1))
+
+ Expect(false).To(Equal(processChangeList(event2)))
+
+ }, 3)
+ })
+})
diff --git a/init.go b/init.go
index cf90781..d9b767e 100644
--- a/init.go
+++ b/init.go
@@ -30,16 +30,16 @@
var (
/* All set during plugin initialization */
- log apid.LogService
- config apid.ConfigService
- dataService apid.DataService
- events apid.EventsService
- apidInfo apidInstanceInfo
- newInstanceID bool
- tokenManager *tokenMan
- changeManager *pollChangeManager
- snapManager *snapShotManager
- httpclient *http.Client
+ log apid.LogService
+ config apid.ConfigService
+ dataService apid.DataService
+ events apid.EventsService
+ apidInfo apidInstanceInfo
+ newInstanceID bool
+ apidTokenManager tokenManager
+ apidChangeManager changeManager
+ apidSnapshotManager snapShotManager
+ httpclient *http.Client
/* Set during post plugin initialization
* set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -62,7 +62,7 @@
func initConfigDefaults() {
config.SetDefault(configPollInterval, 120*time.Second)
- config.SetDefault(configSnapshotProtocol, "json")
+ config.SetDefault(configSnapshotProtocol, "sqlite")
name, errh := os.Hostname()
if (errh != nil) && (len(config.GetString(configName)) == 0) {
log.Errorf("Not able to get hostname for kernel. Please set '%s' property in config", configName)
@@ -83,16 +83,11 @@
Transport: tr,
Timeout: httpTimeout,
CheckRedirect: func(req *http.Request, _ []*http.Request) error {
- req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+ req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
return nil
},
}
- //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
- //also useful for testing
- snapManager = createSnapShotManager()
- changeManager = createChangeManager()
-
// set up default database
db, err := dataService.DB()
if err != nil {
@@ -117,6 +112,12 @@
return nil
}
+func createManagers() {
+ apidSnapshotManager = createSnapShotManager()
+ apidChangeManager = createChangeManager()
+ apidTokenManager = createSimpleTokenManager()
+}
+
func checkForRequiredValues() error {
// check for required values
for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
@@ -126,8 +127,8 @@
}
}
proto := config.GetString(configSnapshotProtocol)
- if proto != "json" && proto != "proto" {
- return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol)
+ if proto != "sqlite" {
+ return fmt.Errorf("Illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol)
}
return nil
@@ -137,7 +138,7 @@
log = logger
}
-/* Idempotent state initialization */
+/* initialization */
func _initPlugin(services apid.Services) error {
SetLogger(services.Log().ForModule("apigeeSync"))
log.Debug("start init")
@@ -165,6 +166,8 @@
return pluginData, err
}
+ createManagers()
+
/* This callback function will get called once all the plugins are
* initialized (not just this plugin). This is needed because,
* downloadSnapshots/changes etc have to begin to be processed only
@@ -208,8 +211,7 @@
log.Debug("start post plugin init")
- tokenManager = createTokenManager()
-
+ apidTokenManager.start()
go bootstrap()
events.Listen(ApigeeSyncEventSelector, &handler{})
diff --git a/listener.go b/listener.go
index 49e3d6b..2a8b492 100644
--- a/listener.go
+++ b/listener.go
@@ -36,11 +36,7 @@
log.Panicf("Unable to access database: %v", err)
}
- if config.GetString(configSnapshotProtocol) == "json" {
- processJsonSnapshot(snapshot, db)
- } else if config.GetString(configSnapshotProtocol) == "sqlite" {
- processSqliteSnapshot(snapshot, db)
- }
+ processSqliteSnapshot(db)
//update apid instance info
apidInfo.LastSnapshot = snapshot.SnapshotInfo
@@ -54,128 +50,73 @@
}
-func processSqliteSnapshot(snapshot *common.Snapshot, db apid.DB) {
- //nothing to do as of now, here as a placeholder
-}
+func processSqliteSnapshot(db apid.DB) {
-func processJsonSnapshot(snapshot *common.Snapshot, db apid.DB) {
-
- err := initDB(db)
+ var numApidClusters int
+ apidClusters, err := db.Query("SELECT COUNT(*) FROM edgex_apid_cluster")
if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
+ log.Panicf("Unable to read database: %s", err.Error())
+ }
+ apidClusters.Next()
+ err = apidClusters.Scan(&numApidClusters)
+ if err != nil {
+ log.Panicf("Unable to read database: %s", err.Error())
}
- tx, err := db.Begin()
- if err != nil {
- log.Panicf("Error starting transaction: %v", err)
+ if numApidClusters != 1 {
+ log.Panic("Illegal state for apid_cluster. Must be a single row.")
}
- defer tx.Rollback()
- for _, table := range snapshot.Tables {
-
- switch table.Name {
- case LISTENER_TABLE_APID_CLUSTER:
- if len(table.Rows) > 1 {
- log.Panic("Illegal state for apid_cluster. Must be a single row.")
- }
- for _, row := range table.Rows {
- ac := makeApidClusterFromRow(row)
- err := insertApidCluster(ac, tx)
- if err != nil {
- log.Panicf("Snapshot update failed: %v", err)
- }
- }
-
- case LISTENER_TABLE_DATA_SCOPE:
- for _, row := range table.Rows {
- ds := makeDataScopeFromRow(row)
- err := insertDataScope(ds, tx)
- if err != nil {
- log.Panicf("Snapshot update failed: %v", err)
- }
- }
+ _, err = db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+ if err != nil {
+ if err.Error() == "duplicate column name: last_sequence" {
+ return
+ } else {
+ log.Error("[[" + err.Error() + "]]")
+ log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err)
}
}
-
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error committing Snapshot change: %v", err)
- }
}
-func processChangeList(changes *common.ChangeList) {
+func processChangeList(changes *common.ChangeList) bool {
+
+ ok := false
tx, err := getDB().Begin()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
+ return ok
}
defer tx.Rollback()
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
for _, change := range changes.Changes {
- switch change.Table {
- case "edgex.apid_cluster":
- switch change.Operation {
- case common.Delete:
- // todo: shut down apid, delete databases, scorch the earth!
- log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
- default:
- log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
- }
- case "edgex.data_scope":
- switch change.Operation {
- case common.Insert:
- ds := makeDataScopeFromRow(change.NewRow)
- err = insertDataScope(ds, tx)
- case common.Delete:
- ds := makeDataScopeFromRow(change.OldRow)
- err = deleteDataScope(ds, tx)
- default:
- // common.Update is not allowed
- log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
- }
+ if change.Table == LISTENER_TABLE_APID_CLUSTER {
+ log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
}
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
+ switch change.Operation {
+ case common.Insert:
+ ok = insert(change.Table, []common.Row{change.NewRow}, tx)
+ case common.Update:
+ if change.Table == LISTENER_TABLE_DATA_SCOPE {
+ log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
+ }
+ ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
+ case common.Delete:
+ ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
+ }
+ if !ok {
+ log.Error("Sql Operation error. Operation rollbacked")
+ return ok
}
}
err = tx.Commit()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
+ return false
}
-}
-func makeApidClusterFromRow(row common.Row) dataApidCluster {
-
- dac := dataApidCluster{}
-
- row.Get("id", &dac.ID)
- row.Get("name", &dac.Name)
- row.Get("umbrella_org_app_name", &dac.OrgAppName)
- row.Get("created", &dac.Created)
- row.Get("created_by", &dac.CreatedBy)
- row.Get("updated", &dac.Updated)
- row.Get("updated_by", &dac.UpdatedBy)
- row.Get("description", &dac.Description)
-
- return dac
-}
-
-func makeDataScopeFromRow(row common.Row) dataDataScope {
-
- ds := dataDataScope{}
-
- row.Get("id", &ds.ID)
- row.Get("apid_cluster_id", &ds.ClusterID)
- row.Get("scope", &ds.Scope)
- row.Get("org", &ds.Org)
- row.Get("env", &ds.Env)
- row.Get("created", &ds.Created)
- row.Get("created_by", &ds.CreatedBy)
- row.Get("updated", &ds.Updated)
- row.Get("updated_by", &ds.UpdatedBy)
-
- return ds
+ return ok
}
diff --git a/listener_test.go b/listener_test.go
index b5fea9b..d424a0b 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -5,48 +5,28 @@
. "github.com/onsi/gomega"
"github.com/apigee-labs/transicator/common"
+ "os"
)
var _ = Describe("listener", func() {
handler := handler{}
- var saveLastSnapshot string
+
+ var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
+ initDb(sqlfile, "./mockdb.sqlite3")
+ file, err := os.Open("./mockdb.sqlite3")
+ Expect(err).ShouldNot(HaveOccurred())
+
+ s := common.Snapshot{}
+ err = processSnapshotServerFileResponse(dbId, file, &s)
+ Expect(err).ShouldNot(HaveOccurred())
+ return s
+ }
Context("ApigeeSync snapshot event", func() {
- It("should set DB to appropriate version", func() {
- log.Info("Starting listener tests...")
-
- //save the last snapshot, so we can restore it at the end of this context
- saveLastSnapshot = apidInfo.LastSnapshot
-
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot",
- Tables: []common.Table{},
- }
-
- handler.Handle(&event)
-
- Expect(apidInfo.LastSnapshot).To(Equal(event.SnapshotInfo))
-
- expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
-
- Expect(getDB() == expectedDB).Should(BeTrue())
- })
-
It("should fail if more than one apid_cluster rows", func() {
-
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot_fail",
- Tables: []common.Table{
- {
- Name: LISTENER_TABLE_APID_CLUSTER,
- Rows: []common.Row{{}, {}},
- },
- },
- }
-
+ event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
Expect(func() { handler.Handle(&event) }).To(Panic())
}, 3)
@@ -68,74 +48,7 @@
It("should process a valid Snapshot", func() {
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot_valid",
- Tables: []common.Table{
- {
- Name: LISTENER_TABLE_APID_CLUSTER,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "i"},
- "name": &common.ColumnVal{Value: "n"},
- "umbrella_org_app_name": &common.ColumnVal{Value: "o"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "description": &common.ColumnVal{Value: "d"},
- },
- },
- },
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e1"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "j"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e2"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "k"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s2"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e3"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- },
- }
+ event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
handler.Handle(&event)
@@ -146,13 +59,18 @@
db := getDB()
+ expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(db == expectedDB).Should(BeTrue())
+
// apid Cluster
var dcs []dataApidCluster
rows, err := db.Query(`
SELECT id, name, description, umbrella_org_app_name,
created, created_by, updated, updated_by
- FROM APID_CLUSTER`)
+ FROM EDGEX_APID_CLUSTER`)
Expect(err).NotTo(HaveOccurred())
defer rows.Close()
@@ -182,7 +100,7 @@
SELECT id, apid_cluster_id, scope, org,
env, created, created_by, updated,
updated_by
- FROM DATA_SCOPE`)
+ FROM EDGEX_DATA_SCOPE`)
Expect(err).NotTo(HaveOccurred())
defer rows.Close()
@@ -219,7 +137,6 @@
Expect(scopes[1]).To(Equal("s2"))
//restore the last snapshot
- apidInfo.LastSnapshot = saveLastSnapshot
}, 3)
})
@@ -228,10 +145,12 @@
Context(LISTENER_TABLE_APID_CLUSTER, func() {
It("insert event should panic", func() {
- //save the last snapshot, so we can restore it at the end of this context
- saveLastSnapshot = apidInfo.LastSnapshot
+ ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
+ handler.Handle(&ssEvent)
- event := common.ChangeList{
+ //save the last snapshot, so we can restore it at the end of this context
+
+ csEvent := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
{
@@ -241,10 +160,12 @@
},
}
- Expect(func() { handler.Handle(&event) }).To(Panic())
+ Expect(func() { handler.Handle(&csEvent) }).To(Panic())
}, 3)
It("update event should panic", func() {
+ ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
+ handler.Handle(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -258,17 +179,15 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
//restore the last snapshot
- apidInfo.LastSnapshot = saveLastSnapshot
}, 3)
- PIt("delete event should kill all the things!")
})
Context(LISTENER_TABLE_DATA_SCOPE, func() {
It("insert event should add", func() {
- //save the last snapshot, so we can restore it at the end of this context
- saveLastSnapshot = apidInfo.LastSnapshot
+ ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
+ handler.Handle(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -277,30 +196,32 @@
Operation: common.Insert,
Table: LISTENER_TABLE_DATA_SCOPE,
NewRow: common.Row{
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s1"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
},
},
{
Operation: common.Insert,
Table: LISTENER_TABLE_DATA_SCOPE,
NewRow: common.Row{
- "id": &common.ColumnVal{Value: "j"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s2"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
+ "id": &common.ColumnVal{Value: "j"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s2"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
},
},
},
@@ -314,7 +235,7 @@
SELECT id, apid_cluster_id, scope, org,
env, created, created_by, updated,
updated_by
- FROM DATA_SCOPE`)
+ FROM EDGEX_DATA_SCOPE`)
Expect(err).NotTo(HaveOccurred())
defer rows.Close()
@@ -326,6 +247,7 @@
dds = append(dds, d)
}
+ //three already existing
Expect(len(dds)).To(Equal(2))
ds := dds[0]
@@ -349,6 +271,8 @@
}, 3)
It("delete event should delete", func() {
+ ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
+ handler.Handle(&ssEvent)
insert := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
@@ -356,15 +280,16 @@
Operation: common.Insert,
Table: LISTENER_TABLE_DATA_SCOPE,
NewRow: common.Row{
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
},
},
},
@@ -386,13 +311,15 @@
handler.Handle(&delete)
var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM DATA_SCOPE").Scan(&nRows)
+ err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
- Expect(nRows).To(Equal(0))
+ Expect(0).To(Equal(nRows))
}, 3)
- It("update event should panic", func() {
+ It("update event should panic for data scopes table", func() {
+ ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
+ handler.Handle(&ssEvent)
event := common.ChangeList{
LastSequence: "test",
@@ -406,10 +333,9 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
//restore the last snapshot
- apidInfo.LastSnapshot = saveLastSnapshot
}, 3)
+ //TODO add tests for update/insert/delete cluster
})
-
})
})
diff --git a/managerInterfaces.go b/managerInterfaces.go
new file mode 100644
index 0000000..20bbf6f
--- /dev/null
+++ b/managerInterfaces.go
@@ -0,0 +1,29 @@
+package apidApigeeSync
+
+import (
+ "github.com/apigee-labs/transicator/common"
+ "net/url"
+)
+
+type tokenManager interface {
+ getBearerToken() string
+ invalidateToken() error
+ getToken() *oauthToken
+ close()
+ getRetrieveNewTokenClosure(*url.URL) func(chan bool) error
+ start()
+}
+
+type snapShotManager interface {
+ close() <-chan bool
+ downloadBootSnapshot()
+ storeBootSnapshot(snapshot *common.Snapshot)
+ downloadDataSnapshot()
+ storeDataSnapshot(snapshot *common.Snapshot)
+ downloadSnapshot(scopes []string, snapshot *common.Snapshot) error
+}
+
+type changeManager interface {
+ close() <-chan bool
+ pollChangeWithBackoff()
+}
diff --git a/mock_server.go b/mock_server.go
index f7b3600..8349131 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -8,17 +8,20 @@
"math/rand"
"net/http"
"net/url"
- "strconv"
"sync"
"sync/atomic"
- "time"
"net"
+ "database/sql"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "io"
+ "io/ioutil"
+ "os"
+ "strconv"
)
/*
@@ -41,19 +44,16 @@
const oauthExpiresIn = 2 * 60 // 2 minutes
type MockParms struct {
- ReliableAPI bool
- ClusterID string
- TokenKey string
- TokenSecret string
- Scope string
- Organization string
- Environment string
- NumDevelopers int
- AddDeveloperEvery time.Duration
- UpdateDeveloperEvery time.Duration
- NumDeployments int
- ReplaceDeploymentEvery time.Duration
- BundleURI string
+ ReliableAPI bool
+ ClusterID string
+ TokenKey string
+ TokenSecret string
+ Scope string
+ Organization string
+ Environment string
+ NumDevelopers int
+ NumDeployments int
+ BundleURI string
}
func Mock(params MockParms, router apid.Router) *MockServer {
@@ -72,7 +72,6 @@
params MockParms
oauthToken string
snapshotID string
- snapshotTables map[string][]common.Table // key = scopeID
changeChannel chan []byte
sequenceID *int64
maxDevID *int64
@@ -80,10 +79,27 @@
minDeploymentID *int64
maxDeploymentID *int64
newSnap *int32
+ authFail *int32
+}
+
+func (m *MockServer) forceAuthFail() {
+ atomic.StoreInt32(m.authFail, 1)
+}
+
+func (m *MockServer) normalAuthCheck() {
+ atomic.StoreInt32(m.authFail, 0)
+}
+
+func (m *MockServer) passAuthCheck() {
+ atomic.StoreInt32(m.authFail, 2)
}
func (m *MockServer) forceNewSnapshot() {
- atomic.SwapInt32(m.newSnap, 1)
+ atomic.StoreInt32(m.newSnap, 1)
+}
+
+func (m *MockServer) forceNoSnapshot() {
+ atomic.StoreInt32(m.newSnap, 0)
}
func (m *MockServer) lastSequenceID() string {
@@ -111,6 +127,28 @@
return strconv.FormatInt(newMinID-1, 10)
}
+func initDb(statements, path string) {
+
+ f, _ := os.Create(path)
+ f.Close()
+
+ db, err := sql.Open("sqlite3", path)
+ if err != nil {
+ log.Panic("Could not instantiate mock db, %s", err)
+ }
+ defer db.Close()
+ sqlStatementsBuffer, err := ioutil.ReadFile(statements)
+ if err != nil {
+ log.Panic("Could not instantiate mock db, %s", err)
+ }
+ sqlStatementsString := string(sqlStatementsBuffer)
+ _, err = db.Exec(sqlStatementsString)
+ if err != nil {
+ log.Panic("Could not instantiate mock db, %s", err)
+ }
+
+}
+
func (m *MockServer) init() {
defer GinkgoRecover()
RegisterFailHandler(func(message string, callerSkip ...int) {
@@ -125,79 +163,12 @@
*m.minDeploymentID = 1
m.maxDeploymentID = new(int64)
m.newSnap = new(int32)
+ m.authFail = new(int32)
+ *m.authFail = 0
- go m.developerGenerator()
- go m.developerUpdater()
- go m.deploymentReplacer()
+ initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
+ initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
- // cluster "scope"
- cluster := m.newRow(map[string]string{
- "id": m.params.ClusterID,
- "_change_selector": m.params.ClusterID,
- })
-
- // data scopes
- var dataScopes []common.Row
- dataScopes = append(dataScopes, cluster)
- dataScopes = append(dataScopes, m.newRow(map[string]string{
- "id": m.params.Scope,
- "scope": m.params.Scope,
- "org": m.params.Organization,
- "env": m.params.Environment,
- "apid_cluster_id": m.params.ClusterID,
- "_change_selector": m.params.Scope,
- }))
-
- // cluster & data_scope snapshot tables
- m.snapshotTables = map[string][]common.Table{}
- m.snapshotTables[m.params.ClusterID] = []common.Table{
- {
- Name: "edgex.apid_cluster",
- Rows: []common.Row{cluster},
- },
- {
- Name: "edgex.data_scope",
- Rows: dataScopes,
- },
- }
-
- var snapshotTableRows []tableRowMap
-
- // generate one company
- companyID := m.params.Organization
- tenantID := m.params.Scope
- changeSelector := m.params.Scope
- company := tableRowMap{
- "kms.company": m.newRow(map[string]string{
- "id": companyID,
- "status": "Active",
- "tenant_id": tenantID,
- "name": companyID,
- "display_name": companyID,
- "_change_selector": changeSelector,
- }),
- }
- snapshotTableRows = append(snapshotTableRows, company)
-
- // generate snapshot developers
- for i := 0; i < m.params.NumDevelopers; i++ {
- developer := m.createDeveloperWithProductAndApp()
- snapshotTableRows = append(snapshotTableRows, developer)
- }
- log.Infof("created %d developers", m.params.NumDevelopers)
-
- // generate snapshot deployments
- for i := 0; i < m.params.NumDeployments; i++ {
- deployment := m.createDeployment()
- snapshotTableRows = append(snapshotTableRows, deployment)
- }
- log.Infof("created %d deployments", m.params.NumDeployments)
-
- m.snapshotTables[m.params.Scope] = m.concatTableRowMaps(snapshotTableRows...)
-
- if m.params.NumDevelopers < 10 && m.params.NumDeployments < 10 {
- log.Debugf("snapshotTables: %v", m.snapshotTables[m.params.Scope])
- }
}
// developer, product, application, credential will have the same ID (developerID)
@@ -281,28 +252,19 @@
Expect(scopes).To(ContainElement(m.params.ClusterID))
- m.snapshotID = generateUUID()
- snapshot := &common.Snapshot{
- SnapshotInfo: m.snapshotID,
+ w.Header().Set("Transicator-Snapshot-TXID", generateUUID())
+
+ if len(scopes) == 1 {
+ //send bootstrap db
+ err := streamFile("./mockdb_boot.sqlite3", w)
+ Expect(err).NotTo(HaveOccurred())
+ return
+ } else {
+ //send data db
+ err := streamFile("./mockdb.sqlite3", w)
+ Expect(err).NotTo(HaveOccurred())
+ return
}
-
- // Note: if/when we support multiple scopes, we'd have to do a merge of table rows
- for _, scope := range scopes {
- tables := m.snapshotTables[scope]
- for _, table := range tables {
- snapshot.AddTables(table)
- }
- }
-
- body, err := json.Marshal(snapshot)
- Expect(err).NotTo(HaveOccurred())
-
- log.Infof("sending snapshot: %s", m.snapshotID)
- if len(body) < 10000 {
- log.Debugf("snapshot: %#v", string(body))
- }
-
- w.Write(body)
}
func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
@@ -310,6 +272,7 @@
val := atomic.SwapInt32(m.newSnap, 0)
if val > 0 {
+ log.Debug("MockServer: force new snapshot")
w.WriteHeader(http.StatusBadRequest)
apiErr := changeServerError{
Code: "SNAPSHOT_TOO_OLD",
@@ -320,12 +283,14 @@
return
}
+ log.Debug("mock server sending change list")
+
q := req.URL.Query()
scopes := q["scope"]
- block, err := strconv.Atoi(q.Get("block"))
+ _, err := strconv.Atoi(q.Get("block"))
Expect(err).NotTo(HaveOccurred())
- since := q.Get("since")
+ _ = q.Get("since")
Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID))
//Expect(q.Get("snapshot")).To(Equal(m.snapshotID))
@@ -333,11 +298,6 @@
Expect(scopes).To(ContainElement(m.params.ClusterID))
//Expect(scopes).To(ContainElement(m.params.Scope))
- if since != "" {
- m.sendChange(w, time.Duration(block)*time.Second)
- return
- }
-
// todo: the following is just legacy for the existing test in apigeeSync_suite_test
developer := m.createDeveloperWithProductAndApp()
changeList := m.createInsertChange(developer)
@@ -348,93 +308,6 @@
w.Write(body)
}
-// generate developers w/ product and app
-func (m *MockServer) developerGenerator() {
-
- for range time.Tick(m.params.AddDeveloperEvery) {
-
- developer := m.createDeveloperWithProductAndApp()
- changeList := m.createInsertChange(developer)
-
- body, err := json.Marshal(changeList)
- if err != nil {
- log.Errorf("Error adding developer: %v", err)
- }
-
- log.Info("adding developer")
- log.Debugf("body: %#v", string(body))
- m.changeChannel <- body
- }
-}
-
-// update random developers - set username
-func (m *MockServer) developerUpdater() {
-
- for range time.Tick(m.params.UpdateDeveloperEvery) {
-
- developerID := m.randomDeveloperID()
-
- oldDev := m.createDeveloper(developerID)
- delete(oldDev, "kms.company_developer")
- newDev := m.createDeveloper(developerID)
- delete(newDev, "kms.company_developer")
-
- newRow := newDev["kms.developer"]
- newRow["username"] = m.stringColumnVal("i_am_not_a_number")
-
- changeList := m.createUpdateChange(oldDev, newDev)
-
- body, err := json.Marshal(changeList)
- if err != nil {
- log.Errorf("Error updating developer: %v", err)
- }
-
- log.Info("updating developer")
- log.Debugf("body: %#v", string(body))
- m.changeChannel <- body
- }
-}
-
-func (m *MockServer) deploymentReplacer() {
-
- for range time.Tick(m.params.ReplaceDeploymentEvery) {
-
- // delete
- oldDep := tableRowMap{}
- oldDep["edgex.deployment"] = m.newRow(map[string]string{
- "id": m.popDeploymentID(),
- })
- deleteChange := m.createDeleteChange(oldDep)
-
- // insert
- newDep := m.createDeployment()
- insertChange := m.createInsertChange(newDep)
-
- changeList := m.concatChangeLists(deleteChange, insertChange)
-
- body, err := json.Marshal(changeList)
- if err != nil {
- log.Errorf("Error replacing deployment: %v", err)
- }
-
- log.Info("replacing deployment")
- log.Debugf("body: %#v", string(body))
- m.changeChannel <- body
- }
-}
-
-// todo: we could debounce this if necessary
-func (m *MockServer) sendChange(w http.ResponseWriter, timeout time.Duration) {
- select {
- case change := <-m.changeChannel:
- log.Info("sending change to client")
- w.Write(change)
- case <-time.After(timeout):
- log.Info("change request timeout")
- w.WriteHeader(http.StatusNotModified)
- }
-}
-
// enables GoMega handling
func (m *MockServer) gomega(target http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
@@ -452,15 +325,29 @@
// enforces handler auth
func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
- auth := req.Header.Get("Authorization")
+ // force failing auth check
+ if atomic.LoadInt32(m.authFail) == 1 {
+ w.WriteHeader(http.StatusUnauthorized)
+ w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. ")))
+ return
+ }
+
+ // force passing auth check
+ if atomic.LoadInt32(m.authFail) == 2 {
+ target(w, req)
+ return
+ }
+
+ // check auth header
+ auth := req.Header.Get("Authorization")
expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
if auth != expectedAuth {
- w.WriteHeader(http.StatusBadRequest)
+ w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth)))
- } else {
- target(w, req)
+ return
}
+ target(w, req)
}
}
@@ -537,7 +424,7 @@
Expect(err).ShouldNot(HaveOccurred())
rows := tableRowMap{}
- rows["edgex.deployment"] = m.newRow(map[string]string{
+ rows["kms_deployment"] = m.newRow(map[string]string{
"id": deploymentID,
"bundle_config_id": bundleID,
"apid_cluster_id": m.params.ClusterID,
@@ -556,15 +443,14 @@
rows := tableRowMap{}
- rows["kms.developer"] = m.newRow(map[string]string{
+ rows["kms_developer"] = m.newRow(map[string]string{
"id": developerID,
"status": "Active",
"tenant_id": tenantID,
})
// map developer onto to existing company
- rows["kms.company_developer"] = m.newRow(map[string]string{
- "id": developerID,
+ rows["kms_company_developer"] = m.newRow(map[string]string{
"tenant_id": tenantID,
"company_id": companyID,
"developer_id": developerID,
@@ -581,7 +467,7 @@
resources := fmt.Sprintf("{%s}", "/") // todo: what should be here?
rows := tableRowMap{}
- rows["kms.api_product"] = m.newRow(map[string]string{
+ rows["kms_api_product"] = m.newRow(map[string]string{
"id": productID,
"api_resources": resources,
"environments": environments,
@@ -596,21 +482,21 @@
rows := tableRowMap{}
- rows["kms.app"] = m.newRow(map[string]string{
+ rows["kms_app"] = m.newRow(map[string]string{
"id": applicationID,
"developer_id": developerID,
"status": "Approved",
"tenant_id": tenantID,
})
- rows["kms.app_credential"] = m.newRow(map[string]string{
+ rows["kms_app_credential"] = m.newRow(map[string]string{
"id": credentialID,
"app_id": applicationID,
"tenant_id": tenantID,
"status": "Approved",
})
- rows["kms.app_credential_apiproduct_mapper"] = m.newRow(map[string]string{
+ rows["kms_app_credential_apiproduct_mapper"] = m.newRow(map[string]string{
"apiprdt_id": productID,
"app_id": applicationID,
"appcred_id": credentialID,
@@ -688,26 +574,6 @@
}
// create []common.Table from array of tableRowMaps
-func (m *MockServer) concatTableRowMaps(maps ...tableRowMap) []common.Table {
- tableMap := map[string]*common.Table{}
- for _, m := range maps {
- for name, row := range m {
- if _, ok := tableMap[name]; !ok {
- tableMap[name] = &common.Table{
- Name: name,
- }
- }
- tableMap[name].AddRowstoTable(row)
- }
- }
- result := []common.Table{}
- for _, v := range tableMap {
- result = append(result, *v)
- }
- return result
-}
-
-// create []common.Table from array of tableRowMaps
func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList {
result := common.ChangeList{}
if len(changeLists) > 0 {
@@ -721,3 +587,16 @@
}
return result
}
+
+func streamFile(srcFile string, w http.ResponseWriter) error {
+ inFile, err := os.Open(srcFile)
+ if err != nil {
+ return err
+ }
+ defer inFile.Close()
+
+ w.Header().Set("Content-Type", "application/transicator+sqlite")
+
+ _, err = io.Copy(w, inFile)
+ return err
+}
diff --git a/snapshot.go b/snapshot.go
index 3b288a2..f4cb8bd 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -1,7 +1,6 @@
package apidApigeeSync
import (
- "encoding/json"
"github.com/30x/apid-core"
"github.com/30x/apid-core/data"
"github.com/apigee-labs/transicator/common"
@@ -16,7 +15,7 @@
"time"
)
-type snapShotManager struct {
+type simpleSnapShotManager struct {
// to send quit signal to the downloading thread
quitChan chan bool
// to mark the graceful close of snapshotManager
@@ -27,10 +26,10 @@
isDownloading *int32
}
-func createSnapShotManager() *snapShotManager {
+func createSnapShotManager() *simpleSnapShotManager {
isClosedInt := int32(0)
isDownloadingInt := int32(0)
- return &snapShotManager{
+ return &simpleSnapShotManager{
quitChan: make(chan bool, 1),
finishChan: make(chan bool, 1),
isClosed: &isClosedInt,
@@ -44,7 +43,7 @@
* use <- close() for blocking close
* should only be called by pollChangeManager, because pollChangeManager is dependent on it
*/
-func (s *snapShotManager) close() <-chan bool {
+func (s *simpleSnapShotManager) close() <-chan bool {
//has been closed before
if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
log.Error("snapShotManager: close() called on a closed snapShotManager!")
@@ -64,7 +63,7 @@
}
// retrieve boot information: apid_config and apid_config_scope
-func (s *snapShotManager) downloadBootSnapshot() {
+func (s *simpleSnapShotManager) downloadBootSnapshot() {
if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
}
@@ -100,12 +99,12 @@
s.storeBootSnapshot(snapshot)
}
-func (s *snapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
processSnapshot(snapshot)
}
// use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func (s *snapShotManager) downloadDataSnapshot() {
+func (s *simpleSnapShotManager) downloadDataSnapshot() {
if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
}
@@ -133,21 +132,14 @@
s.storeDataSnapshot(snapshot)
}
-func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
+func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
knownTables = extractTablesFromSnapshot(snapshot)
- db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+ _, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Database inaccessible: %v", err)
}
- // if closed
- if atomic.LoadInt32(s.isClosed) == int32(1) {
- log.Warn("Trying to persistKnownTablesToDB with a closed snapShotManager")
- return
- }
- persistKnownTablesToDB(knownTables, db)
-
log.Info("Emitting Snapshot to plugins")
select {
@@ -167,14 +159,31 @@
log.Debug("Extracting table names from snapshot")
if snapshot.Tables == nil {
//if this panic ever fires, it's a bug
- log.Panicf("Attempt to extract known tables from snapshot without tables failed")
- }
+ db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Database inaccessible: %v", err)
+ }
+ rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
+ if err != nil {
+ log.Panicf("Unable to read in known snapshot tables from sqlite file")
+ }
+ for rows.Next() {
+ var tableName string
+ rows.Scan(&tableName)
+ if err != nil {
+ log.Panic("Error scaning tableNames from _transicator_tables")
+ }
+ tables[tableName] = true
+ }
- for _, table := range snapshot.Tables {
- tables[table.Name] = true
- }
+ } else {
+ for _, table := range snapshot.Tables {
+ tables[table.Name] = true
+ }
+ }
return tables
+
}
func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
@@ -182,7 +191,7 @@
tables = make(map[string]bool)
log.Debug("Extracting table names from existing DB")
- rows, err := db.Query("SELECT name FROM _known_tables;")
+ rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
defer rows.Close()
if err != nil {
@@ -223,7 +232,7 @@
// a blocking method
// will keep retrying with backoff until success
-func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+func (s *simpleSnapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
// if closed
if atomic.LoadInt32(s.isClosed) == int32(1) {
log.Warn("Trying to download snapshot with a closed snapShotManager")
@@ -263,16 +272,14 @@
}
addHeaders(req)
- var processSnapshotResponse func(*http.Response, *common.Snapshot) error
+ var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error
- // Set the transport protocol type based on conf file input
- if config.GetString(configSnapshotProtocol) == "json" {
- req.Header.Set("Accept", "application/json")
- processSnapshotResponse = processSnapshotServerJsonResponse
- } else if config.GetString(configSnapshotProtocol) == "sqlite" {
- req.Header.Set("Accept", "application/transicator+sqlite")
- processSnapshotResponse = processSnapshotServerFileResponse
+ if config.GetString(configSnapshotProtocol) != "sqlite" {
+ log.Panic("Only currently supported snashot protocol is sqlite")
+
}
+ req.Header.Set("Accept", "application/transicator+sqlite")
+ processSnapshotResponse = processSnapshotServerFileResponse
// Issue the request to the snapshot server
r, err := httpclient.Do(req)
@@ -290,7 +297,7 @@
}
// Decode the Snapshot server response
- err = processSnapshotResponse(r, snapshot)
+ err = processSnapshotResponse(r.Header.Get("Transicator-Snapshot-TXID"), r.Body, snapshot)
if err != nil {
log.Errorf("Snapshot server response Data not parsable: %v", err)
return err
@@ -300,50 +307,24 @@
}
}
-func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
- log.Debugf("Inserting table names found in snapshot into db")
+func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error {
+ dbPath := data.DBPath("common/" + dbId)
+ log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath)
- tx, err := db.Begin()
+ //this path includes the sqlite3 file name. why does mkdir all stop at parent??
+ log.Infof("Creating directory with mkdirall %s", dbPath)
+ err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700)
if err != nil {
- log.Panicf("Error starting transaction: %v", err)
+ log.Errorf("Error creating db path %s", err)
}
- defer tx.Rollback()
-
- _, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));")
- if err != nil {
- log.Panicf("Could not create _known_tables table: %s", err)
- }
-
- for name := range tables {
- log.Debugf("Inserting %s into _known_tables", name)
- _, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name)
- if err != nil {
- log.Panicf("Error encountered inserting into known tables ", err)
- }
-
- }
-
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error committing transaction: %v", err)
-
- }
-}
-
-func processSnapshotServerJsonResponse(r *http.Response, snapshot *common.Snapshot) error {
- return json.NewDecoder(r.Body).Decode(snapshot)
-}
-
-func processSnapshotServerFileResponse(r *http.Response, snapshot *common.Snapshot) error {
- dbId := r.Header.Get("Transicator-Snapshot-TXID")
- out, err := os.Create(data.DBPath(dbId))
+ out, err := os.Create(dbPath)
if err != nil {
return err
}
defer out.Close()
//stream respose to DB
- _, err = io.Copy(out, r.Body)
+ _, err = io.Copy(out, body)
if err != nil {
return err
diff --git a/sql/init_listener_test_duplicate_apids.sql b/sql/init_listener_test_duplicate_apids.sql
new file mode 100644
index 0000000..21bd615
--- /dev/null
+++ b/sql/init_listener_test_duplicate_apids.sql
@@ -0,0 +1,37 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap2','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap');
+COMMIT;
diff --git a/sql/init_listener_test_no_datascopes.sql b/sql/init_listener_test_no_datascopes.sql
new file mode 100644
index 0000000..6f5030a
--- /dev/null
+++ b/sql/init_listener_test_no_datascopes.sql
@@ -0,0 +1,37 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('i','n','d','o', 'c', 'c', 'u','u', 'i');
+
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+
+COMMIT;
diff --git a/sql/init_listener_test_valid_snapshot.sql b/sql/init_listener_test_valid_snapshot.sql
new file mode 100644
index 0000000..4e7e7e5
--- /dev/null
+++ b/sql/init_listener_test_valid_snapshot.sql
@@ -0,0 +1,40 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('i','n','d','o', 'c', 'c', 'u','u', 'i');
+
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('i','a','s1','o','e1','c','c','u','u','a');
+INSERT INTO "edgex_data_scope" VALUES('i','a','s1','o','e2','c','c','u','u','a');
+INSERT INTO "edgex_data_scope" VALUES('k','a','s2','o','e3','c','c','u','u','a');
+
+COMMIT;
diff --git a/sql/init_mock_boot_db.sql b/sql/init_mock_boot_db.sql
new file mode 100644
index 0000000..ed66799
--- /dev/null
+++ b/sql/init_mock_boot_db.sql
@@ -0,0 +1,36 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','1142790:1142790:');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,created_by,created_by,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap');
+COMMIT;
diff --git a/sql/init_mock_db.sql b/sql/init_mock_db.sql
new file mode 100644
index 0000000..dc14fda
--- /dev/null
+++ b/sql/init_mock_db.sql
@@ -0,0 +1,246 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','1142790:1142790:');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','data_scope_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','entity_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','cust_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','org_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','dev_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','comp_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','apiprdt_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','app_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','appcred_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','name',1043,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','type',19701,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','value',1043,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','name',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','status',19564,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','customer_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','description',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','ext_ref_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','description',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('configuration','body',25,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','consumer_secret',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','app_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','method_type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','status',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','issued_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','expires_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','app_status',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','scopes',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','created_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','updated_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','company_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','developer_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','roles',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','deployment_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','action',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','apid_cluster_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','data_scope_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','name',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','access_type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','callback_url',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','status',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','app_family',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','company_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','developer_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','parent_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','type',19625,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','appcred_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','app_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','apiprdt_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','status',19670,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','username',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','first_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','last_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','password',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','email',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','status',19564,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','encrypted_password',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','salt',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','_change_selector',1043,0);
+CREATE TABLE "kms_deployment" (id text,bundle_config_id text,apid_cluster_id text,data_scope_id text,bundle_config_name text,bundle_config_json text,config_json text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,bundle_config_id,apid_cluster_id,data_scope_id,_change_selector));
+INSERT INTO "kms_deployment" VALUES('321e443b-9db9-4043-b987-1599e0cdd029','1b6a5e15-4bb8-4c8e-ae3d-63e6efb9ba85','bootstrap','dataScope1','gcp-test-bundle','{"id":"1b6a5e15-4bb8-4c8e-ae3d-63e6efb9ba85","created":"2017-02-27T07:40:57.810Z","createdBy":"fierrom@google.com","updated":"2017-02-27T07:40:57.810Z","updatedBy":"fierrom@google.com","name":"gcp-test-bundle","uri":"https://gist.github.com/mdobson/f9d537c5192a660f692affc294266df2/archive/234c7cbf227d769278bee9b06ace51d6062fe96b.zip","checksumType":"md5","checksum":"06fde116f0270b3734a48653d0cfb495"}','{}','2017-02-27 07:41:33.888+00:00','fierrom@google.com','2017-02-27 07:41:33.888+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at));
+INSERT INTO "kms_api_product" VALUES('f5f07319-5104-471c-9df3-64b1842dbe00','43aef41d','test','test','','{/}','AUTO','{""}','{helloworld}','{test}','','',NULL,'2017-02-27 07:32:49.897+00:00','vbhangale@apigee.com','2017-02-27 07:32:49.897+00:00','vbhangale@apigee.com','43aef41d');
+INSERT INTO "kms_api_product" VALUES('87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','43aef41d','GregProduct','Greg''s Product','A product for testing Greg','{/**}','AUTO','{""}','{}','{test}','','',NULL,'2017-03-01 22:50:41.75+00:00','greg@google.com','2017-03-01 22:50:41.75+00:00','greg@google.com','43aef41d');
+CREATE TABLE attributes (tenant_id text,entity_id text,cust_id text,org_id text,dev_id text,comp_id text,apiprdt_id text,app_id text,appcred_id text,name text,type text,value text,_change_selector text, primary key (tenant_id,tenant_id,entity_id,entity_id,name,type,type));
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isSmbOrganization','ORGANIZATION','true','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.mgmtGroup','ORGANIZATION','management-edgex','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isEdgexEnabled','ORGANIZATION','true','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isCpsEnabled','ORGANIZATION','true','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','f5f07319-5104-471c-9df3-64b1842dbe00','','','','','f5f07319-5104-471c-9df3-64b1842dbe00','','','access','APIPRODUCT','internal','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','f5f07319-5104-471c-9df3-64b1842dbe00','','','','','f5f07319-5104-471c-9df3-64b1842dbe00','','','test','APIPRODUCT','v1','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','','','','','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','DisplayName','APP','MitchTestApp','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','','','','','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','Notes','APP','','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','87c20a31-a504-4ed5-89a5-700adfbb0142','','','','','','87c20a31-a504-4ed5-89a5-700adfbb0142','','DisplayName','APP','MitchTestApp2','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','87c20a31-a504-4ed5-89a5-700adfbb0142','','','','','','87c20a31-a504-4ed5-89a5-700adfbb0142','','Notes','APP','','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','','','','','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','','','access','APIPRODUCT','public','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','be902c0c-c54b-4a65-85d6-358ff8639586','','','','','','be902c0c-c54b-4a65-85d6-358ff8639586','','DisplayName','APP','Greg''s Test App','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','be902c0c-c54b-4a65-85d6-358ff8639586','','','','','','be902c0c-c54b-4a65-85d6-358ff8639586','','Notes','APP','','43aef41d');
+CREATE TABLE "kms_company" (id text,tenant_id text,name text,display_name text,status text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,name,created_at,updated_at));
+CREATE TABLE "kms_organization" (id text,name text,display_name text,type text,tenant_id text,customer_id text,description text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,customer_id,created_at,updated_at));
+INSERT INTO "kms_organization" VALUES('ff0b5496-c674-4531-9443-ace334504f59','edgex_gcp1','edgex_gcp1','paid','43aef41d','307eadd7-c6d7-4ec1-b433-59bcd22cd06d','','2017-02-25 00:17:58.159+00:00','vbhangale@apigee.com','2017-02-25 00:18:14.729+00:00','vbhangale@apigee.com','43aef41d');
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,created_by,created_by,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap');
+CREATE TABLE "kms_app_credential" (id text,tenant_id text,consumer_secret text,app_id text,method_type text,status text,issued_at blob,expires_at blob,app_status text,scopes text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,app_id,issued_at,expires_at));
+INSERT INTO "kms_app_credential" VALUES('xA9QylNTGQxKGYtHXwvmx8ldDaIJMAEx','43aef41d','lscGO3lfs3zh8iQ','87c20a31-a504-4ed5-89a5-700adfbb0142','','APPROVED','2017-02-27 07:45:22.774+00:00','','','{}','2017-02-27 07:45:22.774+00:00','-NA-','2017-02-27 07:45:22.877+00:00','-NA-','43aef41d');
+INSERT INTO "kms_app_credential" VALUES('ds986MejQqoWRSSeC0UTIPSJ3rtaG2xv','43aef41d','5EBOSSQrLOLO9siN','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','APPROVED','2017-02-27 07:43:23.263+00:00','','','{}','2017-02-27 07:43:23.263+00:00','-NA-','2017-02-27 07:48:16.717+00:00','-NA-','43aef41d');
+INSERT INTO "kms_app_credential" VALUES('DMh0uQOPA5rbhl4YTnGvBAzGzOGuMH3A','43aef41d','MTfK8xscShhnops','be902c0c-c54b-4a65-85d6-358ff8639586','','APPROVED','2017-03-01 22:52:28.019+00:00','','','{}','2017-03-01 22:52:28.019+00:00','-NA-','2017-03-01 22:52:28.022+00:00','-NA-','43aef41d');
+CREATE TABLE "kms_company_developer" (tenant_id text,company_id text,developer_id text,roles text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (tenant_id,company_id,developer_id));
+CREATE TABLE "kms_app" (id text,tenant_id text,name text,display_name text,access_type text,callback_url text,status text,app_family text,company_id text,developer_id text,parent_id text,type text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,name,name,app_family,parent_id,created_at,updated_at));
+INSERT INTO "kms_app" VALUES('87c20a31-a504-4ed5-89a5-700adfbb0142','43aef41d','MitchTestApp2','','','','APPROVED','default','','8a350848-0aba-4dcc-aa60-97903efb42ef','8a350848-0aba-4dcc-aa60-97903efb42ef','DEVELOPER','2017-02-27 07:45:21.586+00:00','fierrom@google.com' ||
+ '','2017-02-27 07:45:21.586+00:00','fierrom@google.com' ||
+ '','43aef41d');
+INSERT INTO "kms_app" VALUES('8f5c9b86-0783-439c-b8e6-7ab9549e30e8','43aef41d','MitchTestApp','','','','APPROVED','default','','8a350848-0aba-4dcc-aa60-97903efb42ef','8a350848-0aba-4dcc-aa60-97903efb42ef','DEVELOPER','2017-02-27 07:43:22.301+00:00','fierrom@google.com' ||
+ '','2017-02-27 07:48:18.964+00:00','fierrom@google.com' ||
+ '','43aef41d');
+INSERT INTO "kms_app" VALUES('be902c0c-c54b-4a65-85d6-358ff8639586','43aef41d','GregTestApp','','','','APPROVED','default','','046974c2-9ae5-4452-a42f-bb6657e6cdbe','046974c2-9ae5-4452-a42f-bb6657e6cdbe','DEVELOPER','2017-03-01 22:52:27.615+00:00','greg@google.com','2017-03-01 22:52:27.615+00:00','greg@google.com','43aef41d');
+CREATE TABLE "kms_app_credential_apiproduct_mapper" (tenant_id text,appcred_id text,app_id text,apiprdt_id text,status text,_change_selector text, primary key (tenant_id,appcred_id,app_id,apiprdt_id));
+INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','ds986MejQqoWRSSeC0UTIPSJ3rtaG2xv','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','f5f07319-5104-471c-9df3-64b1842dbe00','APPROVED','43aef41d');
+INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','xA9QylNTGQxKGYtHXwvmx8ldDaIJMAEx','87c20a31-a504-4ed5-89a5-700adfbb0142','f5f07319-5104-471c-9df3-64b1842dbe00','APPROVED','43aef41d');
+INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','DMh0uQOPA5rbhl4YTnGvBAzGzOGuMH3A','be902c0c-c54b-4a65-85d6-358ff8639586','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','APPROVED','43aef41d');
+CREATE TABLE "kms_developer" (id text,tenant_id text,username text,first_name text,last_name text,password text,email text,status text,encrypted_password text,salt text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,username,email,created_at,updated_at));
+INSERT INTO "kms_developer" VALUES('8a350848-0aba-4dcc-aa60-97903efb42ef','43aef41d','mitchfierro','Mitch','Fierro','','fierrom@google.com','ACTIVE','','','2017-02-27 07:43:00.281+00:00','fierrom@google.com' ||
+ '','2017-02-27 07:43:00.281+00:00','fierrom@google.com' ||
+ '','43aef41d');
+INSERT INTO "kms_developer" VALUES('6ab21170-6bac-481d-9be1-9fda02bdd1da','43aef41d','adikgcp','gcp','dev','','adikancherla@gcp.com','ACTIVE','','','2017-02-27 23:50:24.426+00:00','akancherla@apigee.com','2017-02-27 23:50:24.426+00:00','akancherla@apigee.com','43aef41d');
+INSERT INTO "kms_developer" VALUES('046974c2-9ae5-4452-a42f-bb6657e6cdbe','43aef41d','gregbrail','Greg','Brail','','gregbrail@google.com','ACTIVE','','','2017-03-01 22:51:40.602+00:00','greg@google.com','2017-03-01 22:51:40.602+00:00','greg@google.com','43aef41d');
+COMMIT;
\ No newline at end of file
diff --git a/token.go b/token.go
index 424c8d4..56d6676 100644
--- a/token.go
+++ b/token.go
@@ -3,6 +3,7 @@
import (
"bytes"
"encoding/json"
+ "errors"
"io/ioutil"
"net/http"
"net/url"
@@ -24,10 +25,10 @@
man.close()
*/
-func createTokenManager() *tokenMan {
+func createSimpleTokenManager() *simpleTokenManager {
isClosedInt := int32(0)
- t := &tokenMan{
+ t := &simpleTokenManager{
quitPollingForToken: make(chan bool, 1),
closed: make(chan bool),
getTokenChan: make(chan bool),
@@ -36,14 +37,10 @@
invalidateDone: make(chan bool),
isClosed: &isClosedInt,
}
-
- t.retrieveNewToken()
- t.refreshTimer = time.After(t.token.refreshIn())
- go t.maintainToken()
return t
}
-type tokenMan struct {
+type simpleTokenManager struct {
token *oauthToken
isClosed *int32
quitPollingForToken chan bool
@@ -55,11 +52,17 @@
invalidateDone chan bool
}
-func (t *tokenMan) getBearerToken() string {
+func (t *simpleTokenManager) start() {
+ t.retrieveNewToken()
+ t.refreshTimer = time.After(t.token.refreshIn())
+ go t.maintainToken()
+}
+
+func (t *simpleTokenManager) getBearerToken() string {
return t.getToken().AccessToken
}
-func (t *tokenMan) maintainToken() {
+func (t *simpleTokenManager) maintainToken() {
for {
select {
case <-t.closed:
@@ -80,18 +83,19 @@
}
// will block until valid
-func (t *tokenMan) invalidateToken() {
+func (t *simpleTokenManager) invalidateToken() error {
//has been closed
if atomic.LoadInt32(t.isClosed) == int32(1) {
log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
- return
+ return errors.New("invalidateToken() called on closed tokenManager")
}
log.Debug("invalidating token")
t.invalidateTokenChan <- true
<-t.invalidateDone
+ return nil
}
-func (t *tokenMan) getToken() *oauthToken {
+func (t *simpleTokenManager) getToken() *oauthToken {
//has been closed
if atomic.LoadInt32(t.isClosed) == int32(1) {
log.Debug("TokenManager: getToken() called on closed tokenManager")
@@ -105,7 +109,7 @@
* blocking close() of tokenMan
*/
-func (t *tokenMan) close() {
+func (t *simpleTokenManager) close() {
//has been closed
if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
log.Panic("TokenManager: close() has been called before!")
@@ -120,7 +124,7 @@
}
// don't call externally. will block until success.
-func (t *tokenMan) retrieveNewToken() {
+func (t *simpleTokenManager) retrieveNewToken() {
log.Debug("Getting OAuth token...")
uriString := config.GetString(configProxyServerBaseURI)
@@ -133,7 +137,7 @@
pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) })
}
-func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
return func(_ chan bool) error {
form := url.Values{}
form.Set("grant_type", "client_credentials")
diff --git a/token_test.go b/token_test.go
index c8ec7ba..22ff425 100644
--- a/token_test.go
+++ b/token_test.go
@@ -80,7 +80,8 @@
w.Write(body)
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
@@ -108,7 +109,8 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
@@ -147,8 +149,8 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
-
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
testedTokenManager.getToken()
<-finished
@@ -188,8 +190,8 @@
}))
config.Set(configProxyServerBaseURI, ts.URL)
- testedTokenManager := createTokenManager()
-
+ testedTokenManager := createSimpleTokenManager()
+ testedTokenManager.start()
testedTokenManager.getToken()
testedTokenManager.invalidateToken()
testedTokenManager.getToken()