Make apigeeSync compatible with new apid-core
diff --git a/data.go b/data.go index a60902c..1fb36b2 100644 --- a/data.go +++ b/data.go
@@ -79,7 +79,7 @@ } //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn -func insert(tableName string, rows []common.Row, txn *sql.Tx) bool { +func insert(tableName string, rows []common.Row, txn apid.Tx) bool { if len(rows) == 0 { return false @@ -133,7 +133,7 @@ return values } -func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool { +func _delete(tableName string, rows []common.Row, txn apid.Tx) bool { pkeys, err := getPkeysForTable(tableName) sort.Strings(pkeys) if len(pkeys) == 0 || err != nil { @@ -200,7 +200,7 @@ } -func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool { +func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool { pkeys, err := getPkeysForTable(tableName) if len(pkeys) == 0 || err != nil { log.Errorf("UPDATE No primary keys found for table.", tableName) @@ -456,12 +456,18 @@ if err != nil { return } + tx, err := db.Begin() + if err != nil { + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return + } - err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). + err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot) if err != nil { if err != sql.ErrNoRows { log.Errorf("Unable to retrieve apidInstanceInfo: %v", err) + tx.Rollback() return } else { // first start - no row, generate a UUID and store it @@ -470,7 +476,7 @@ info.InstanceID = GenerateUUID() log.Debugf("Inserting new apid instance id %s", info.InstanceID) - db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", + _, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", info.InstanceID, info.ClusterID, "") } } else if savedClusterId != info.ClusterID { @@ -479,10 +485,18 @@ newInstanceID = true info.InstanceID = GenerateUUID() - db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", + _, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", info.InstanceID, info.ClusterID, "") info.LastSnapshot = "" } + if err != nil { + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + } + } else { + rollbackTxn(tx) + } return } @@ -493,18 +507,32 @@ if err != nil { return err } - - rows, err := db.Exec(` + tx, err := db.Begin() + if err != nil { + log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return err + } + rows, err := tx.Exec(` REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)`, apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) if err != nil { + log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err) + rollbackTxn(tx) return err } n, err := rows.RowsAffected() if err == nil && n == 0 { err = errors.New("no rows affected") + rollbackTxn(tx) + } else if err == nil { + err = tx.Commit() + if err != nil { + rollbackTxn(tx) + } + } else { + rollbackTxn(tx) } return err
diff --git a/listener.go b/listener.go index f7b7157..8635a81 100644 --- a/listener.go +++ b/listener.go
@@ -15,7 +15,6 @@ package apidApigeeSync import ( - "database/sql" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" ) @@ -59,7 +58,7 @@ } } -func rollbackTxn (tx *sql.Tx) { +func rollbackTxn (tx apid.Tx) { err := tx.Rollback() if err != nil { log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)