Make apigeeSync compatible with new apid-core
diff --git a/data.go b/data.go
index a60902c..1fb36b2 100644
--- a/data.go
+++ b/data.go
@@ -79,7 +79,7 @@
 }
 
 //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
 
 	if len(rows) == 0 {
 		return false
@@ -133,7 +133,7 @@
 	return values
 }
 
-func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
 	pkeys, err := getPkeysForTable(tableName)
 	sort.Strings(pkeys)
 	if len(pkeys) == 0 || err != nil {
@@ -200,7 +200,7 @@
 
 }
 
-func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
 	pkeys, err := getPkeysForTable(tableName)
 	if len(pkeys) == 0 || err != nil {
 		log.Errorf("UPDATE No primary keys found for table.", tableName)
@@ -456,12 +456,18 @@
 	if err != nil {
 		return
 	}
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return
+	}
 
-	err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
+	err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
 		Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
 	if err != nil {
 		if err != sql.ErrNoRows {
 			log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
+			tx.Rollback()
 			return
 		} else {
 			// first start - no row, generate a UUID and store it
@@ -470,7 +476,7 @@
 			info.InstanceID = GenerateUUID()
 
 			log.Debugf("Inserting new apid instance id %s", info.InstanceID)
-			db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+			_, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
 				info.InstanceID, info.ClusterID, "")
 		}
 	} else if savedClusterId != info.ClusterID {
@@ -479,10 +485,18 @@
 		newInstanceID = true
 		info.InstanceID = GenerateUUID()
 
-		db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+		_, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
 			info.InstanceID, info.ClusterID, "")
 		info.LastSnapshot = ""
 	}
+	if err != nil {
+		err = tx.Commit()
+		if err != nil {
+			rollbackTxn(tx)
+		}
+	} else {
+		rollbackTxn(tx)
+	}
 	return
 }
 
@@ -493,18 +507,32 @@
 	if err != nil {
 		return err
 	}
-
-	rows, err := db.Exec(`
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return err
+	}
+	rows, err := tx.Exec(`
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
 		VALUES (?,?,?)`,
 		apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
 	if err != nil {
+		log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
+		rollbackTxn(tx)
 		return err
 	}
 	n, err := rows.RowsAffected()
 	if err == nil && n == 0 {
 		err = errors.New("no rows affected")
+		rollbackTxn(tx)
+	} else if err == nil {
+		err = tx.Commit()
+		if err != nil {
+			rollbackTxn(tx)
+		}
+	} else {
+		rollbackTxn(tx)
 	}
 
 	return err
diff --git a/listener.go b/listener.go
index f7b7157..8635a81 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,6 @@
 package apidApigeeSync
 
 import (
-	"database/sql"
 	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 )
@@ -59,7 +58,7 @@
 	}
 }
 
-func rollbackTxn (tx *sql.Tx) {
+func rollbackTxn (tx apid.Tx) {
 	err := tx.Rollback()
 	if err != nil {
 		log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)