Fix deferred transaction handling: replace `defer completeTxn(tx, err)` with `defer tx.Rollback()` and an explicit `tx.Commit()`.
diff --git a/data.go b/data.go
index 4376a76..3f50df4 100644
--- a/data.go
+++ b/data.go
@@ -54,7 +54,7 @@
log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
return err
}
- defer completeTxn(tx, err)
+ defer tx.Rollback()
_, err = tx.Exec(`
CREATE TABLE IF NOT EXISTS APID (
instance_id text,
@@ -67,6 +67,10 @@
log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
return err
}
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Error when initDb: %v", err)
+ return err
+ }
log.Debug("Database tables created.")
return nil
}
@@ -437,14 +441,17 @@
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
}
- defer completeTxn(tx, err)
+ defer tx.Rollback()
_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
- return nil
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Commit error in updateLastSequence: %v", err)
+ }
+ return err
}
func getApidInstanceInfo() (info apidInstanceInfo, err error) {
@@ -464,7 +471,7 @@
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return
}
- defer completeTxn(tx, err)
+ defer tx.Rollback()
err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
@@ -491,6 +498,9 @@
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Commit error in getApidInstanceInfo: %v", err)
+ }
return
}
@@ -506,7 +516,7 @@
log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
}
- defer completeTxn(tx, err)
+ defer tx.Rollback()
rows, err := tx.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
@@ -520,6 +530,9 @@
if err == nil && n == 0 {
err = errors.New("no rows affected")
}
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Commit error in updateApidInstanceInfo: %v", err)
+ }
return err
}
diff --git a/data_test.go b/data_test.go
index 53b7509..691e887 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,13 +15,13 @@
package apidApigeeSync
import (
+ "github.com/apid/apid-core"
"github.com/apid/apid-core/data"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sort"
"strconv"
- "github.com/apid/apid-core"
)
var _ = Describe("data access tests", func() {
diff --git a/listener.go b/listener.go
index e9f24ad..94befd6 100644
--- a/listener.go
+++ b/listener.go
@@ -30,7 +30,7 @@
var prevDb string
if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
- apidInfo.LastSnapshot , snapshot.SnapshotInfo)
+ apidInfo.LastSnapshot, snapshot.SnapshotInfo)
prevDb = apidInfo.LastSnapshot
} else {
log.Debugf("Process snapshot for version {%s}",
@@ -59,22 +59,6 @@
}
}
-
-func completeTxn (tx apid.Tx, err error) {
- if err == nil {
- err = tx.Commit()
- if err == nil {
- log.Debugf("Transaction committed successfully")
- return
- }
- log.Errorf("Transaction commit failed with error : {%v}", err)
- }
- err = tx.Rollback()
- if err != nil {
- log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
- }
-}
-
func processSqliteSnapshot(db apid.DB) {
var numApidClusters int
@@ -82,7 +66,7 @@
if err != nil {
log.Panicf("Unable to open DB txn: {%v}", err.Error())
}
- defer completeTxn(tx, err)
+ defer tx.Rollback()
err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
if err != nil {
log.Panicf("Unable to read database: {%s}", err.Error())
@@ -100,6 +84,9 @@
log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
}
}
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Error when commit in processSqliteSnapshot: %v", err)
+ }
}
func processChangeList(changes *common.ChangeList) bool {
@@ -109,9 +96,8 @@
tx, err := getDB().Begin()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
- return ok
}
- defer completeTxn(tx, err)
+ defer tx.Rollback()
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
@@ -133,9 +119,13 @@
if !ok {
err = errors.New("Sql Operation error. Operation rollbacked")
log.Error("Sql Operation error. Operation rollbacked")
- return ok
+ return false
}
}
- return ok
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Commit error in processChangeList: %v", err)
+ return false
+ }
+ return true
}