Fix DB bug, and make sure DB INS/DEL/UP occur via txn
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index 3d897d6..6dfb0d4 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -181,10 +181,12 @@ Expect(true).To(Equal(numApidClusters.Next())) numApidClusters.Scan(&rowCount) Expect(1).To(Equal(rowCount)) + numApidClusters.Close() apidClusters, _ := db.Query("select id from edgex_apid_cluster;") apidClusters.Next() apidClusters.Scan(&id) Expect(id).To(Equal(expectedClusterId)) + apidClusters.Close() numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;") Expect(true).To(Equal(numDataScopes.Next()))
diff --git a/changes.go b/changes.go index 6358307..20e7f6d 100644 --- a/changes.go +++ b/changes.go
@@ -336,7 +336,7 @@ lastSequence = seq err := updateLastSequence(seq) if err != nil { - log.Panic("Unable to update Sequence in DB") + log.Panicf("Unable to update Sequence in DB. Err {%v}", err) } }
diff --git a/data.go b/data.go index 878038d..fd4111c 100644 --- a/data.go +++ b/data.go
@@ -49,7 +49,12 @@ (Currently, the snapshot never changes, but this is future-proof) */ func initDB(db apid.DB) error { - _, err := db.Exec(` + tx, err := db.Begin() + if err != nil { + log.Errorf("initDB(): Unable to get DB tx err: {%v}", err) + return err + } + _, err = tx.Exec(` CREATE TABLE IF NOT EXISTS APID ( instance_id text, apid_cluster_id text, @@ -58,9 +63,16 @@ ); `) if err != nil { + rollbackTxn(tx) + log.Errorf("initDB(): Unable to tx exec err: {%v}", err) return err } - + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + log.Errorf("initDB(): tx commit err: {%v}", err) + return err + } log.Debug("Database tables created.") return nil } @@ -426,18 +438,13 @@ log.Debugf("updateLastSequence: %s", lastSequence) - db, err := dataService.DB() - if err != nil { - log.Errorf("updateLastSequence: Unable to get DB Err: {%v}", err) - return err - } - tx, err := db.Begin() + tx, err := getDB().Begin() if err != nil { log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return err } - _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;", lastSequence) + _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence) if err != nil { log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) rollbackTxn(tx) @@ -450,7 +457,6 @@ return err } log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) - log.Infof("Replication lastSequence=%s", lastSequence) return nil } @@ -499,7 +505,7 @@ info.InstanceID, info.ClusterID, "") info.LastSnapshot = "" } - if err != nil { + if err == nil { err = tx.Commit() if err != nil { rollbackTxn(tx)