Merge branch 'APIRT-4751' of github.com:30x/apidApigeeSync into APIRT-4751
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 3d897d6..bd85b97 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -58,8 +58,12 @@ if wipeDBAferTest { db, err := dataService.DB() Expect(err).NotTo(HaveOccurred()) - _, err = db.Exec("DELETE FROM APID") + tx, err := db.Begin() + _, err = tx.Exec("DELETE FROM APID") Expect(err).NotTo(HaveOccurred()) + err = tx.Commit() + Expect(err).NotTo(HaveOccurred()) + } wipeDBAferTest = true newInstanceID = true @@ -100,7 +104,10 @@ if wipeDBAferTest { db, err := dataService.DB() Expect(err).NotTo(HaveOccurred()) - _, err = db.Exec("DELETE FROM APID") + tx, err := db.Begin() + _, err = tx.Exec("DELETE FROM APID") + Expect(err).NotTo(HaveOccurred()) + err = tx.Commit() Expect(err).NotTo(HaveOccurred()) } wipeDBAferTest = true @@ -181,16 +188,22 @@ Expect(true).To(Equal(numApidClusters.Next())) numApidClusters.Scan(&rowCount) Expect(1).To(Equal(rowCount)) - apidClusters, _ := db.Query("select id from edgex_apid_cluster;") + numApidClusters.Close() + apidClusters, err := db.Query("select id from edgex_apid_cluster;") + Expect(err).NotTo(HaveOccurred()) apidClusters.Next() apidClusters.Scan(&id) Expect(id).To(Equal(expectedClusterId)) + apidClusters.Close() - numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;") + numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;") + Expect(err).NotTo(HaveOccurred()) Expect(true).To(Equal(numDataScopes.Next())) numDataScopes.Scan(&rowCount) Expect(2).To(Equal(rowCount)) - dataScopes, _ := db.Query("select id from edgex_data_scope;") + numDataScopes.Close() + dataScopes, err := db.Query("select id from edgex_data_scope;") + Expect(err).NotTo(HaveOccurred()) dataScopes.Next() dataScopes.Scan(&id) dataScopes.Next() @@ -202,6 +215,7 @@ dataScopes.Scan(&id) Expect(id).To(Equal(expectedDataScopeId1)) } + dataScopes.Close() } else if cl, ok := event.(*common.ChangeList); ok { closeDone = apidChangeManager.close()
diff --git a/change_test.go b/change_test.go index d7ee964..e7eed6f 100644 --- a/change_test.go +++ b/change_test.go
@@ -82,7 +82,10 @@ if wipeDBAferTest { db, err := dataService.DB() Expect(err).Should(Succeed()) - _, err = db.Exec("DELETE FROM APID") + tx, err := db.Begin() + _, err = tx.Exec("DELETE FROM APID") + Expect(err).Should(Succeed()) + err = tx.Commit() Expect(err).Should(Succeed()) } wipeDBAferTest = true
diff --git a/changes.go b/changes.go index 6358307..20e7f6d 100644 --- a/changes.go +++ b/changes.go
@@ -336,7 +336,7 @@ lastSequence = seq err := updateLastSequence(seq) if err != nil { - log.Panic("Unable to update Sequence in DB") + log.Panicf("Unable to update Sequence in DB. Err {%v}", err) } }
diff --git a/data.go b/data.go index a60902c..3c6994b 100644 --- a/data.go +++ b/data.go
@@ -49,7 +49,13 @@ (Currently, the snapshot never changes, but this is future-proof) */ func initDB(db apid.DB) error { - _, err := db.Exec(` + tx, err := db.Begin() + if err != nil { + log.Errorf("initDB(): Unable to get DB tx err: {%v}", err) + return err + } + defer completeTxn(tx, err) + _, err = tx.Exec(` CREATE TABLE IF NOT EXISTS APID ( instance_id text, apid_cluster_id text, @@ -58,9 +64,9 @@ ); `) if err != nil { + log.Errorf("initDB(): Unable to tx exec err: {%v}", err) return err } - log.Debug("Database tables created.") return nil } @@ -79,7 +85,7 @@ } //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn -func insert(tableName string, rows []common.Row, txn *sql.Tx) bool { +func insert(tableName string, rows []common.Row, txn apid.Tx) bool { if len(rows) == 0 { return false @@ -133,7 +139,7 @@ return values } -func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool { +func _delete(tableName string, rows []common.Row, txn apid.Tx) bool { pkeys, err := getPkeysForTable(tableName) sort.Strings(pkeys) if len(pkeys) == 0 || err != nil { @@ -200,7 +206,7 @@ } -func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool { +func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool { pkeys, err := getPkeysForTable(tableName) if len(pkeys) == 0 || err != nil { log.Errorf("UPDATE No primary keys found for table.", tableName) @@ -426,21 +432,18 @@ log.Debugf("updateLastSequence: %s", lastSequence) - stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;") + tx, err := getDB().Begin() + if err != nil { + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return err + } + defer completeTxn(tx, err) + _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence) if err != nil { log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) return err } - defer stmt.Close() - - _, err = stmt.Exec(lastSequence) - if err != nil { - log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) - return err - } - log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) - log.Infof("Replication lastSequence=%s", lastSequence) return nil } @@ -456,8 +459,13 @@ if err != nil { return } - - err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). + tx, err := db.Begin() + if err != nil { + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return + } + defer completeTxn(tx, err) + err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot) if err != nil { if err != sql.ErrNoRows { @@ -470,7 +478,7 @@ info.InstanceID = GenerateUUID() log.Debugf("Inserting new apid instance id %s", info.InstanceID) - db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", + _, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", info.InstanceID, info.ClusterID, "") } } else if savedClusterId != info.ClusterID { @@ -479,7 +487,7 @@ newInstanceID = true info.InstanceID = GenerateUUID() - db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", + _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", info.InstanceID, info.ClusterID, "") info.LastSnapshot = "" } @@ -493,13 +501,19 @@ if err != nil { return err } - - rows, err := db.Exec(` + tx, err := db.Begin() + if err != nil { + log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return err + } + defer completeTxn(tx, err) + rows, err := tx.Exec(` REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)`, apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) if err != nil { + log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err) return err } n, err := rows.RowsAffected()
diff --git a/data_test.go b/data_test.go index 438a1d4..0173671 100644 --- a/data_test.go +++ b/data_test.go
@@ -21,6 +21,7 @@ . "github.com/onsi/gomega" "sort" "strconv" + "github.com/30x/apid-core" ) var _ = Describe("data access tests", func() { @@ -31,31 +32,7 @@ db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount)) Expect(err).Should(Succeed()) initDB(db) - //all tests in this file operate on the api_product table. Create the necessary tables for this here - db.Exec("CREATE TABLE _transicator_tables " + - "(tableName varchar not null, columnName varchar not null, " + - "typid integer, primaryKey bool);") - db.Exec("DELETE from _transicator_tables") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)") - db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)") - - db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " + - "api_resources text,approval_type text,scopes text,proxies text, environments text," + - "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " + - "primary key (id,tenant_id,created_at,updated_at));") - db.Exec("DELETE from kms_api_product") - + createBootstrapTables(db) setDB(db) }) @@ -1204,3 +1181,34 @@ }, 3) }) }) + +func createBootstrapTables(db apid.DB) { + tx, err := db.Begin() + Expect(err).To(Succeed()) + //all tests in this file operate on the api_product table. Create the necessary tables for this here + tx.Exec("CREATE TABLE _transicator_tables " + + "(tableName varchar not null, columnName varchar not null, " + + "typid integer, primaryKey bool);") + tx.Exec("DELETE from _transicator_tables") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)") + tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)") + + tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " + + "api_resources text,approval_type text,scopes text,proxies text, environments text," + + "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " + + "primary key (id,tenant_id,created_at,updated_at));") + tx.Exec("DELETE from kms_api_product") + err = tx.Commit() + Expect(err).To(Succeed()) +}
diff --git a/listener.go b/listener.go index f7b7157..55492c3 100644 --- a/listener.go +++ b/listener.go
@@ -15,7 +15,7 @@ package apidApigeeSync import ( - "database/sql" + "errors" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" ) @@ -59,8 +59,17 @@ } } -func rollbackTxn (tx *sql.Tx) { - err := tx.Rollback() + +func completeTxn (tx apid.Tx, err error) { + if err == nil { + err = tx.Commit() + if err == nil { + log.Debugf("Transaction committed successfully") + return + } + log.Errorf("Transaction commit failed with error : {%v}", err) + } + err = tx.Rollback() if err != nil { log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err) } @@ -73,7 +82,7 @@ if err != nil { log.Panicf("Unable to open DB txn: {%v}", err.Error()) } - + defer completeTxn(tx, err) err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters) if err != nil { log.Panicf("Unable to read database: {%s}", err.Error()) @@ -86,17 +95,11 @@ _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") if err != nil { if err.Error() == "duplicate column name: last_sequence" { - rollbackTxn(tx) return } else { log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error()) } } - err = tx.Commit() - if err != nil { - rollbackTxn(tx) - return - } } func processChangeList(changes *common.ChangeList) bool { @@ -108,7 +111,7 @@ log.Panicf("Error processing ChangeList: %v", err) return ok } - defer tx.Rollback() + defer completeTxn(tx, err) log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) @@ -128,16 +131,11 @@ ok = _delete(change.Table, []common.Row{change.OldRow}, tx) } if !ok { + err = errors.New("Sql Operation error. Operation rollbacked") log.Error("Sql Operation error. Operation rollbacked") return ok } } - err = tx.Commit() - if err != nil { - log.Panicf("Error processing ChangeList: %v", err) - return false - } - return ok }
diff --git a/listener_test.go b/listener_test.go index 13ea0bf..55df0a5 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -46,7 +46,11 @@ if wipeDBAferTest { db, err := dataService.DB() Expect(err).Should(Succeed()) - _, err = db.Exec("DELETE FROM APID") + tx, err := db.Begin() + Expect(err).Should(Succeed()) + _, err = tx.Exec("DELETE FROM APID") + Expect(err).Should(Succeed()) + err = tx.Commit() Expect(err).Should(Succeed()) } wipeDBAferTest = true