Fix DB bug, and make sure DB INS/DEL/UP occur via txn
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 3d897d6..6dfb0d4 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -181,10 +181,12 @@
Expect(true).To(Equal(numApidClusters.Next()))
numApidClusters.Scan(&rowCount)
Expect(1).To(Equal(rowCount))
+ numApidClusters.Close()
apidClusters, _ := db.Query("select id from edgex_apid_cluster;")
apidClusters.Next()
apidClusters.Scan(&id)
Expect(id).To(Equal(expectedClusterId))
+ apidClusters.Close()
numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;")
Expect(true).To(Equal(numDataScopes.Next()))
diff --git a/changes.go b/changes.go
index 6358307..20e7f6d 100644
--- a/changes.go
+++ b/changes.go
@@ -336,7 +336,7 @@
lastSequence = seq
err := updateLastSequence(seq)
if err != nil {
- log.Panic("Unable to update Sequence in DB")
+ log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
}
}
diff --git a/data.go b/data.go
index 878038d..fd4111c 100644
--- a/data.go
+++ b/data.go
@@ -49,7 +49,12 @@
(Currently, the snapshot never changes, but this is future-proof)
*/
func initDB(db apid.DB) error {
- _, err := db.Exec(`
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
+ return err
+ }
+ _, err = tx.Exec(`
CREATE TABLE IF NOT EXISTS APID (
instance_id text,
apid_cluster_id text,
@@ -58,9 +63,16 @@
);
`)
if err != nil {
+ rollbackTxn(tx)
+ log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
return err
}
-
+ err = tx.Commit()
+ if err != nil {
+ rollbackTxn(tx)
+ log.Errorf("initDB(): tx commit err: {%v}", err)
+ return err
+ }
log.Debug("Database tables created.")
return nil
}
@@ -426,18 +438,13 @@
log.Debugf("updateLastSequence: %s", lastSequence)
- db, err := dataService.DB()
- if err != nil {
- log.Errorf("updateLastSequence: Unable to get DB Err: {%v}", err)
- return err
- }
- tx, err := db.Begin()
+ tx, err := getDB().Begin()
if err != nil {
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
}
- _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;", lastSequence)
+ _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
rollbackTxn(tx)
@@ -450,7 +457,6 @@
return err
}
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
- log.Infof("Replication lastSequence=%s", lastSequence)
return nil
}
@@ -499,7 +505,7 @@
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
- if err != nil {
+ if err == nil {
err = tx.Commit()
if err != nil {
rollbackTxn(tx)