fix defer rollback transaction issues. (#65)
diff --git a/data.go b/data.go index 4376a76..3f50df4 100644 --- a/data.go +++ b/data.go
@@ -54,7 +54,7 @@ log.Errorf("initDB(): Unable to get DB tx err: {%v}", err) return err } - defer completeTxn(tx, err) + defer tx.Rollback() _, err = tx.Exec(` CREATE TABLE IF NOT EXISTS APID ( instance_id text, @@ -67,6 +67,10 @@ log.Errorf("initDB(): Unable to tx exec err: {%v}", err) return err } + if err = tx.Commit(); err != nil { + log.Errorf("Error when initDb: %v", err) + return err + } log.Debug("Database tables created.") return nil } @@ -437,14 +441,17 @@ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return err } - defer completeTxn(tx, err) + defer tx.Rollback() _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence) if err != nil { log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) return err } log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) - return nil + if err = tx.Commit(); err != nil { + log.Errorf("Commit error in updateLastSequence: %v", err) + } + return err } func getApidInstanceInfo() (info apidInstanceInfo, err error) { @@ -464,7 +471,7 @@ log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return } - defer completeTxn(tx, err) + defer tx.Rollback() err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot) if err != nil { @@ -491,6 +498,9 @@ info.InstanceID, info.ClusterID, "") info.LastSnapshot = "" } + if err = tx.Commit(); err != nil { + log.Errorf("Commit error in getApidInstanceInfo: %v", err) + } return } @@ -506,7 +516,7 @@ log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return err } - defer completeTxn(tx, err) + defer tx.Rollback() rows, err := tx.Exec(` REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) @@ -520,6 +530,9 @@ if err == nil && n == 0 { err = errors.New("no rows affected") } + if err = tx.Commit(); err != nil { + log.Errorf("Commit error in updateApidInstanceInfo: %v", err) + } return err }
diff --git a/data_test.go b/data_test.go index 53b7509..691e887 100644 --- a/data_test.go +++ b/data_test.go
@@ -15,13 +15,13 @@ package apidApigeeSync import ( + "github.com/apid/apid-core" "github.com/apid/apid-core/data" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "sort" "strconv" - "github.com/apid/apid-core" ) var _ = Describe("data access tests", func() {
diff --git a/listener.go b/listener.go index e9f24ad..94befd6 100644 --- a/listener.go +++ b/listener.go
@@ -30,7 +30,7 @@ var prevDb string if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo { log.Debugf("Release snapshot for {%s}. Switching to version {%s}", - apidInfo.LastSnapshot , snapshot.SnapshotInfo) + apidInfo.LastSnapshot, snapshot.SnapshotInfo) prevDb = apidInfo.LastSnapshot } else { log.Debugf("Process snapshot for version {%s}", @@ -59,22 +59,6 @@ } } - -func completeTxn (tx apid.Tx, err error) { - if err == nil { - err = tx.Commit() - if err == nil { - log.Debugf("Transaction committed successfully") - return - } - log.Errorf("Transaction commit failed with error : {%v}", err) - } - err = tx.Rollback() - if err != nil { - log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err) - } -} - func processSqliteSnapshot(db apid.DB) { var numApidClusters int @@ -82,7 +66,7 @@ if err != nil { log.Panicf("Unable to open DB txn: {%v}", err.Error()) } - defer completeTxn(tx, err) + defer tx.Rollback() err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters) if err != nil { log.Panicf("Unable to read database: {%s}", err.Error()) @@ -100,6 +84,9 @@ log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error()) } } + if err = tx.Commit(); err != nil { + log.Errorf("Error when commit in processSqliteSnapshot: %v", err) + } } func processChangeList(changes *common.ChangeList) bool { @@ -109,9 +96,8 @@ tx, err := getDB().Begin() if err != nil { log.Panicf("Error processing ChangeList: %v", err) - return ok } - defer completeTxn(tx, err) + defer tx.Rollback() log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) @@ -133,9 +119,13 @@ if !ok { err = errors.New("Sql Operation error. Operation rollbacked") log.Error("Sql Operation error. Operation rollbacked") - return ok + return false } } - return ok + if err = tx.Commit(); err != nil { + log.Errorf("Commit error in processChangeList: %v", err) + return false + } + return true }