Revert "Use defer to complete/rollback txn." This reverts commit b8a7560673f5f6bc0e7013903dba263f327da6f6.
diff --git a/data.go b/data.go index c0f7a31..fd4111c 100644 --- a/data.go +++ b/data.go
@@ -54,9 +54,6 @@ log.Errorf("initDB(): Unable to get DB tx err: {%v}", err) return err } - - defer completeTxn(tx, err) - _, err = tx.Exec(` CREATE TABLE IF NOT EXISTS APID ( instance_id text, @@ -66,9 +63,16 @@ ); `) if err != nil { + rollbackTxn(tx) log.Errorf("initDB(): Unable to tx exec err: {%v}", err) return err } + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + log.Errorf("initDB(): tx commit err: {%v}", err) + return err + } log.Debug("Database tables created.") return nil } @@ -173,7 +177,6 @@ if err == nil && affected != 0 { log.Debugf("DELETE Success [%s] values=%v", sql, values) } else if err == nil && affected == 0 { - err = errors.New("Entry not found. Nothing to delete") log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values) return false } else { @@ -441,11 +444,16 @@ return err } - defer completeTxn(tx, err) - _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence) if err != nil { log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) + rollbackTxn(tx) + return err + } + err = tx.Commit() + if err != nil { + log.Errorf("UPDATE EDGEX_APID_CLUSTER Tx Commit err : %v", err) + rollbackTxn(tx) return err } log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) @@ -469,13 +477,13 @@ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return } - defer completeTxn(tx, err) err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot) if err != nil { if err != sql.ErrNoRows { log.Errorf("Unable to retrieve apidInstanceInfo: %v", err) + tx.Rollback() return } else { // first start - no row, generate a UUID and store it @@ -497,6 +505,14 @@ info.InstanceID, info.ClusterID, "") info.LastSnapshot = "" } + if err == nil { + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + } + } else { + rollbackTxn(tx) + } return } @@ -512,9 +528,6 @@ log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return err } - - defer completeTxn(tx, err) - rows, err := tx.Exec(` REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) @@ -522,12 +535,20 @@ apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) if err != nil { log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err) + rollbackTxn(tx) return err } - n, err := rows.RowsAffected() if err == nil && n == 0 { err = errors.New("no rows affected") + rollbackTxn(tx) + } else if err == nil { + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + } + } else { + rollbackTxn(tx) } return err
diff --git a/listener.go b/listener.go index 265ab67..8635a81 100644 --- a/listener.go +++ b/listener.go
@@ -58,16 +58,8 @@ } } -func completeTxn (tx apid.Tx, err error) { - if err == nil { - err = tx.Commit() - if err == nil { - log.Debugf("Transaction committed successfully") - return - } - log.Errorf("Transaction commit failed with error : {%v}", err) - } - err = tx.Rollback() +func rollbackTxn (tx apid.Tx) { + err := tx.Rollback() if err != nil { log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err) } @@ -86,8 +78,6 @@ log.Panicf("Unable to read database: {%s}", err.Error()) } - defer completeTxn(tx, err) - if numApidClusters != 1 { log.Panic("Illegal state for apid_cluster. Must be a single row.") } @@ -95,11 +85,17 @@ _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") if err != nil { if err.Error() == "duplicate column name: last_sequence" { + rollbackTxn(tx) return } else { log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error()) } } + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + return + } } func processChangeList(changes *common.ChangeList) bool { @@ -111,7 +107,7 @@ log.Panicf("Error processing ChangeList: %v", err) return ok } - defer completeTxn(tx, err) + defer tx.Rollback() log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) @@ -136,5 +132,11 @@ } } + err = tx.Commit() + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + return false + } + return ok }