Make apigeeSync compatible with new apid-core
diff --git a/data.go b/data.go
index a60902c..1fb36b2 100644
--- a/data.go
+++ b/data.go
@@ -79,7 +79,7 @@
}
//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
if len(rows) == 0 {
return false
@@ -133,7 +133,7 @@
return values
}
-func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
sort.Strings(pkeys)
if len(pkeys) == 0 || err != nil {
@@ -200,7 +200,7 @@
}
-func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
if len(pkeys) == 0 || err != nil {
log.Errorf("UPDATE No primary keys found for table.", tableName)
@@ -456,12 +456,18 @@
if err != nil {
return
}
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return
+ }
- err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
+ err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
if err != sql.ErrNoRows {
log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
+ tx.Rollback()
return
} else {
// first start - no row, generate a UUID and store it
@@ -470,7 +476,7 @@
info.InstanceID = GenerateUUID()
log.Debugf("Inserting new apid instance id %s", info.InstanceID)
- db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+ _, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
}
} else if savedClusterId != info.ClusterID {
@@ -479,10 +485,18 @@
newInstanceID = true
info.InstanceID = GenerateUUID()
- db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+ _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
+ if err != nil {
+ err = tx.Commit()
+ if err != nil {
+ rollbackTxn(tx)
+ }
+ } else {
+ rollbackTxn(tx)
+ }
return
}
@@ -493,18 +507,32 @@
if err != nil {
return err
}
-
- rows, err := db.Exec(`
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return err
+ }
+ rows, err := tx.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
VALUES (?,?,?)`,
apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
if err != nil {
+ log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
+ rollbackTxn(tx)
return err
}
n, err := rows.RowsAffected()
if err == nil && n == 0 {
err = errors.New("no rows affected")
+ rollbackTxn(tx)
+ } else if err == nil {
+ err = tx.Commit()
+ if err != nil {
+ rollbackTxn(tx)
+ }
+ } else {
+ rollbackTxn(tx)
}
return err
diff --git a/listener.go b/listener.go
index f7b7157..8635a81 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,6 @@
package apidApigeeSync
import (
- "database/sql"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
@@ -59,7 +58,7 @@
}
}
-func rollbackTxn (tx *sql.Tx) {
+func rollbackTxn (tx apid.Tx) {
err := tx.Rollback()
if err != nil {
log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)