[XAPID-1087]Make sure all updates are in a Txn.
diff --git a/data.go b/data.go index a60902c..878038d 100644 --- a/data.go +++ b/data.go
@@ -79,7 +79,7 @@ } //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn -func insert(tableName string, rows []common.Row, txn *sql.Tx) bool { +func insert(tableName string, rows []common.Row, txn apid.Tx) bool { if len(rows) == 0 { return false @@ -133,7 +133,7 @@ return values } -func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool { +func _delete(tableName string, rows []common.Row, txn apid.Tx) bool { pkeys, err := getPkeysForTable(tableName) sort.Strings(pkeys) if len(pkeys) == 0 || err != nil { @@ -200,7 +200,7 @@ } -func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool { +func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool { pkeys, err := getPkeysForTable(tableName) if len(pkeys) == 0 || err != nil { log.Errorf("UPDATE No primary keys found for table.", tableName) @@ -426,19 +426,29 @@ log.Debugf("updateLastSequence: %s", lastSequence) - stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;") + db, err := dataService.DB() if err != nil { - log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) + log.Errorf("updateLastSequence: Unable to get DB Err: {%v}", err) return err } - defer stmt.Close() - - _, err = stmt.Exec(lastSequence) + tx, err := db.Begin() if err != nil { - log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) return err } + _, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;", lastSequence) + if err != nil { + log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) + rollbackTxn(tx) + return err + } + err = tx.Commit() + if err != nil { + log.Errorf("UPDATE EDGEX_APID_CLUSTER Tx Commit err : %v", err) + rollbackTxn(tx) + return err + } log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) log.Infof("Replication lastSequence=%s", lastSequence) return nil @@ -456,12 +466,18 @@ if err != nil { return } + tx, err := db.Begin() + if err != nil { + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return + } - err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). + err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot) if err != nil { if err != sql.ErrNoRows { log.Errorf("Unable to retrieve apidInstanceInfo: %v", err) + tx.Rollback() return } else { // first start - no row, generate a UUID and store it @@ -470,7 +486,7 @@ info.InstanceID = GenerateUUID() log.Debugf("Inserting new apid instance id %s", info.InstanceID) - db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", + _, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", info.InstanceID, info.ClusterID, "") } } else if savedClusterId != info.ClusterID { @@ -479,10 +495,18 @@ newInstanceID = true info.InstanceID = GenerateUUID() - db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", + _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", info.InstanceID, info.ClusterID, "") info.LastSnapshot = "" } + if err != nil { + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + } + } else { + rollbackTxn(tx) + } return } @@ -493,18 +517,32 @@ if err != nil { return err } - - rows, err := db.Exec(` + tx, err := db.Begin() + if err != nil { + log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return err + } + rows, err := tx.Exec(` REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)`, apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) if err != nil { + log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err) + rollbackTxn(tx) return err } n, err := rows.RowsAffected() if err == nil && n == 0 { err = errors.New("no rows affected") + rollbackTxn(tx) + } else if err == nil { + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + } + } else { + rollbackTxn(tx) } return err
diff --git a/listener.go b/listener.go index d397171..8635a81 100644 --- a/listener.go +++ b/listener.go
@@ -58,43 +58,44 @@ } } +func rollbackTxn (tx apid.Tx) { + err := tx.Rollback() + if err != nil { + log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err) + } +} + func processSqliteSnapshot(db apid.DB) { var numApidClusters int tx, err := db.Begin() if err != nil { - log.Panicf("Unable to open DB txn: %v", err) + log.Panicf("Unable to open DB txn: {%v}", err.Error()) } err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters) if err != nil { - log.Panicf("Unable to read database: %s", err.Error()) + log.Panicf("Unable to read database: {%s}", err.Error()) } if numApidClusters != 1 { log.Panic("Illegal state for apid_cluster. Must be a single row.") } - prep, err := tx.Prepare("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") + _, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") if err != nil { if err.Error() == "duplicate column name: last_sequence" { - tx.Rollback() + rollbackTxn(tx) return } else { - log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err) + log.Panicf("Unable to create last_sequence column on DB. Error {%v}", err.Error()) } } - - _, err = prep.Exec() + err = tx.Commit() if err != nil { - log.Debugf("Snapshot processing DB exec failed. Err: %v", err) - prep.Close() - tx.Rollback() + rollbackTxn(tx) return } - prep.Close() - tx.Commit() - } func processChangeList(changes *common.ChangeList) bool {