[XAPID-1087] Make sure all updates are in a Txn.
diff --git a/data.go b/data.go
index a60902c..878038d 100644
--- a/data.go
+++ b/data.go
@@ -79,7 +79,7 @@
}
//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
if len(rows) == 0 {
return false
@@ -133,7 +133,7 @@
return values
}
-func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
sort.Strings(pkeys)
if len(pkeys) == 0 || err != nil {
@@ -200,7 +200,7 @@
}
-func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
if len(pkeys) == 0 || err != nil {
log.Errorf("UPDATE No primary keys found for table.", tableName)
@@ -426,19 +426,29 @@
log.Debugf("updateLastSequence: %s", lastSequence)
- stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
+ db, err := dataService.DB()
if err != nil {
- log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+ log.Errorf("updateLastSequence: Unable to get DB Err: {%v}", err)
return err
}
- defer stmt.Close()
-
- _, err = stmt.Exec(lastSequence)
+ tx, err := db.Begin()
if err != nil {
- log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+ log.Errorf("updateLastSequence: Unable to get DB tx Err: {%v}", err)
return err
}
+ _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;", lastSequence)
+ if err != nil {
+ log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+ rollbackTxn(tx)
+ return err
+ }
+ err = tx.Commit()
+ if err != nil {
+ log.Errorf("UPDATE EDGEX_APID_CLUSTER Tx Commit err : %v", err)
+ // No rollback here: the tx is finished once Commit returns, so Rollback would only fail.
+ return err
+ }
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
log.Infof("Replication lastSequence=%s", lastSequence)
return nil
@@ -456,12 +466,18 @@
if err != nil {
return
}
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return
+ }
- err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
+ err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
if err != sql.ErrNoRows {
log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
+ tx.Rollback()
return
} else {
// first start - no row, generate a UUID and store it
@@ -470,7 +486,7 @@
info.InstanceID = GenerateUUID()
log.Debugf("Inserting new apid instance id %s", info.InstanceID)
- db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+ _, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
}
} else if savedClusterId != info.ClusterID {
@@ -479,10 +495,18 @@
newInstanceID = true
info.InstanceID = GenerateUUID()
- db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+ _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
+ if err != nil {
+ rollbackTxn(tx)
+ return
+ }
+ // Statements succeeded: persist them. Commit finishes tx even on failure, so no rollback after it.
+ if err = tx.Commit(); err != nil {
+ log.Errorf("getApidInstanceInfo: Unable to commit DB tx Err: {%v}", err)
+ }
return
}
@@ -493,18 +517,32 @@
if err != nil {
return err
}
-
- rows, err := db.Exec(`
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+ return err
+ }
+ rows, err := tx.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
VALUES (?,?,?)`,
apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
if err != nil {
+ log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
+ rollbackTxn(tx)
return err
}
n, err := rows.RowsAffected()
if err == nil && n == 0 {
err = errors.New("no rows affected")
+ rollbackTxn(tx)
+ } else if err == nil {
+ // Commit finishes tx even when it fails, so a commit error is logged rather than rolled back.
+ if err = tx.Commit(); err != nil {
+ log.Errorf("updateApidInstanceInfo: Tx Commit Err: {%v}", err)
+ }
+ } else {
+ rollbackTxn(tx)
}
return err
diff --git a/listener.go b/listener.go
index d397171..8635a81 100644
--- a/listener.go
+++ b/listener.go
@@ -58,43 +58,44 @@
}
}
+func rollbackTxn(tx apid.Tx) { // rolls back tx; a failed rollback is treated as fatal
+ err := tx.Rollback()
+ if err != nil {
+ log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
+ }
+}
+
func processSqliteSnapshot(db apid.DB) {
var numApidClusters int
tx, err := db.Begin()
if err != nil {
- log.Panicf("Unable to open DB txn: %v", err)
+ log.Panicf("Unable to open DB txn: {%v}", err.Error())
}
err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
if err != nil {
- log.Panicf("Unable to read database: %s", err.Error())
+ log.Panicf("Unable to read database: {%s}", err.Error())
}
if numApidClusters != 1 {
log.Panic("Illegal state for apid_cluster. Must be a single row.")
}
- prep, err := tx.Prepare("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+ _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") // runs inside tx; the duplicate-column error below just ends the txn and returns
if err != nil {
if err.Error() == "duplicate column name: last_sequence" {
- tx.Rollback()
+ rollbackTxn(tx)
return
} else {
- log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err)
+ log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
}
}
-
- _, err = prep.Exec()
+ err = tx.Commit()
if err != nil {
- log.Debugf("Snapshot processing DB exec failed. Err: %v", err)
- prep.Close()
- tx.Rollback()
+ log.Errorf("Unable to commit last_sequence column on DB. Error {%v}", err) // tx is finished after Commit; a Rollback here would fail and panic
return
}
- prep.Close()
- tx.Commit()
-
}
func processChangeList(changes *common.ChangeList) bool {