Create Tables in a Txn.
diff --git a/listener.go b/listener.go index 9c002f2..9693aa3 100644 --- a/listener.go +++ b/listener.go
@@ -49,12 +49,12 @@ func processSqliteSnapshot(db apid.DB) { var numApidClusters int - apidClusters, err := db.Query("SELECT COUNT(*) FROM edgex_apid_cluster") + tx, err := db.Begin() if err != nil { - log.Panicf("Unable to read database: %s", err.Error()) + log.Panicf("Unable to open DB txn: %v", err) } - apidClusters.Next() - err = apidClusters.Scan(&numApidClusters) + + err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters) if err != nil { log.Panicf("Unable to read database: %s", err.Error()) } @@ -63,15 +63,26 @@ log.Panic("Illegal state for apid_cluster. Must be a single row.") } - _, err = db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") + prep, err := tx.Prepare("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") if err != nil { if err.Error() == "duplicate column name: last_sequence" { + tx.Rollback() return } else { - log.Error("[[" + err.Error() + "]]") log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err) } } + + _, err = prep.Exec() + if err != nil { + log.Debugf("Snapshot processing DB exec failed. Err: %v", err) + prep.Close() + tx.Rollback() + return + } + prep.Close() + tx.Commit() + } func processChangeList(changes *common.ChangeList) bool {