Use defer to complete/rollback txn.
diff --git a/data.go b/data.go
index fd4111c..c0f7a31 100644
--- a/data.go
+++ b/data.go
@@ -54,6 +54,9 @@
log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
return err
}
+
+ defer completeTxn(tx, err)
+
_, err = tx.Exec(`
CREATE TABLE IF NOT EXISTS APID (
instance_id text,
@@ -63,16 +66,9 @@
);
`)
if err != nil {
- rollbackTxn(tx)
log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
return err
}
- err = tx.Commit()
- if err != nil {
- rollbackTxn(tx)
- log.Errorf("initDB(): tx commit err: {%v}", err)
- return err
- }
log.Debug("Database tables created.")
return nil
}
@@ -177,6 +173,7 @@
if err == nil && affected != 0 {
log.Debugf("DELETE Success [%s] values=%v", sql, values)
} else if err == nil && affected == 0 {
+ err = errors.New("Entry not found. Nothing to delete")
log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
return false
} else {
@@ -444,16 +441,11 @@
return err
}
+ defer completeTxn(tx, err)
+
_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
- rollbackTxn(tx)
- return err
- }
- err = tx.Commit()
- if err != nil {
- log.Errorf("UPDATE EDGEX_APID_CLUSTER Tx Commit err : %v", err)
- rollbackTxn(tx)
return err
}
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
@@ -477,13 +469,13 @@
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return
}
+ defer completeTxn(tx, err)
err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
if err != sql.ErrNoRows {
log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
- tx.Rollback()
return
} else {
// first start - no row, generate a UUID and store it
@@ -505,14 +497,6 @@
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
- if err == nil {
- err = tx.Commit()
- if err != nil {
- rollbackTxn(tx)
- }
- } else {
- rollbackTxn(tx)
- }
return
}
@@ -528,6 +512,9 @@
log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
}
+
+ defer completeTxn(tx, err)
+
rows, err := tx.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
@@ -535,20 +522,12 @@
apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
if err != nil {
log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
- rollbackTxn(tx)
return err
}
+
n, err := rows.RowsAffected()
if err == nil && n == 0 {
err = errors.New("no rows affected")
- rollbackTxn(tx)
- } else if err == nil {
- err = tx.Commit()
- if err != nil {
- rollbackTxn(tx)
- }
- } else {
- rollbackTxn(tx)
}
return err
diff --git a/listener.go b/listener.go
index 8635a81..265ab67 100644
--- a/listener.go
+++ b/listener.go
@@ -58,8 +58,16 @@
}
}
-func rollbackTxn (tx apid.Tx) {
- err := tx.Rollback()
+func completeTxn (tx apid.Tx, err error) {
+ if err == nil {
+ err = tx.Commit()
+ if err == nil {
+ log.Debugf("Transaction committed successfully")
+ return
+ }
+ log.Errorf("Transaction commit failed with error : {%v}", err)
+ }
+ err = tx.Rollback()
if err != nil {
log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
}
@@ -78,6 +86,8 @@
log.Panicf("Unable to read database: {%s}", err.Error())
}
+ defer completeTxn(tx, err)
+
if numApidClusters != 1 {
log.Panic("Illegal state for apid_cluster. Must be a single row.")
}
@@ -85,17 +95,11 @@
_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
if err != nil {
if err.Error() == "duplicate column name: last_sequence" {
- rollbackTxn(tx)
return
} else {
log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
}
}
- err = tx.Commit()
- if err != nil {
- rollbackTxn(tx)
- return
- }
}
func processChangeList(changes *common.ChangeList) bool {
@@ -107,7 +111,7 @@
log.Panicf("Error processing ChangeList: %v", err)
return ok
}
- defer tx.Rollback()
+ defer completeTxn(tx, err)
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
@@ -132,11 +136,5 @@
}
}
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- return false
- }
-
return ok
}