Implement generic replication and sqlite snapshots
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index 9e6ed94..c00de6e 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -24,6 +24,7 @@ ) const dummyConfigValue string = "placeholder" +const expectedClusterId = "bootstrap" var _ = BeforeSuite(func() { wipeDBAferTest = true @@ -42,11 +43,11 @@ config.Set(configProxyServerBaseURI, dummyConfigValue) config.Set(configSnapServerBaseURI, dummyConfigValue) config.Set(configChangeServerBaseURI, dummyConfigValue) - config.Set(configSnapshotProtocol, "json") + config.Set(configSnapshotProtocol, "sqlite") config.Set(configPollInterval, 10*time.Millisecond) config.Set(configName, "testhost") - config.Set(configApidClusterId, "bootstrap") + config.Set(configApidClusterId, expectedClusterId) config.Set(configConsumerKey, "XXXXXXX") config.Set(configConsumerSecret, "YYYYYYY") @@ -63,11 +64,6 @@ lastSequence = "" if wipeDBAferTest { - _, err := getDB().Exec("DELETE FROM APID_CLUSTER") - Expect(err).NotTo(HaveOccurred()) - _, err = getDB().Exec("DELETE FROM DATA_SCOPE") - Expect(err).NotTo(HaveOccurred()) - db, err := dataService.DB() Expect(err).NotTo(HaveOccurred()) _, err = db.Exec("DELETE FROM APID")
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 4659f9c..10b553d 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -13,6 +13,9 @@ Context("Sync", func() { + const expectedDataScopeId1 = "dataScope1" + const expectedDataScopeId2 = "dataScope2" + var initializeContext = func() { testRouter = apid.API().Router() testServer = httptest.NewServer(testRouter) @@ -53,53 +56,58 @@ var lastSnapshot *common.Snapshot expectedSnapshotTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms.company"}, - common.Change{Table: "edgex.apid_cluster"}, - common.Change{Table: "edgex.data_scope"}}, + Changes: []common.Change{common.Change{Table: "kms_company"}, + common.Change{Table: "edgex_apid_cluster"}, + common.Change{Table: "edgex_data_scope"}, + common.Change{Table: "kms_app_credential"}, + common.Change{Table: "kms_app_credential_apiproduct_mapper"}, + common.Change{Table: "kms_developer"}, + common.Change{Table: "kms_company_developer"}, + common.Change{Table: "kms_api_product"}, + common.Change{Table: "kms_app"}}, } apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { if s, ok := event.(*common.Snapshot); ok { + Expect(16).To(Equal(len(knownTables))) Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) - //add apid_cluster and data_scope since those would present if this were a real scenario - knownTables["kms.app_credential"] = true - knownTables["kms.app_credential_apiproduct_mapper"] = true - knownTables["kms.developer"] = true - knownTables["kms.company_developer"] = true - knownTables["kms.api_product"] = true - knownTables["kms.app"] = true - lastSnapshot = s - for _, t := range s.Tables { - switch t.Name { + db, _ := dataService.DBVersion(s.SnapshotInfo) + var rowCount int + var id string - case "edgex.apid_cluster": - Expect(t.Rows).To(HaveLen(1)) - r := t.Rows[0] - var id string - r.Get("id", &id) - Expect(id).To(Equal("bootstrap")) + err := db.Ping() + Expect(err).NotTo(HaveOccurred()) + numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;") + if err != nil { + Fail("Failed to get correct DB") + } + Expect(true).To(Equal(numApidClusters.Next())) + numApidClusters.Scan(&rowCount) + Expect(1).To(Equal(rowCount)) + apidClusters, _ := db.Query("select id from edgex_apid_cluster;") + apidClusters.Next() + apidClusters.Scan(&id) + Expect(id).To(Equal(expectedClusterId)) - case "edgex.data_scope": - Expect(t.Rows).To(HaveLen(2)) - r := t.Rows[1] // get the non-cluster row + numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;") + Expect(true).To(Equal(numDataScopes.Next())) + numDataScopes.Scan(&rowCount) + Expect(2).To(Equal(rowCount)) + dataScopes, _ := db.Query("select id from edgex_data_scope;") + dataScopes.Next() + dataScopes.Scan(&id) + dataScopes.Next() - var id, clusterID, env, org, scope string - r.Get("id", &id) - r.Get("apid_cluster_id", &clusterID) - r.Get("env", &env) - r.Get("org", &org) - r.Get("scope", &scope) - - Expect(id).To(Equal("ert452")) - Expect(scope).To(Equal("ert452")) - Expect(clusterID).To(Equal("bootstrap")) - Expect(env).To(Equal("prod")) - Expect(org).To(Equal("att")) - } + if id == expectedDataScopeId1 { + dataScopes.Scan(&id) + Expect(id).To(Equal(expectedDataScopeId2)) + } else { + dataScopes.Scan(&id) + Expect(id).To(Equal(expectedDataScopeId1)) } } else if cl, ok := event.(*common.ChangeList); ok { @@ -122,12 +130,12 @@ Expect(tenantID).To(Equal("ert452")) } - Expect(tables).To(ContainElement("kms.app_credential")) - Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) - Expect(tables).To(ContainElement("kms.developer")) - Expect(tables).To(ContainElement("kms.company_developer")) - Expect(tables).To(ContainElement("kms.api_product")) - Expect(tables).To(ContainElement("kms.app")) + Expect(tables).To(ContainElement("kms_app_credential")) + Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper")) + Expect(tables).To(ContainElement("kms_developer")) + Expect(tables).To(ContainElement("kms_company_developer")) + Expect(tables).To(ContainElement("kms_api_product")) + Expect(tables).To(ContainElement("kms_app")) go func() { // when close done, all handlers for the first changeList have been executed @@ -135,10 +143,8 @@ defer GinkgoRecover() // allow other handler to execute to insert last_sequence var seq string - //for seq = ""; seq == ""; { - // time.Sleep(50 * time.Millisecond) - err := getDB(). - QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). + err = getDB(). + QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;"). Scan(&seq) Expect(err).NotTo(HaveOccurred()) //} @@ -163,9 +169,9 @@ initializeContext() expectedTables := common.ChangeList{ - Changes: []common.Change{common.Change{Table: "kms.company"}, - common.Change{Table: "edgex.apid_cluster"}, - common.Change{Table: "edgex.data_scope"}}, + Changes: []common.Change{common.Change{Table: "kms_company"}, + common.Change{Table: "edgex_apid_cluster"}, + common.Change{Table: "edgex_data_scope"}}, } Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) @@ -333,4 +339,4 @@ }, 3) }) -}) +}) \ No newline at end of file
diff --git a/data.go b/data.go index 4761373..10c27d6 100644 --- a/data.go +++ b/data.go
@@ -8,6 +8,9 @@ "sync" "github.com/30x/apid-core" + "github.com/apigee-labs/transicator/common" + "sort" + "strings" ) var ( @@ -39,30 +42,6 @@ last_snapshot_info text, PRIMARY KEY (instance_id) ); - CREATE TABLE IF NOT EXISTS APID_CLUSTER ( - id text, - name text, - description text, - umbrella_org_app_name text, - created text, - created_by text, - updated text, - updated_by text, - last_sequence text, - PRIMARY KEY (id) - ); - CREATE TABLE IF NOT EXISTS DATA_SCOPE ( - id text, - apid_cluster_id text, - scope text, - org text, - env text, - created text, - created_by text, - updated text, - updated_by text, - PRIMARY KEY (id, apid_cluster_id) - ); `) if err != nil { return err @@ -85,6 +64,293 @@ dbMux.Unlock() } +//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn +func insert(tableName string, rows []common.Row, txn *sql.Tx) bool { + + if len(rows) == 0 { + return false + } + + var orderedColumns []string + for column := range rows[0] { + orderedColumns = append(orderedColumns, column) + } + sort.Strings(orderedColumns) + + sql := buildInsertSql(tableName, orderedColumns, rows) + + prep, err := txn.Prepare(sql) + if err != nil { + log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err) + return false + } + defer prep.Close() + + var values []interface{} + + for _, row := range rows { + for _, columnName := range orderedColumns { + //use Value so that stmt exec does not complain about common.ColumnVal being a struct + //TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping + //TODO right now this will only work when the column type is a string + values = append(values, row[columnName].Value) + } + } + + //create prepared statement from existing template statement + _, err = prep.Exec(values...) + + if err != nil { + log.Errorf("INSERT Fail [%s] value=[%v] error=[%v]", sql, values, err) + return false + } else { + log.Debugf("INSERT Success [%s] value=[%v]", sql, values) + } + + return true +} + +func getValueListFromKeys(row common.Row, pkeys []string) []interface{} { + var values []interface{} + // TODO Handle multiple data types + for _, pkey := range pkeys { + if row[pkey] == nil { + values = append(values, nil) + } else { + values = append(values, row[pkey].Value) + } + } + return values +} + +func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool { + pkeys, err := getPkeysForTable(tableName) + sort.Strings(pkeys) + if len(pkeys) == 0 || err != nil { + log.Errorf("DELETE No primary keys found for table. %s", tableName) + return false + } else if len(rows) == 0 { + log.Errorf("No rows found for table.", tableName) + return false + } else { + + sql := buildDeleteSql(tableName, rows[0], pkeys) + prep, err := txn.Prepare(sql) + if err != nil { + log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err) + return false + } + defer prep.Close() + for _, row := range rows { + values := getValueListFromKeys(row, pkeys) + // delete prepared statement from existing template statement + res, err := txn.Stmt(prep).Exec(values...) + if err != nil { + log.Errorf("DELETE Fail [%s] value=[%v] error=[%v]", sql, values, err) + return false + } else { + affected, err := res.RowsAffected() + if err == nil && affected != 0 { + log.Debugf("DELETE Success [%s] value=[%v]", sql, values) + } else if err == nil && affected == 0 { + log.Errorf("Entry not found [%s] value=[%v]. Nothing to delete.", sql, values) + return false + } else { + log.Errorf("DELETE Failed [%s] value=[%v] error=[%v]", sql, values, err) + return false + } + } + } + return true + } +} + +// Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;" +func buildDeleteSql(tableName string, row common.Row, pkeys []string) string { + + var wherePlaceholders []string + i := 1 + if row == nil { + return "" + } + normalizedTableName := normalizeTableName(tableName) + + for _, pk := range pkeys { + wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i)) + i++ + } + + sql := "DELETE FROM " + normalizedTableName + sql += " WHERE " + sql += strings.Join(wherePlaceholders, " AND ") + + return sql + +} + +func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool { + pkeys, err := getPkeysForTable(tableName) + if len(pkeys) == 0 || err != nil { + log.Errorf("UPDATE No primary keys found for table.", tableName) + return false + } else { + if len(oldRows) == 0 || len(newRows) == 0 { + return false + } + + var orderedColumns []string + + //extract sorted orderedColumns + for columnName := range newRows[0] { + orderedColumns = append(orderedColumns, columnName) + } + sort.Strings(orderedColumns) + + //build update statement, use arbitrary row as template + sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys) + prep, err := txn.Prepare(sql) + if err != nil { + log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err) + return false + } + defer prep.Close() + + for i, row := range newRows { + var values []interface{} + + for _, columnName := range orderedColumns { + //use Value so that stmt exec does not complain about common.ColumnVal being a struct + //TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping + //TODO right now this will only work when the column type is a string + if row[columnName] != nil { + values = append(values, row[columnName].Value) + } else { + values = append(values, nil) + } + } + + //add values for where clause, use PKs of old row + for _, pk := range pkeys { + if oldRows[i][pk] != nil { + values = append(values, oldRows[i][pk].Value) + } else { + values = append(values, nil) + } + + } + + //create prepared statement from existing template statement + res, err := txn.Stmt(prep).Exec(values...) + + if err != nil { + log.Errorf("UPDATE Fail [%s] value=[%v] error=[%v]", sql, values, err) + return false + } else { + numRowsAffected, err := res.RowsAffected() + if err != nil { + log.Errorf("UPDATE Fail [%s] value=[%v] error=[%v]", sql, values, err) + return false + } + //delete this once we figure out why tests are failing/not updating + log.Infof("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected) + log.Debugf("UPDATE Success [%s] value=[%v]", sql, values) + } + } + + return true + } +} + +func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string { + if row == nil { + return "" + } + normalizedTableName := normalizeTableName(tableName) + + var setPlaceholders, wherePlaceholders []string + i := 1 + + for _, columnName := range orderedColumns { + setPlaceholders = append(setPlaceholders, fmt.Sprintf("%s=$%v", columnName, i)) + i++ + } + + for _, pk := range pkeys { + wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i)) + i++ + } + + sql := "UPDATE " + normalizedTableName + " SET " + sql += strings.Join(setPlaceholders, ", ") + sql += " WHERE " + sql += strings.Join(wherePlaceholders, " AND ") + + return sql +} + +//precondition: rows.length > 1000, max number of entities for sqlite +func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string { + if len(rows) == 0 { + return "" + } + normalizedTableName := normalizeTableName(tableName) + var values string = "" + + var i, j int + k := 1 + for i = 0; i < len(rows)-1; i++ { + values += "(" + for j = 0; j < len(orderedColumns)-1; j++ { + values += fmt.Sprintf("$%d,", k) + k++ + } + values += fmt.Sprintf("$%d),", k) + k++ + } + values += "(" + for j = 0; j < len(orderedColumns)-1; j++ { + values += fmt.Sprintf("$%d,", k) + k++ + } + values += fmt.Sprintf("$%d)", k) + + sql := "INSERT INTO " + normalizedTableName + sql += "(" + strings.Join(orderedColumns, ",") + ") " + sql += "VALUES " + values + + return sql +} + +func getPkeysForTable(tableName string) ([]string, error) { + db := getDB() + normalizedTableName := normalizeTableName(tableName) + sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;" + rows, err := db.Query(sql, normalizedTableName) + if err != nil { + log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err) + return nil, err + } + var columnNames []string + defer rows.Close() + for rows.Next() { + var value string + err := rows.Scan(&value) + if err != nil { + log.Fatal(err) + } + columnNames = append(columnNames, value) + } + err = rows.Err() + if err != nil { + log.Fatal(err) + } + return columnNames, nil +} + +func normalizeTableName(tableName string) string { + return strings.Replace(tableName, ".", "_", 1) +} + func insertApidCluster(dac dataApidCluster, txn *sql.Tx) error { log.Debugf("inserting into APID_CLUSTER: %v", dac) @@ -115,58 +381,6 @@ return err } -func insertDataScope(ds dataDataScope, txn *sql.Tx) error { - - log.Debugf("insert DATA_SCOPE: %v", ds) - - //replace to accomodate same snapshot txid - stmt, err := txn.Prepare(` - REPLACE INTO DATA_SCOPE - (id, apid_cluster_id, scope, org, - env, created, created_by, updated, - updated_by) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9); - `) - if err != nil { - log.Errorf("insert DATA_SCOPE failed: %v", err) - return err - } - defer stmt.Close() - - _, err = stmt.Exec( - ds.ID, ds.ClusterID, ds.Scope, ds.Org, - ds.Env, ds.Created, ds.CreatedBy, ds.Updated, - ds.UpdatedBy) - - if err != nil { - log.Errorf("insert DATA_SCOPE failed: %v", err) - return err - } - - return nil -} - -func deleteDataScope(ds dataDataScope, txn *sql.Tx) error { - - log.Debugf("delete DATA_SCOPE: %v", ds) - - stmt, err := txn.Prepare("DELETE FROM DATA_SCOPE WHERE id=$1 and apid_cluster_id=$2") - if err != nil { - log.Errorf("update DATA_SCOPE failed: %v", err) - return err - } - defer stmt.Close() - - _, err = stmt.Exec(ds.ID, ds.ClusterID) - - if err != nil { - log.Errorf("delete DATA_SCOPE failed: %v", err) - return err - } - - return nil -} - /* * For the given apidConfigId, this function will retrieve all the distinch scopes * associated with it. Distinct, because scope is already a collection of the tenants. @@ -178,9 +392,9 @@ var scope string db := getDB() - rows, err := db.Query("select DISTINCT scope from DATA_SCOPE where apid_cluster_id = $1", configId) + rows, err := db.Query("select DISTINCT scope from EDGEX_DATA_SCOPE where apid_cluster_id = $1", configId) if err != nil { - log.Errorf("Failed to query DATA_SCOPE: %v", err) + log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err) return } defer rows.Close() @@ -198,9 +412,9 @@ */ func getLastSequence() (lastSequence string) { - err := getDB().QueryRow("select last_sequence from APID_CLUSTER LIMIT 1").Scan(&lastSequence) + err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence) if err != nil && err != sql.ErrNoRows { - log.Panicf("Failed to query APID_CLUSTER: %v", err) + log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err) return } @@ -216,20 +430,20 @@ log.Debugf("updateLastSequence: %s", lastSequence) - stmt, err := getDB().Prepare("UPDATE APID_CLUSTER SET last_sequence=$1;") + stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;") if err != nil { - log.Errorf("UPDATE APID_CLUSTER Failed: %v", err) + log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) return err } defer stmt.Close() _, err = stmt.Exec(lastSequence) if err != nil { - log.Errorf("UPDATE APID_CLUSTER Failed: %v", err) + log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) return err } - log.Infof("UPDATE APID_CLUSTER Success: %s", lastSequence) + log.Infof("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) return nil }
diff --git a/data_test.go b/data_test.go new file mode 100644 index 0000000..a26597d --- /dev/null +++ b/data_test.go
@@ -0,0 +1,1185 @@ +package apidApigeeSync + +import ( + "github.com/apigee-labs/transicator/common" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "sort" +) + +var _ = Describe("data access tests", func() { + + BeforeEach(func() { + db := getDB() + + //all tests in this file operate on the api_product table. Create the necessary tables for this here + getDB().Exec("CREATE TABLE _transicator_tables " + + "(tableName varchar not null, columnName varchar not null, " + + "typid integer, primaryKey bool);") + getDB().Exec("DELETE from _transicator_tables") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)") + getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)") + + getDB().Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " + + "api_resources text,approval_type text,scopes text,proxies text, environments text," + + "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " + + "primary key (id,tenant_id,created_at,updated_at));") + getDB().Exec("DELETE from kms_api_product") + + setDB(db) + initDB(db) + + }) + + Context("Update processing", func() { + It("unit test buildUpdateSql with single primary key", func() { + testRow := common.Row{ + "id": { + Value: "ch_api_product_2", + }, + "api_resources": { + Value: "{}", + }, + "environments": { + Value: "{Env_0, Env_1}", + }, + "tenant_id": { + Value: "tenant_id_0", + }, + "_change_selector": { + Value: "test_org0", + }, + } + + var orderedColumns []string + for column := range testRow { + orderedColumns = append(orderedColumns, column) + } + sort.Strings(orderedColumns) + + result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"}) + Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" + + " WHERE id=$6").To(Equal(result)) + }) + + It("unit test buildUpdateSql with composite primary key", func() { + testRow := common.Row{ + "id1": { + Value: "composite-key-1", + }, + "id2": { + Value: "composite-key-2", + }, + "api_resources": { + Value: "{}", + }, + "environments": { + Value: "{Env_0, Env_1}", + }, + "tenant_id": { + Value: "tenant_id_0", + }, + "_change_selector": { + Value: "test_org0", + }, + } + + var orderedColumns []string + for column := range testRow { + orderedColumns = append(orderedColumns, column) + } + sort.Strings(orderedColumns) + + result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"}) + Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" + + " WHERE id1=$7 AND id2=$8").To(Equal(result)) + }) + + It("test update with composite primary key", func() { + event := &common.ChangeList{} + + //this needs to match what is actually in the DB + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + newRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(true).To(Equal(processChangeList(event))) + var nRows int + err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + //do the update + Expect(true).To(Equal(processChangeList(event))) + err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + }) + + It("update should succeed if newrow modifies the primary key", func() { + event := &common.ChangeList{} + + //this needs to match what is actually in the DB + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + newRow := common.Row{ + "id": { + Value: "new_id", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(true).To(Equal(processChangeList(event))) + var nRows int + err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + //do the update + Expect(true).To(Equal(processChangeList(event))) + err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + }) + + It("update should succeed if newrow contains fewer fields than oldrow", func() { + event := &common.ChangeList{} + + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + newRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(true).To(Equal(processChangeList(event))) + var nRows int + err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + //do the update + Expect(true).To(Equal(processChangeList(event))) + err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + }) + + It("update should succeed if oldrow contains fewer fields than newrow", func() { + event := &common.ChangeList{} + + oldRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "A product for testing Greg", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + newRow := common.Row{ + "id": { + Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: oldRow, + Operation: 1, + }, + } + //insert and assert success + Expect(true).To(Equal(processChangeList(event))) + var nRows int + err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //create update event + event.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: oldRow, + NewRow: newRow, + Operation: 2, + }, + } + + //do the update + Expect(true).To(Equal(processChangeList(event))) + err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + }) + }) + + Context("Insert processing", func() { + It("Properly constructs insert sql for one row", func() { + newRow := common.Row{ + "id": { + Value: "new_id", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + var orderedColumns []string + for column := range newRow { + orderedColumns = append(orderedColumns, column) + } + sort.Strings(orderedColumns) + + expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)" + Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow}))) + }) + + It("Properly constructs insert sql for multiple rows", func() { + newRow1 := common.Row{ + "id": { + Value: "1", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + newRow2 := common.Row{ + "id": { + Value: "2", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + var orderedColumns []string + for column := range newRow1 { + orderedColumns = append(orderedColumns, column) + } + sort.Strings(orderedColumns) + + expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)" + Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2}))) + }) + + It("Properly executes insert for a single rows", func() { + event := &common.ChangeList{} + + newRow1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + } + + Expect(true).To(Equal(processChangeList(event))) + var nRows int + err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + }) + + It("Properly executed insert for multiple rows", func() { + event := &common.ChangeList{} + + newRow1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + newRow2 := common.Row{ + "id": { + Value: "b", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + { + Table: "kms.api_product", + NewRow: newRow2, + Operation: 1, + }, + } + + Expect(true).To(Equal(processChangeList(event))) + var nRows int + err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(2)) + }) + + It("Fails to execute if row does not match existing table schema", func() { + event := &common.ChangeList{} + + newRow1 := common.Row{ + "not_and_id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + } + + ok := processChangeList(event) + Expect(false).To(Equal(ok)) + + var nRows int + //assert that no extraneous rows were added + err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(0)) + }) + + It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() { + event := &common.ChangeList{} + newRow1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + newRow2 := common.Row{ + "not_and_id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: newRow1, + Operation: 1, + }, + { + Table: "kms.api_product", + NewRow: newRow2, + Operation: 1, + }, + } + + ok := processChangeList(event) + Expect(false).To(Equal(ok)) + }) + }) + + Context("Delete processing", func() { + It("Properly constructs sql prepare for Delete", func() { + row := common.Row{ + "id": { + Value: "new_id", + }, + "api_resources": { + Value: "{/**}", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "43aef41d", + }, + "description": { + Value: "new description", + }, + "created_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "updated_at": { + Value: "2017-03-01 22:50:41.75+00:00", + }, + "_change_selector": { + Value: "43aef41d", + }, + } + + pkeys, err := getPkeysForTable("kms_api_product") + Expect(err).Should(Succeed()) + sql := buildDeleteSql("kms_api_product", row, pkeys) + Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4")) + }) + + It("Verify execute single insert & single delete works", func() { + event1 := &common.ChangeList{} + event2 := &common.ChangeList{} + + Row1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event1.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: Row1, + Operation: 1, + }, + } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: Row1, + Operation: 3, + }, + } + + Expect(true).To(Equal(processChangeList(event1))) + var nRows int + err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + Expect(true).To(Equal(processChangeList(event2))) + + // validate delete + err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(0)) + + // delete again should fail - coz entry will not exist + Expect(false).To(Equal(processChangeList(event2))) + }) + + It("verify multiple insert and single delete works", func() { + event1 := &common.ChangeList{} + event2 := &common.ChangeList{} + + Row1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + Row2 := common.Row{ + "id": { + Value: "b", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event1.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: Row1, + Operation: 1, + }, + { + Table: "kms.api_product", + NewRow: Row2, + Operation: 1, + }, + } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: Row1, + Operation: 3, + }, + } + + Expect(true).To(Equal(processChangeList(event1))) + var nRows int + //verify first row + err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //verify second row + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(2)) + + Expect(true).To(Equal(processChangeList(event2))) + + //verify second row still exists + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + // validate delete + err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + // delete again should fail - coz entry will not exist + Expect(false).To(Equal(processChangeList(event2))) + }, 3) + + It("verify single insert and multiple delete fails", func() { + event1 := &common.ChangeList{} + event2 := &common.ChangeList{} + + Row1 := common.Row{ + "id": { + Value: "a", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + Row2 := common.Row{ + "id": { + Value: "b", + }, + "api_resources": { + Value: "r", + }, + "environments": { + Value: "{test}", + }, + "tenant_id": { + Value: "t", + }, + "description": { + Value: "d", + }, + "created_at": { + Value: "c", + }, + "updated_at": { + Value: "u", + }, + "_change_selector": { + Value: "cs", + }, + } + + event1.Changes = []common.Change{ + { + Table: "kms.api_product", + NewRow: Row1, + Operation: 1, + }, + } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: Row1, + Operation: 3, + }, + { + Table: "kms.api_product", + OldRow: Row2, + Operation: 3, + }, + } + + Expect(true).To(Equal(processChangeList(event1))) + var nRows int + //verify insert + err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" + + "and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" + + "and _change_selector='cs'").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + //assert that no extraneous rows were added + err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows) + Expect(err).NotTo(HaveOccurred()) + Expect(nRows).To(Equal(1)) + + Expect(false).To(Equal(processChangeList(event2))) + + }, 3) + }) +})
diff --git a/init.go b/init.go index cf90781..7ac68a2 100644 --- a/init.go +++ b/init.go
@@ -126,8 +126,8 @@ } } proto := config.GetString(configSnapshotProtocol) - if proto != "json" && proto != "proto" { - return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol) + if proto != "json" && proto != "proto" && proto != "sqlite" { + return fmt.Errorf("Illegal value for %s. Must be: 'json', 'proto', or 'sqlite'", configSnapshotProtocol) } return nil
diff --git a/listener.go b/listener.go index 49e3d6b..fd0a5a8 100644 --- a/listener.go +++ b/listener.go
@@ -36,11 +36,7 @@ log.Panicf("Unable to access database: %v", err) } - if config.GetString(configSnapshotProtocol) == "json" { - processJsonSnapshot(snapshot, db) - } else if config.GetString(configSnapshotProtocol) == "sqlite" { - processSqliteSnapshot(snapshot, db) - } + processSqliteSnapshot(db) //update apid instance info apidInfo.LastSnapshot = snapshot.SnapshotInfo @@ -54,113 +50,73 @@ } -func processSqliteSnapshot(snapshot *common.Snapshot, db apid.DB) { - //nothing to do as of now, here as a placeholder -} -func processJsonSnapshot(snapshot *common.Snapshot, db apid.DB) { +func processSqliteSnapshot(db apid.DB) { - err := initDB(db) + var numApidClusters int + apidClusters, err := db.Query("SELECT COUNT(*) FROM edgex_apid_cluster") if err != nil { - log.Panicf("Unable to initialize database: %v", err) + log.Panicf("Unable to read database: %s", err.Error()) + } + apidClusters.Next() + apidClusters.Scan(&numApidClusters) + + if numApidClusters != 1 { + log.Panic("Illegal state for apid_cluster. Must be a single row.") } - tx, err := db.Begin() + _, err = db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") if err != nil { - log.Panicf("Error starting transaction: %v", err) - } - defer tx.Rollback() - - for _, table := range snapshot.Tables { - - switch table.Name { - case LISTENER_TABLE_APID_CLUSTER: - if len(table.Rows) > 1 { - log.Panic("Illegal state for apid_cluster. Must be a single row.") - } - for _, row := range table.Rows { - ac := makeApidClusterFromRow(row) - err := insertApidCluster(ac, tx) - if err != nil { - log.Panicf("Snapshot update failed: %v", err) - } - } - - case LISTENER_TABLE_DATA_SCOPE: - for _, row := range table.Rows { - ds := makeDataScopeFromRow(row) - err := insertDataScope(ds, tx) - if err != nil { - log.Panicf("Snapshot update failed: %v", err) - } - } + if err.Error() == "duplicate column name: last_sequence" { + return + } else { + log.Error("[[" + err.Error() + "]]") + log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err) } } - - err = tx.Commit() - if err != nil { - log.Panicf("Error committing Snapshot change: %v", err) - } } -func processChangeList(changes *common.ChangeList) { +func processChangeList(changes *common.ChangeList) bool { + + ok := false tx, err := getDB().Begin() if err != nil { log.Panicf("Error processing ChangeList: %v", err) + return ok } defer tx.Rollback() log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) for _, change := range changes.Changes { - switch change.Table { - case "edgex.apid_cluster": - switch change.Operation { - case common.Delete: - // todo: shut down apid, delete databases, scorch the earth! - log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) - default: - log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) - } - case "edgex.data_scope": - switch change.Operation { - case common.Insert: - ds := makeDataScopeFromRow(change.NewRow) - err = insertDataScope(ds, tx) - case common.Delete: - ds := makeDataScopeFromRow(change.OldRow) - err = deleteDataScope(ds, tx) - default: - // common.Update is not allowed - log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) - } + if change.Table == LISTENER_TABLE_APID_CLUSTER { + log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) } - if err != nil { - log.Panicf("Error processing ChangeList: %v", err) + switch change.Operation { + case common.Insert: + ok = insert(change.Table, []common.Row{change.NewRow}, tx) + case common.Update: + if change.Table == LISTENER_TABLE_DATA_SCOPE { + log.Panicf("illegal operation: %s for %s", change.Operation, change.Table) + } + ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx) + case common.Delete: + ok = _delete(change.Table, []common.Row{change.OldRow}, tx) + } + if !ok { + log.Error("Sql Operation error. Operation rollbacked") + return ok } } err = tx.Commit() if err != nil { log.Panicf("Error processing ChangeList: %v", err) + return false } -} -func makeApidClusterFromRow(row common.Row) dataApidCluster { - - dac := dataApidCluster{} - - row.Get("id", &dac.ID) - row.Get("name", &dac.Name) - row.Get("umbrella_org_app_name", &dac.OrgAppName) - row.Get("created", &dac.Created) - row.Get("created_by", &dac.CreatedBy) - row.Get("updated", &dac.Updated) - row.Get("updated_by", &dac.UpdatedBy) - row.Get("description", &dac.Description) - - return dac + return ok } func makeDataScopeFromRow(row common.Row) dataDataScope {
diff --git a/listener_test.go b/listener_test.go index b5fea9b..7d4af9b 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -5,48 +5,32 @@ . "github.com/onsi/gomega" "github.com/apigee-labs/transicator/common" + "os" ) var _ = Describe("listener", func() { handler := handler{} - var saveLastSnapshot string + + var createTestDb = func (sqlfile string, dbId string) common.Snapshot{ + initDb(sqlfile, "./mockdb.sqlite3") + file, err := os.Open("./mockdb.sqlite3") + if err != nil { + Fail("Failed to open mock db for test") + } + + s := common.Snapshot{} + err = processSnapshotServerFileResponse(dbId, file, &s) + if err != nil { + Fail("Error processing test snapshots") + } + return s + } Context("ApigeeSync snapshot event", func() { - It("should set DB to appropriate version", func() { - log.Info("Starting listener tests...") - - //save the last snapshot, so we can restore it at the end of this context - saveLastSnapshot = apidInfo.LastSnapshot - - event := common.Snapshot{ - SnapshotInfo: "test_snapshot", - Tables: []common.Table{}, - } - - handler.Handle(&event) - - Expect(apidInfo.LastSnapshot).To(Equal(event.SnapshotInfo)) - - expectedDB, err := dataService.DBVersion(event.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - - Expect(getDB() == expectedDB).Should(BeTrue()) - }) - It("should fail if more than one apid_cluster rows", func() { - - event := common.Snapshot{ - SnapshotInfo: "test_snapshot_fail", - Tables: []common.Table{ - { - Name: LISTENER_TABLE_APID_CLUSTER, - Rows: []common.Row{{}, {}}, - }, - }, - } - + event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters") Expect(func() { handler.Handle(&event) }).To(Panic()) }, 3) @@ -68,74 +52,7 @@ It("should process a valid Snapshot", func() { - event := common.Snapshot{ - SnapshotInfo: "test_snapshot_valid", - Tables: []common.Table{ - { - Name: LISTENER_TABLE_APID_CLUSTER, - Rows: []common.Row{ - { - "id": &common.ColumnVal{Value: "i"}, - "name": &common.ColumnVal{Value: "n"}, - "umbrella_org_app_name": &common.ColumnVal{Value: "o"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - "description": &common.ColumnVal{Value: "d"}, - }, - }, - }, - { - Name: LISTENER_TABLE_DATA_SCOPE, - Rows: []common.Row{ - { - "id": &common.ColumnVal{Value: "i"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s1"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e1"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - }, - }, - }, - { - Name: LISTENER_TABLE_DATA_SCOPE, - Rows: []common.Row{ - { - "id": &common.ColumnVal{Value: "j"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s1"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e2"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - }, - }, - }, - { - Name: LISTENER_TABLE_DATA_SCOPE, - Rows: []common.Row{ - { - "id": &common.ColumnVal{Value: "k"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s2"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e3"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - }, - }, - }, - }, - } + event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid") handler.Handle(&event) @@ -146,13 +63,18 @@ db := getDB() + expectedDB, err := dataService.DBVersion(event.SnapshotInfo) + Expect(err).NotTo(HaveOccurred()) + + Expect(db == expectedDB).Should(BeTrue()) + // apid Cluster var dcs []dataApidCluster rows, err := db.Query(` SELECT id, name, description, umbrella_org_app_name, created, created_by, updated, updated_by - FROM APID_CLUSTER`) + FROM EDGEX_APID_CLUSTER`) Expect(err).NotTo(HaveOccurred()) defer rows.Close() @@ -182,7 +104,7 @@ SELECT id, apid_cluster_id, scope, org, env, created, created_by, updated, updated_by - FROM DATA_SCOPE`) + FROM EDGEX_DATA_SCOPE`) Expect(err).NotTo(HaveOccurred()) defer rows.Close() @@ -219,7 +141,6 @@ Expect(scopes[1]).To(Equal("s2")) //restore the last snapshot - apidInfo.LastSnapshot = saveLastSnapshot }, 3) }) @@ -228,10 +149,12 @@ Context(LISTENER_TABLE_APID_CLUSTER, func() { It("insert event should panic", func() { - //save the last snapshot, so we can restore it at the end of this context - saveLastSnapshot = apidInfo.LastSnapshot + ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic") + handler.Handle(&ssEvent) - event := common.ChangeList{ + //save the last snapshot, so we can restore it at the end of this context + + csEvent := common.ChangeList{ LastSequence: "test", Changes: []common.Change{ { @@ -241,10 +164,12 @@ }, } - Expect(func() { handler.Handle(&event) }).To(Panic()) + Expect(func() { handler.Handle(&csEvent) }).To(Panic()) }, 3) It("update event should panic", func() { + ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic") + handler.Handle(&ssEvent) event := common.ChangeList{ LastSequence: "test", @@ -258,17 +183,15 @@ Expect(func() { handler.Handle(&event) }).To(Panic()) //restore the last snapshot - apidInfo.LastSnapshot = saveLastSnapshot }, 3) - PIt("delete event should kill all the things!") }) Context(LISTENER_TABLE_DATA_SCOPE, func() { It("insert event should add", func() { - //save the last snapshot, so we can restore it at the end of this context - saveLastSnapshot = apidInfo.LastSnapshot + ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert") + handler.Handle(&ssEvent) event := common.ChangeList{ LastSequence: "test", @@ -286,6 +209,7 @@ "created_by": &common.ColumnVal{Value: "c"}, "updated": &common.ColumnVal{Value: "u"}, "updated_by": &common.ColumnVal{Value: "u"}, + "_change_selector": &common.ColumnVal{Value: "cs"}, }, }, { @@ -301,6 +225,7 @@ "created_by": &common.ColumnVal{Value: "c"}, "updated": &common.ColumnVal{Value: "u"}, "updated_by": &common.ColumnVal{Value: "u"}, + "_change_selector": &common.ColumnVal{Value: "cs"}, }, }, }, @@ -314,7 +239,7 @@ SELECT id, apid_cluster_id, scope, org, env, created, created_by, updated, updated_by - FROM DATA_SCOPE`) + FROM EDGEX_DATA_SCOPE`) Expect(err).NotTo(HaveOccurred()) defer rows.Close() @@ -326,6 +251,7 @@ dds = append(dds, d) } + //three already existing Expect(len(dds)).To(Equal(2)) ds := dds[0] @@ -349,6 +275,8 @@ }, 3) It("delete event should delete", func() { + ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete") + handler.Handle(&ssEvent) insert := common.ChangeList{ LastSequence: "test", Changes: []common.Change{ @@ -365,6 +293,7 @@ "created_by": &common.ColumnVal{Value: "c"}, "updated": &common.ColumnVal{Value: "u"}, "updated_by": &common.ColumnVal{Value: "u"}, + "_change_selector": &common.ColumnVal{Value: "cs"}, }, }, }, @@ -386,13 +315,15 @@ handler.Handle(&delete) var nRows int - err := getDB().QueryRow("SELECT count(id) FROM DATA_SCOPE").Scan(&nRows) + err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows) Expect(err).NotTo(HaveOccurred()) - Expect(nRows).To(Equal(0)) + Expect(0).To(Equal(nRows)) }, 3) - It("update event should panic", func() { + It("update event should panic for data scopes table", func() { + ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic") + handler.Handle(&ssEvent) event := common.ChangeList{ LastSequence: "test", @@ -406,10 +337,9 @@ Expect(func() { handler.Handle(&event) }).To(Panic()) //restore the last snapshot - apidInfo.LastSnapshot = saveLastSnapshot }, 3) + //TODO add tests for update/insert/delete cluster }) - }) })
diff --git a/mock_server.go b/mock_server.go index f7b3600..01339ec 100644 --- a/mock_server.go +++ b/mock_server.go
@@ -8,10 +8,8 @@ "math/rand" "net/http" "net/url" - "strconv" "sync" "sync/atomic" - "time" "net" @@ -19,6 +17,11 @@ "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "strconv" + "os" + "io" + "database/sql" + "io/ioutil" ) /* @@ -49,10 +52,7 @@ Organization string Environment string NumDevelopers int - AddDeveloperEvery time.Duration - UpdateDeveloperEvery time.Duration NumDeployments int - ReplaceDeploymentEvery time.Duration BundleURI string } @@ -72,7 +72,6 @@ params MockParms oauthToken string snapshotID string - snapshotTables map[string][]common.Table // key = scopeID changeChannel chan []byte sequenceID *int64 maxDevID *int64 @@ -111,6 +110,28 @@ return strconv.FormatInt(newMinID-1, 10) } +func initDb(statements, path string) { + + f, _ := os.Create(path) + f.Close() + + db, err := sql.Open("sqlite3", path) + if err != nil { + log.Panic("Could not instantiate mock db, %s", err) + } + defer db.Close() + sqlStatementsBuffer, err := ioutil.ReadFile(statements) + if err != nil { + log.Panic("Could not instantiate mock db, %s", err) + } + sqlStatementsString := string(sqlStatementsBuffer) + _, err = db.Exec(sqlStatementsString) + if err != nil { + log.Panic("Could not instantiate mock db, %s", err) + } + +} + func (m *MockServer) init() { defer GinkgoRecover() RegisterFailHandler(func(message string, callerSkip ...int) { @@ -126,78 +147,9 @@ m.maxDeploymentID = new(int64) m.newSnap = new(int32) - go m.developerGenerator() - go m.developerUpdater() - go m.deploymentReplacer() + initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3") + initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3") - // cluster "scope" - cluster := m.newRow(map[string]string{ - "id": m.params.ClusterID, - "_change_selector": m.params.ClusterID, - }) - - // data scopes - var dataScopes []common.Row - dataScopes = append(dataScopes, cluster) - dataScopes = append(dataScopes, m.newRow(map[string]string{ - "id": m.params.Scope, - "scope": m.params.Scope, - "org": m.params.Organization, - "env": m.params.Environment, - "apid_cluster_id": m.params.ClusterID, - "_change_selector": m.params.Scope, - })) - - // cluster & data_scope snapshot tables - m.snapshotTables = map[string][]common.Table{} - m.snapshotTables[m.params.ClusterID] = []common.Table{ - { - Name: "edgex.apid_cluster", - Rows: []common.Row{cluster}, - }, - { - Name: "edgex.data_scope", - Rows: dataScopes, - }, - } - - var snapshotTableRows []tableRowMap - - // generate one company - companyID := m.params.Organization - tenantID := m.params.Scope - changeSelector := m.params.Scope - company := tableRowMap{ - "kms.company": m.newRow(map[string]string{ - "id": companyID, - "status": "Active", - "tenant_id": tenantID, - "name": companyID, - "display_name": companyID, - "_change_selector": changeSelector, - }), - } - snapshotTableRows = append(snapshotTableRows, company) - - // generate snapshot developers - for i := 0; i < m.params.NumDevelopers; i++ { - developer := m.createDeveloperWithProductAndApp() - snapshotTableRows = append(snapshotTableRows, developer) - } - log.Infof("created %d developers", m.params.NumDevelopers) - - // generate snapshot deployments - for i := 0; i < m.params.NumDeployments; i++ { - deployment := m.createDeployment() - snapshotTableRows = append(snapshotTableRows, deployment) - } - log.Infof("created %d deployments", m.params.NumDeployments) - - m.snapshotTables[m.params.Scope] = m.concatTableRowMaps(snapshotTableRows...) - - if m.params.NumDevelopers < 10 && m.params.NumDeployments < 10 { - log.Debugf("snapshotTables: %v", m.snapshotTables[m.params.Scope]) - } } // developer, product, application, credential will have the same ID (developerID) @@ -281,28 +233,19 @@ Expect(scopes).To(ContainElement(m.params.ClusterID)) - m.snapshotID = generateUUID() - snapshot := &common.Snapshot{ - SnapshotInfo: m.snapshotID, + w.Header().Set("Transicator-Snapshot-TXID", generateUUID()) + + if len(scopes) == 1 { + //send bootstrap db + err := streamFile("./mockdb_boot.sqlite3", w) + Expect(err).NotTo(HaveOccurred()) + return + } else { + //send data db + err := streamFile("./mockdb.sqlite3", w) + Expect(err).NotTo(HaveOccurred()) + return } - - // Note: if/when we support multiple scopes, we'd have to do a merge of table rows - for _, scope := range scopes { - tables := m.snapshotTables[scope] - for _, table := range tables { - snapshot.AddTables(table) - } - } - - body, err := json.Marshal(snapshot) - Expect(err).NotTo(HaveOccurred()) - - log.Infof("sending snapshot: %s", m.snapshotID) - if len(body) < 10000 { - log.Debugf("snapshot: %#v", string(body)) - } - - w.Write(body) } func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) { @@ -323,9 +266,9 @@ q := req.URL.Query() scopes := q["scope"] - block, err := strconv.Atoi(q.Get("block")) + _, err := strconv.Atoi(q.Get("block")) Expect(err).NotTo(HaveOccurred()) - since := q.Get("since") + _ = q.Get("since") Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID)) //Expect(q.Get("snapshot")).To(Equal(m.snapshotID)) @@ -333,11 +276,6 @@ Expect(scopes).To(ContainElement(m.params.ClusterID)) //Expect(scopes).To(ContainElement(m.params.Scope)) - if since != "" { - m.sendChange(w, time.Duration(block)*time.Second) - return - } - // todo: the following is just legacy for the existing test in apigeeSync_suite_test developer := m.createDeveloperWithProductAndApp() changeList := m.createInsertChange(developer) @@ -348,93 +286,6 @@ w.Write(body) } -// generate developers w/ product and app -func (m *MockServer) developerGenerator() { - - for range time.Tick(m.params.AddDeveloperEvery) { - - developer := m.createDeveloperWithProductAndApp() - changeList := m.createInsertChange(developer) - - body, err := json.Marshal(changeList) - if err != nil { - log.Errorf("Error adding developer: %v", err) - } - - log.Info("adding developer") - log.Debugf("body: %#v", string(body)) - m.changeChannel <- body - } -} - -// update random developers - set username -func (m *MockServer) developerUpdater() { - - for range time.Tick(m.params.UpdateDeveloperEvery) { - - developerID := m.randomDeveloperID() - - oldDev := m.createDeveloper(developerID) - delete(oldDev, "kms.company_developer") - newDev := m.createDeveloper(developerID) - delete(newDev, "kms.company_developer") - - newRow := newDev["kms.developer"] - newRow["username"] = m.stringColumnVal("i_am_not_a_number") - - changeList := m.createUpdateChange(oldDev, newDev) - - body, err := json.Marshal(changeList) - if err != nil { - log.Errorf("Error updating developer: %v", err) - } - - log.Info("updating developer") - log.Debugf("body: %#v", string(body)) - m.changeChannel <- body - } -} - -func (m *MockServer) deploymentReplacer() { - - for range time.Tick(m.params.ReplaceDeploymentEvery) { - - // delete - oldDep := tableRowMap{} - oldDep["edgex.deployment"] = m.newRow(map[string]string{ - "id": m.popDeploymentID(), - }) - deleteChange := m.createDeleteChange(oldDep) - - // insert - newDep := m.createDeployment() - insertChange := m.createInsertChange(newDep) - - changeList := m.concatChangeLists(deleteChange, insertChange) - - body, err := json.Marshal(changeList) - if err != nil { - log.Errorf("Error replacing deployment: %v", err) - } - - log.Info("replacing deployment") - log.Debugf("body: %#v", string(body)) - m.changeChannel <- body - } -} - -// todo: we could debounce this if necessary -func (m *MockServer) sendChange(w http.ResponseWriter, timeout time.Duration) { - select { - case change := <-m.changeChannel: - log.Info("sending change to client") - w.Write(change) - case <-time.After(timeout): - log.Info("change request timeout") - w.WriteHeader(http.StatusNotModified) - } -} - // enables GoMega handling func (m *MockServer) gomega(target http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, req *http.Request) { @@ -537,7 +388,7 @@ Expect(err).ShouldNot(HaveOccurred()) rows := tableRowMap{} - rows["edgex.deployment"] = m.newRow(map[string]string{ + rows["kms_deployment"] = m.newRow(map[string]string{ "id": deploymentID, "bundle_config_id": bundleID, "apid_cluster_id": m.params.ClusterID, @@ -556,14 +407,14 @@ rows := tableRowMap{} - rows["kms.developer"] = m.newRow(map[string]string{ + rows["kms_developer"] = m.newRow(map[string]string{ "id": developerID, "status": "Active", "tenant_id": tenantID, }) // map developer onto to existing company - rows["kms.company_developer"] = m.newRow(map[string]string{ + rows["kms_company_developer"] = m.newRow(map[string]string{ "id": developerID, "tenant_id": tenantID, "company_id": companyID, @@ -581,7 +432,7 @@ resources := fmt.Sprintf("{%s}", "/") // todo: what should be here? rows := tableRowMap{} - rows["kms.api_product"] = m.newRow(map[string]string{ + rows["kms_api_product"] = m.newRow(map[string]string{ "id": productID, "api_resources": resources, "environments": environments, @@ -596,21 +447,21 @@ rows := tableRowMap{} - rows["kms.app"] = m.newRow(map[string]string{ + rows["kms_app"] = m.newRow(map[string]string{ "id": applicationID, "developer_id": developerID, "status": "Approved", "tenant_id": tenantID, }) - rows["kms.app_credential"] = m.newRow(map[string]string{ + rows["kms_app_credential"] = m.newRow(map[string]string{ "id": credentialID, "app_id": applicationID, "tenant_id": tenantID, "status": "Approved", }) - rows["kms.app_credential_apiproduct_mapper"] = m.newRow(map[string]string{ + rows["kms_app_credential_apiproduct_mapper"] = m.newRow(map[string]string{ "apiprdt_id": productID, "app_id": applicationID, "appcred_id": credentialID, @@ -688,26 +539,6 @@ } // create []common.Table from array of tableRowMaps -func (m *MockServer) concatTableRowMaps(maps ...tableRowMap) []common.Table { - tableMap := map[string]*common.Table{} - for _, m := range maps { - for name, row := range m { - if _, ok := tableMap[name]; !ok { - tableMap[name] = &common.Table{ - Name: name, - } - } - tableMap[name].AddRowstoTable(row) - } - } - result := []common.Table{} - for _, v := range tableMap { - result = append(result, *v) - } - return result -} - -// create []common.Table from array of tableRowMaps func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList { result := common.ChangeList{} if len(changeLists) > 0 { @@ -721,3 +552,16 @@ } return result } + +func streamFile(srcFile string, w http.ResponseWriter) error { + inFile, err := os.Open(srcFile) + if err != nil { + return err + } + defer inFile.Close() + + w.Header().Set("Content-Type", "application/transicator+sqlite") + + _, err = io.Copy(w, inFile) + return err +}
diff --git a/snapshot.go b/snapshot.go index 3b288a2..b9afe38 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -1,7 +1,6 @@ package apidApigeeSync import ( - "encoding/json" "github.com/30x/apid-core" "github.com/30x/apid-core/data" "github.com/apigee-labs/transicator/common" @@ -136,18 +135,11 @@ func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) { knownTables = extractTablesFromSnapshot(snapshot) - db, err := dataService.DBVersion(snapshot.SnapshotInfo) + _, err := dataService.DBVersion(snapshot.SnapshotInfo) if err != nil { log.Panicf("Database inaccessible: %v", err) } - // if closed - if atomic.LoadInt32(s.isClosed) == int32(1) { - log.Warn("Trying to persistKnownTablesToDB with a closed snapShotManager") - return - } - persistKnownTablesToDB(knownTables, db) - log.Info("Emitting Snapshot to plugins") select { @@ -167,14 +159,31 @@ log.Debug("Extracting table names from snapshot") if snapshot.Tables == nil { //if this panic ever fires, it's a bug - log.Panicf("Attempt to extract known tables from snapshot without tables failed") - } + db, err := dataService.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Database inaccessible: %v", err) + } + rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") + if err != nil { + log.Panicf("Unable to read in known snapshot tables from sqlite file") + } + for rows.Next() { + var tableName string + rows.Scan(&tableName) + if err != nil { + log.Panic("Error scaning tableNames from _transicator_tables") + } + tables[tableName] = true + } - for _, table := range snapshot.Tables { - tables[table.Name] = true - } + } else { + for _, table := range snapshot.Tables { + tables[table.Name] = true + } + } return tables + } func extractTablesFromDB(db apid.DB) (tables map[string]bool) { @@ -182,7 +191,7 @@ tables = make(map[string]bool) log.Debug("Extracting table names from existing DB") - rows, err := db.Query("SELECT name FROM _known_tables;") + rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;") defer rows.Close() if err != nil { @@ -263,15 +272,13 @@ } addHeaders(req) - var processSnapshotResponse func(*http.Response, *common.Snapshot) error + var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error - // Set the transport protocol type based on conf file input - if config.GetString(configSnapshotProtocol) == "json" { - req.Header.Set("Accept", "application/json") - processSnapshotResponse = processSnapshotServerJsonResponse - } else if config.GetString(configSnapshotProtocol) == "sqlite" { + if config.GetString(configSnapshotProtocol) == "sqlite" { req.Header.Set("Accept", "application/transicator+sqlite") processSnapshotResponse = processSnapshotServerFileResponse + } else { + log.Panic("Only currently supported snashot protocol is sqlite") } // Issue the request to the snapshot server @@ -290,7 +297,7 @@ } // Decode the Snapshot server response - err = processSnapshotResponse(r, snapshot) + err = processSnapshotResponse(r.Header.Get("Transicator-Snapshot-TXID"), r.Body, snapshot) if err != nil { log.Errorf("Snapshot server response Data not parsable: %v", err) return err @@ -300,50 +307,24 @@ } } -func persistKnownTablesToDB(tables map[string]bool, db apid.DB) { - log.Debugf("Inserting table names found in snapshot into db") +func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error { + dbPath := data.DBPath("common/" + dbId) + log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath) - tx, err := db.Begin() + //this path includes the sqlite3 file name. why does mkdir all stop at parent?? + log.Infof("Creating directory with mkdirall %s", dbPath) + err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700) if err != nil { - log.Panicf("Error starting transaction: %v", err) + log.Errorf("Error creating db path %s", err) } - defer tx.Rollback() - - _, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));") - if err != nil { - log.Panicf("Could not create _known_tables table: %s", err) - } - - for name := range tables { - log.Debugf("Inserting %s into _known_tables", name) - _, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name) - if err != nil { - log.Panicf("Error encountered inserting into known tables ", err) - } - - } - - err = tx.Commit() - if err != nil { - log.Panicf("Error committing transaction: %v", err) - - } -} - -func processSnapshotServerJsonResponse(r *http.Response, snapshot *common.Snapshot) error { - return json.NewDecoder(r.Body).Decode(snapshot) -} - -func processSnapshotServerFileResponse(r *http.Response, snapshot *common.Snapshot) error { - dbId := r.Header.Get("Transicator-Snapshot-TXID") - out, err := os.Create(data.DBPath(dbId)) + out, err := os.Create(dbPath) if err != nil { return err } defer out.Close() //stream respose to DB - _, err = io.Copy(out, r.Body) + _, err = io.Copy(out, body) if err != nil { return err
diff --git a/sql/init_listener_test_duplicate_apids.sql b/sql/init_listener_test_duplicate_apids.sql new file mode 100644 index 0000000..21bd615 --- /dev/null +++ b/sql/init_listener_test_duplicate_apids.sql
@@ -0,0 +1,37 @@ +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE _transicator_metadata +(key varchar primary key, +value varchar); +INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests'); +CREATE TABLE _transicator_tables +(tableName varchar not null, +columnName varchar not null, +typid integer, +primaryKey bool); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1); +CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector)); +INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap'); +INSERT INTO "edgex_apid_cluster" VALUES('bootstrap2','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap'); +CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector)); +INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap'); +INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap'); +COMMIT;
diff --git a/sql/init_listener_test_no_datascopes.sql b/sql/init_listener_test_no_datascopes.sql new file mode 100644 index 0000000..6f5030a --- /dev/null +++ b/sql/init_listener_test_no_datascopes.sql
@@ -0,0 +1,37 @@ +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE _transicator_metadata +(key varchar primary key, +value varchar); +INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests'); +CREATE TABLE _transicator_tables +(tableName varchar not null, +columnName varchar not null, +typid integer, +primaryKey bool); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1); + +CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector)); +INSERT INTO "edgex_apid_cluster" VALUES('i','n','d','o', 'c', 'c', 'u','u', 'i'); + +CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector)); + +COMMIT;
diff --git a/sql/init_listener_test_valid_snapshot.sql b/sql/init_listener_test_valid_snapshot.sql new file mode 100644 index 0000000..4e7e7e5 --- /dev/null +++ b/sql/init_listener_test_valid_snapshot.sql
@@ -0,0 +1,40 @@ +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE _transicator_metadata +(key varchar primary key, +value varchar); +INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests'); +CREATE TABLE _transicator_tables +(tableName varchar not null, +columnName varchar not null, +typid integer, +primaryKey bool); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1); + +CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector)); +INSERT INTO "edgex_apid_cluster" VALUES('i','n','d','o', 'c', 'c', 'u','u', 'i'); + +CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,org,env,_change_selector)); +INSERT INTO "edgex_data_scope" VALUES('i','a','s1','o','e1','c','c','u','u','a'); +INSERT INTO "edgex_data_scope" VALUES('i','a','s1','o','e2','c','c','u','u','a'); +INSERT INTO "edgex_data_scope" VALUES('k','a','s2','o','e3','c','c','u','u','a'); + +COMMIT;
diff --git a/sql/init_mock_boot_db.sql b/sql/init_mock_boot_db.sql new file mode 100644 index 0000000..ed66799 --- /dev/null +++ b/sql/init_mock_boot_db.sql
@@ -0,0 +1,36 @@ +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE _transicator_metadata +(key varchar primary key, +value varchar); +INSERT INTO "_transicator_metadata" VALUES('snapshot','1142790:1142790:'); +CREATE TABLE _transicator_tables +(tableName varchar not null, +columnName varchar not null, +typid integer, +primaryKey bool); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1); +CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,created_by,created_by,created_by,_change_selector)); +INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap'); +CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector)); +INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap'); +INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap'); +COMMIT;
diff --git a/sql/init_mock_db.sql b/sql/init_mock_db.sql new file mode 100644 index 0000000..dc14fda --- /dev/null +++ b/sql/init_mock_db.sql
@@ -0,0 +1,246 @@ +PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE _transicator_metadata +(key varchar primary key, +value varchar); +INSERT INTO "_transicator_metadata" VALUES('snapshot','1142790:1142790:'); +CREATE TABLE _transicator_tables +(tableName varchar not null, +columnName varchar not null, +typid integer, +primaryKey bool); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','apid_cluster_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','data_scope_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_name',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_json',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','config_json',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('attributes','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('attributes','entity_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('attributes','cust_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('attributes','org_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('attributes','dev_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('attributes','comp_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('attributes','apiprdt_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('attributes','app_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('attributes','appcred_id',1043,0); +INSERT INTO "_transicator_tables" VALUES('attributes','name',1043,1); +INSERT INTO "_transicator_tables" VALUES('attributes','type',19701,1); +INSERT INTO "_transicator_tables" VALUES('attributes','value',1043,0); +INSERT INTO "_transicator_tables" VALUES('attributes','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company','id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_company','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_company','name',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_company','display_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company','status',19564,0); +INSERT INTO "_transicator_tables" VALUES('kms_company','created_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_company','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company','updated_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_company','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_organization','name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','display_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','type',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_organization','customer_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_organization','description',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','created_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_organization','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','updated_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_organization','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_organization','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','ext_ref_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','display_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','description',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('configuration','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('configuration','body',25,0); +INSERT INTO "_transicator_tables" VALUES('configuration','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('configuration','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('configuration','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('configuration','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','consumer_secret',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','app_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','method_type',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','status',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','issued_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','expires_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','app_status',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','scopes',1015,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','created_at',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','updated_at',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','company_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','developer_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','roles',1015,0); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_at',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_at',1114,0); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_company_developer','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','id',1043,1); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','deployment_id',1043,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','action',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_id',1043,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','apid_cluster_id',1043,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','data_scope_id',1043,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_json',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','config_json',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created_by',25,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated',1114,0); +INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated_by',25,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','name',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','display_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','access_type',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','callback_url',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','status',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','app_family',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','company_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','developer_id',2950,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','parent_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','type',19625,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','created_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','updated_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_app','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','appcred_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','app_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','apiprdt_id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','status',19670,0); +INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','_change_selector',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','id',2950,1); +INSERT INTO "_transicator_tables" VALUES('kms_developer','tenant_id',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_developer','username',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_developer','first_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','last_name',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','password',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','email',1043,1); +INSERT INTO "_transicator_tables" VALUES('kms_developer','status',19564,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','encrypted_password',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','salt',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','created_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_developer','created_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','updated_at',1114,1); +INSERT INTO "_transicator_tables" VALUES('kms_developer','updated_by',1043,0); +INSERT INTO "_transicator_tables" VALUES('kms_developer','_change_selector',1043,0); +CREATE TABLE "kms_deployment" (id text,bundle_config_id text,apid_cluster_id text,data_scope_id text,bundle_config_name text,bundle_config_json text,config_json text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,bundle_config_id,apid_cluster_id,data_scope_id,_change_selector)); +INSERT INTO "kms_deployment" VALUES('321e443b-9db9-4043-b987-1599e0cdd029','1b6a5e15-4bb8-4c8e-ae3d-63e6efb9ba85','bootstrap','dataScope1','gcp-test-bundle','{"id":"1b6a5e15-4bb8-4c8e-ae3d-63e6efb9ba85","created":"2017-02-27T07:40:57.810Z","createdBy":"fierrom@google.com","updated":"2017-02-27T07:40:57.810Z","updatedBy":"fierrom@google.com","name":"gcp-test-bundle","uri":"https://gist.github.com/mdobson/f9d537c5192a660f692affc294266df2/archive/234c7cbf227d769278bee9b06ace51d6062fe96b.zip","checksumType":"md5","checksum":"06fde116f0270b3734a48653d0cfb495"}','{}','2017-02-27 07:41:33.888+00:00','fierrom@google.com','2017-02-27 07:41:33.888+00:00','fierrom@google.com','bootstrap'); +CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at)); +INSERT INTO "kms_api_product" VALUES('f5f07319-5104-471c-9df3-64b1842dbe00','43aef41d','test','test','','{/}','AUTO','{""}','{helloworld}','{test}','','',NULL,'2017-02-27 07:32:49.897+00:00','vbhangale@apigee.com','2017-02-27 07:32:49.897+00:00','vbhangale@apigee.com','43aef41d'); +INSERT INTO "kms_api_product" VALUES('87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','43aef41d','GregProduct','Greg''s Product','A product for testing Greg','{/**}','AUTO','{""}','{}','{test}','','',NULL,'2017-03-01 22:50:41.75+00:00','greg@google.com','2017-03-01 22:50:41.75+00:00','greg@google.com','43aef41d'); +CREATE TABLE attributes (tenant_id text,entity_id text,cust_id text,org_id text,dev_id text,comp_id text,apiprdt_id text,app_id text,appcred_id text,name text,type text,value text,_change_selector text, primary key (tenant_id,tenant_id,entity_id,entity_id,name,type,type)); +INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isSmbOrganization','ORGANIZATION','true','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.mgmtGroup','ORGANIZATION','management-edgex','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isEdgexEnabled','ORGANIZATION','true','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isCpsEnabled','ORGANIZATION','true','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','f5f07319-5104-471c-9df3-64b1842dbe00','','','','','f5f07319-5104-471c-9df3-64b1842dbe00','','','access','APIPRODUCT','internal','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','f5f07319-5104-471c-9df3-64b1842dbe00','','','','','f5f07319-5104-471c-9df3-64b1842dbe00','','','test','APIPRODUCT','v1','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','','','','','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','DisplayName','APP','MitchTestApp','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','','','','','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','Notes','APP','','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','87c20a31-a504-4ed5-89a5-700adfbb0142','','','','','','87c20a31-a504-4ed5-89a5-700adfbb0142','','DisplayName','APP','MitchTestApp2','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','87c20a31-a504-4ed5-89a5-700adfbb0142','','','','','','87c20a31-a504-4ed5-89a5-700adfbb0142','','Notes','APP','','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','','','','','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','','','access','APIPRODUCT','public','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','be902c0c-c54b-4a65-85d6-358ff8639586','','','','','','be902c0c-c54b-4a65-85d6-358ff8639586','','DisplayName','APP','Greg''s Test App','43aef41d'); +INSERT INTO "attributes" VALUES('43aef41d','be902c0c-c54b-4a65-85d6-358ff8639586','','','','','','be902c0c-c54b-4a65-85d6-358ff8639586','','Notes','APP','','43aef41d'); +CREATE TABLE "kms_company" (id text,tenant_id text,name text,display_name text,status text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,name,created_at,updated_at)); +CREATE TABLE "kms_organization" (id text,name text,display_name text,type text,tenant_id text,customer_id text,description text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,customer_id,created_at,updated_at)); +INSERT INTO "kms_organization" VALUES('ff0b5496-c674-4531-9443-ace334504f59','edgex_gcp1','edgex_gcp1','paid','43aef41d','307eadd7-c6d7-4ec1-b433-59bcd22cd06d','','2017-02-25 00:17:58.159+00:00','vbhangale@apigee.com','2017-02-25 00:18:14.729+00:00','vbhangale@apigee.com','43aef41d'); +CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,created_by,created_by,created_by,_change_selector)); +INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap'); +CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector)); +INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap'); +INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap'); +CREATE TABLE "kms_app_credential" (id text,tenant_id text,consumer_secret text,app_id text,method_type text,status text,issued_at blob,expires_at blob,app_status text,scopes text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,app_id,issued_at,expires_at)); +INSERT INTO "kms_app_credential" VALUES('xA9QylNTGQxKGYtHXwvmx8ldDaIJMAEx','43aef41d','lscGO3lfs3zh8iQ','87c20a31-a504-4ed5-89a5-700adfbb0142','','APPROVED','2017-02-27 07:45:22.774+00:00','','','{}','2017-02-27 07:45:22.774+00:00','-NA-','2017-02-27 07:45:22.877+00:00','-NA-','43aef41d'); +INSERT INTO "kms_app_credential" VALUES('ds986MejQqoWRSSeC0UTIPSJ3rtaG2xv','43aef41d','5EBOSSQrLOLO9siN','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','APPROVED','2017-02-27 07:43:23.263+00:00','','','{}','2017-02-27 07:43:23.263+00:00','-NA-','2017-02-27 07:48:16.717+00:00','-NA-','43aef41d'); +INSERT INTO "kms_app_credential" VALUES('DMh0uQOPA5rbhl4YTnGvBAzGzOGuMH3A','43aef41d','MTfK8xscShhnops','be902c0c-c54b-4a65-85d6-358ff8639586','','APPROVED','2017-03-01 22:52:28.019+00:00','','','{}','2017-03-01 22:52:28.019+00:00','-NA-','2017-03-01 22:52:28.022+00:00','-NA-','43aef41d'); +CREATE TABLE "kms_company_developer" (tenant_id text,company_id text,developer_id text,roles text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (tenant_id,company_id,developer_id)); +CREATE TABLE "kms_app" (id text,tenant_id text,name text,display_name text,access_type text,callback_url text,status text,app_family text,company_id text,developer_id text,parent_id text,type text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,name,name,app_family,parent_id,created_at,updated_at)); +INSERT INTO "kms_app" VALUES('87c20a31-a504-4ed5-89a5-700adfbb0142','43aef41d','MitchTestApp2','','','','APPROVED','default','','8a350848-0aba-4dcc-aa60-97903efb42ef','8a350848-0aba-4dcc-aa60-97903efb42ef','DEVELOPER','2017-02-27 07:45:21.586+00:00','fierrom@google.com' || + '','2017-02-27 07:45:21.586+00:00','fierrom@google.com' || + '','43aef41d'); +INSERT INTO "kms_app" VALUES('8f5c9b86-0783-439c-b8e6-7ab9549e30e8','43aef41d','MitchTestApp','','','','APPROVED','default','','8a350848-0aba-4dcc-aa60-97903efb42ef','8a350848-0aba-4dcc-aa60-97903efb42ef','DEVELOPER','2017-02-27 07:43:22.301+00:00','fierrom@google.com' || + '','2017-02-27 07:48:18.964+00:00','fierrom@google.com' || + '','43aef41d'); +INSERT INTO "kms_app" VALUES('be902c0c-c54b-4a65-85d6-358ff8639586','43aef41d','GregTestApp','','','','APPROVED','default','','046974c2-9ae5-4452-a42f-bb6657e6cdbe','046974c2-9ae5-4452-a42f-bb6657e6cdbe','DEVELOPER','2017-03-01 22:52:27.615+00:00','greg@google.com','2017-03-01 22:52:27.615+00:00','greg@google.com','43aef41d'); +CREATE TABLE "kms_app_credential_apiproduct_mapper" (tenant_id text,appcred_id text,app_id text,apiprdt_id text,status text,_change_selector text, primary key (tenant_id,appcred_id,app_id,apiprdt_id)); +INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','ds986MejQqoWRSSeC0UTIPSJ3rtaG2xv','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','f5f07319-5104-471c-9df3-64b1842dbe00','APPROVED','43aef41d'); +INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','xA9QylNTGQxKGYtHXwvmx8ldDaIJMAEx','87c20a31-a504-4ed5-89a5-700adfbb0142','f5f07319-5104-471c-9df3-64b1842dbe00','APPROVED','43aef41d'); +INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','DMh0uQOPA5rbhl4YTnGvBAzGzOGuMH3A','be902c0c-c54b-4a65-85d6-358ff8639586','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','APPROVED','43aef41d'); +CREATE TABLE "kms_developer" (id text,tenant_id text,username text,first_name text,last_name text,password text,email text,status text,encrypted_password text,salt text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,username,email,created_at,updated_at)); +INSERT INTO "kms_developer" VALUES('8a350848-0aba-4dcc-aa60-97903efb42ef','43aef41d','mitchfierro','Mitch','Fierro','','fierrom@google.com','ACTIVE','','','2017-02-27 07:43:00.281+00:00','fierrom@google.com' || + '','2017-02-27 07:43:00.281+00:00','fierrom@google.com' || + '','43aef41d'); +INSERT INTO "kms_developer" VALUES('6ab21170-6bac-481d-9be1-9fda02bdd1da','43aef41d','adikgcp','gcp','dev','','adikancherla@gcp.com','ACTIVE','','','2017-02-27 23:50:24.426+00:00','akancherla@apigee.com','2017-02-27 23:50:24.426+00:00','akancherla@apigee.com','43aef41d'); +INSERT INTO "kms_developer" VALUES('046974c2-9ae5-4452-a42f-bb6657e6cdbe','43aef41d','gregbrail','Greg','Brail','','gregbrail@google.com','ACTIVE','','','2017-03-01 22:51:40.602+00:00','greg@google.com','2017-03-01 22:51:40.602+00:00','greg@google.com','43aef41d'); +COMMIT; \ No newline at end of file
diff --git a/token.go b/token.go index 424c8d4..cf5cec5 100644 --- a/token.go +++ b/token.go
@@ -242,4 +242,4 @@ return true } return time.Now().Add(refreshFloatTime).After(t.ExpiresAt) -} +} \ No newline at end of file
diff --git a/token_test.go b/token_test.go index c8ec7ba..ab48eff 100644 --- a/token_test.go +++ b/token_test.go
@@ -199,4 +199,4 @@ close(done) }, 3) }) -}) +}) \ No newline at end of file