Merge pull request #61 from 30x/APIRT-4751-newapidCore
APIRT-4751: adapt to the new apid-core DB API (run statements inside apid.Tx transactions)
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 3d897d6..bd85b97 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -58,8 +58,12 @@
if wipeDBAferTest {
db, err := dataService.DB()
Expect(err).NotTo(HaveOccurred())
- _, err = db.Exec("DELETE FROM APID")
+ tx, err := db.Begin()
+ _, err = tx.Exec("DELETE FROM APID")
Expect(err).NotTo(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).NotTo(HaveOccurred())
+
}
wipeDBAferTest = true
newInstanceID = true
@@ -100,7 +104,10 @@
if wipeDBAferTest {
db, err := dataService.DB()
Expect(err).NotTo(HaveOccurred())
- _, err = db.Exec("DELETE FROM APID")
+ tx, err := db.Begin()
+ _, err = tx.Exec("DELETE FROM APID")
+ Expect(err).NotTo(HaveOccurred())
+ err = tx.Commit()
Expect(err).NotTo(HaveOccurred())
}
wipeDBAferTest = true
@@ -181,16 +188,22 @@
Expect(true).To(Equal(numApidClusters.Next()))
numApidClusters.Scan(&rowCount)
Expect(1).To(Equal(rowCount))
- apidClusters, _ := db.Query("select id from edgex_apid_cluster;")
+ numApidClusters.Close()
+ apidClusters, err := db.Query("select id from edgex_apid_cluster;")
+ Expect(err).NotTo(HaveOccurred())
apidClusters.Next()
apidClusters.Scan(&id)
Expect(id).To(Equal(expectedClusterId))
+ apidClusters.Close()
- numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;")
+ numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;")
+ Expect(err).NotTo(HaveOccurred())
Expect(true).To(Equal(numDataScopes.Next()))
numDataScopes.Scan(&rowCount)
Expect(2).To(Equal(rowCount))
- dataScopes, _ := db.Query("select id from edgex_data_scope;")
+ numDataScopes.Close()
+ dataScopes, err := db.Query("select id from edgex_data_scope;")
+ Expect(err).NotTo(HaveOccurred())
dataScopes.Next()
dataScopes.Scan(&id)
dataScopes.Next()
@@ -202,6 +215,7 @@
dataScopes.Scan(&id)
Expect(id).To(Equal(expectedDataScopeId1))
}
+ dataScopes.Close()
} else if cl, ok := event.(*common.ChangeList); ok {
closeDone = apidChangeManager.close()
diff --git a/change_test.go b/change_test.go
index d7ee964..e7eed6f 100644
--- a/change_test.go
+++ b/change_test.go
@@ -82,7 +82,10 @@
if wipeDBAferTest {
db, err := dataService.DB()
Expect(err).Should(Succeed())
- _, err = db.Exec("DELETE FROM APID")
+ tx, err := db.Begin()
+ _, err = tx.Exec("DELETE FROM APID")
+ Expect(err).Should(Succeed())
+ err = tx.Commit()
Expect(err).Should(Succeed())
}
wipeDBAferTest = true
diff --git a/changes.go b/changes.go
index 6358307..20e7f6d 100644
--- a/changes.go
+++ b/changes.go
@@ -336,7 +336,7 @@
lastSequence = seq
err := updateLastSequence(seq)
if err != nil {
- log.Panic("Unable to update Sequence in DB")
+ log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
}
}
diff --git a/data.go b/data.go
index a60902c..3c6994b 100644
--- a/data.go
+++ b/data.go
@@ -49,7 +49,13 @@
(Currently, the snapshot never changes, but this is future-proof)
*/
func initDB(db apid.DB) error {
- _, err := db.Exec(`
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
+ return err
+ }
+ defer completeTxn(tx, err)
+ _, err = tx.Exec(`
CREATE TABLE IF NOT EXISTS APID (
instance_id text,
apid_cluster_id text,
@@ -58,9 +64,9 @@
);
`)
if err != nil {
+ log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
return err
}
-
log.Debug("Database tables created.")
return nil
}
@@ -79,7 +85,7 @@
}
//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
if len(rows) == 0 {
return false
@@ -133,7 +139,7 @@
return values
}
-func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
sort.Strings(pkeys)
if len(pkeys) == 0 || err != nil {
@@ -200,7 +206,7 @@
}
-func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
if len(pkeys) == 0 || err != nil {
log.Errorf("UPDATE No primary keys found for table.", tableName)
@@ -426,21 +432,18 @@
log.Debugf("updateLastSequence: %s", lastSequence)
- stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
+ tx, err := getDB().Begin()
+ if err != nil {
+ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return err
+ }
+ defer completeTxn(tx, err)
+ _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
- defer stmt.Close()
-
- _, err = stmt.Exec(lastSequence)
- if err != nil {
- log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
- return err
- }
-
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
- log.Infof("Replication lastSequence=%s", lastSequence)
return nil
}
@@ -456,8 +459,13 @@
if err != nil {
return
}
-
- err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return
+ }
+ defer completeTxn(tx, err)
+ err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
if err != sql.ErrNoRows {
@@ -470,7 +478,7 @@
info.InstanceID = GenerateUUID()
log.Debugf("Inserting new apid instance id %s", info.InstanceID)
- db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+ _, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
}
} else if savedClusterId != info.ClusterID {
@@ -479,7 +487,7 @@
newInstanceID = true
info.InstanceID = GenerateUUID()
- db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+ _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
@@ -493,13 +501,19 @@
if err != nil {
return err
}
-
- rows, err := db.Exec(`
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return err
+ }
+ defer completeTxn(tx, err)
+ rows, err := tx.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
VALUES (?,?,?)`,
apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
if err != nil {
+ log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
return err
}
n, err := rows.RowsAffected()
diff --git a/data_test.go b/data_test.go
index 438a1d4..0173671 100644
--- a/data_test.go
+++ b/data_test.go
@@ -21,6 +21,7 @@
. "github.com/onsi/gomega"
"sort"
"strconv"
+ "github.com/30x/apid-core"
)
var _ = Describe("data access tests", func() {
@@ -31,31 +32,7 @@
db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount))
Expect(err).Should(Succeed())
initDB(db)
- //all tests in this file operate on the api_product table. Create the necessary tables for this here
- db.Exec("CREATE TABLE _transicator_tables " +
- "(tableName varchar not null, columnName varchar not null, " +
- "typid integer, primaryKey bool);")
- db.Exec("DELETE from _transicator_tables")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
- db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
- "api_resources text,approval_type text,scopes text,proxies text, environments text," +
- "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
- "primary key (id,tenant_id,created_at,updated_at));")
- db.Exec("DELETE from kms_api_product")
-
+ createBootstrapTables(db)
setDB(db)
})
@@ -1204,3 +1181,34 @@
}, 3)
})
})
+
+func createBootstrapTables(db apid.DB) {
+ tx, err := db.Begin()
+ Expect(err).To(Succeed())
+ // All tests in this file operate on the kms_api_product table; create it and its _transicator_tables metadata here.
+ tx.Exec("CREATE TABLE _transicator_tables " +
+ "(tableName varchar not null, columnName varchar not null, " +
+ "typid integer, primaryKey bool);")
+ tx.Exec("DELETE from _transicator_tables")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
+ tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
+
+ tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+ "api_resources text,approval_type text,scopes text,proxies text, environments text," +
+ "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
+ "primary key (id,tenant_id,created_at,updated_at));")
+ tx.Exec("DELETE from kms_api_product")
+ err = tx.Commit()
+ Expect(err).To(Succeed())
+}
diff --git a/listener.go b/listener.go
index f7b7157..55492c3 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,7 @@
package apidApigeeSync
import (
- "database/sql"
+ "errors"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
@@ -59,8 +59,17 @@
}
}
-func rollbackTxn (tx *sql.Tx) {
- err := tx.Rollback()
+
+func completeTxn (tx apid.Tx, err error) {
+ if err == nil {
+ err = tx.Commit()
+ if err == nil {
+ log.Debugf("Transaction committed successfully")
+ return
+ }
+ log.Errorf("Transaction commit failed with error : {%v}", err)
+ }
+ err = tx.Rollback()
if err != nil {
log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
}
@@ -73,7 +82,7 @@
if err != nil {
log.Panicf("Unable to open DB txn: {%v}", err.Error())
}
-
+ defer completeTxn(tx, err)
err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
if err != nil {
log.Panicf("Unable to read database: {%s}", err.Error())
@@ -86,17 +95,11 @@
_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
if err != nil {
if err.Error() == "duplicate column name: last_sequence" {
- rollbackTxn(tx)
return
} else {
log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
}
}
- err = tx.Commit()
- if err != nil {
- rollbackTxn(tx)
- return
- }
}
func processChangeList(changes *common.ChangeList) bool {
@@ -108,7 +111,7 @@
log.Panicf("Error processing ChangeList: %v", err)
return ok
}
- defer tx.Rollback()
+ defer completeTxn(tx, err)
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
@@ -128,16 +131,11 @@
ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
}
if !ok {
+ err = errors.New("Sql Operation error. Operation rollbacked")
log.Error("Sql Operation error. Operation rollbacked")
return ok
}
}
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- return false
- }
-
return ok
}
diff --git a/listener_test.go b/listener_test.go
index 13ea0bf..55df0a5 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -46,7 +46,11 @@
if wipeDBAferTest {
db, err := dataService.DB()
Expect(err).Should(Succeed())
- _, err = db.Exec("DELETE FROM APID")
+ tx, err := db.Begin()
+ Expect(err).Should(Succeed())
+ _, err = tx.Exec("DELETE FROM APID")
+ Expect(err).Should(Succeed())
+ err = tx.Commit()
Expect(err).Should(Succeed())
}
wipeDBAferTest = true