Merge pull request #61 from 30x/APIRT-4751-newapidCore

APIRT-4751: new apid core
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 3d897d6..bd85b97 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -58,8 +58,12 @@
 			if wipeDBAferTest {
 				db, err := dataService.DB()
 				Expect(err).NotTo(HaveOccurred())
-				_, err = db.Exec("DELETE FROM APID")
+				tx, err := db.Begin()
+				_, err = tx.Exec("DELETE FROM APID")
 				Expect(err).NotTo(HaveOccurred())
+				err = tx.Commit()
+				Expect(err).NotTo(HaveOccurred())
+
 			}
 			wipeDBAferTest = true
 			newInstanceID = true
@@ -100,7 +104,10 @@
 			if wipeDBAferTest {
 				db, err := dataService.DB()
 				Expect(err).NotTo(HaveOccurred())
-				_, err = db.Exec("DELETE FROM APID")
+				tx, err := db.Begin()
+				_, err = tx.Exec("DELETE FROM APID")
+				Expect(err).NotTo(HaveOccurred())
+				err = tx.Commit()
 				Expect(err).NotTo(HaveOccurred())
 			}
 			wipeDBAferTest = true
@@ -181,16 +188,22 @@
 					Expect(true).To(Equal(numApidClusters.Next()))
 					numApidClusters.Scan(&rowCount)
 					Expect(1).To(Equal(rowCount))
-					apidClusters, _ := db.Query("select id from edgex_apid_cluster;")
+					numApidClusters.Close()
+					apidClusters, err := db.Query("select id from edgex_apid_cluster;")
+					Expect(err).NotTo(HaveOccurred())
 					apidClusters.Next()
 					apidClusters.Scan(&id)
 					Expect(id).To(Equal(expectedClusterId))
+					apidClusters.Close()
 
-					numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;")
+					numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;")
+					Expect(err).NotTo(HaveOccurred())
 					Expect(true).To(Equal(numDataScopes.Next()))
 					numDataScopes.Scan(&rowCount)
 					Expect(2).To(Equal(rowCount))
-					dataScopes, _ := db.Query("select id from edgex_data_scope;")
+					numDataScopes.Close()
+					dataScopes, err := db.Query("select id from edgex_data_scope;")
+					Expect(err).NotTo(HaveOccurred())
 					dataScopes.Next()
 					dataScopes.Scan(&id)
 					dataScopes.Next()
@@ -202,6 +215,7 @@
 						dataScopes.Scan(&id)
 						Expect(id).To(Equal(expectedDataScopeId1))
 					}
+					dataScopes.Close()
 
 				} else if cl, ok := event.(*common.ChangeList); ok {
 					closeDone = apidChangeManager.close()
diff --git a/change_test.go b/change_test.go
index d7ee964..e7eed6f 100644
--- a/change_test.go
+++ b/change_test.go
@@ -82,7 +82,10 @@
 			if wipeDBAferTest {
 				db, err := dataService.DB()
 				Expect(err).Should(Succeed())
-				_, err = db.Exec("DELETE FROM APID")
+				tx, err := db.Begin()
+				_, err = tx.Exec("DELETE FROM APID")
+				Expect(err).Should(Succeed())
+				err = tx.Commit()
 				Expect(err).Should(Succeed())
 			}
 			wipeDBAferTest = true
diff --git a/changes.go b/changes.go
index 6358307..20e7f6d 100644
--- a/changes.go
+++ b/changes.go
@@ -336,7 +336,7 @@
 	lastSequence = seq
 	err := updateLastSequence(seq)
 	if err != nil {
-		log.Panic("Unable to update Sequence in DB")
+		log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
 	}
 
 }
diff --git a/data.go b/data.go
index a60902c..3c6994b 100644
--- a/data.go
+++ b/data.go
@@ -49,7 +49,13 @@
 (Currently, the snapshot never changes, but this is future-proof)
 */
 func initDB(db apid.DB) error {
-	_, err := db.Exec(`
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
+		return err
+	}
+	defer completeTxn(tx, err)
+	_, err = tx.Exec(`
 	CREATE TABLE IF NOT EXISTS APID (
 	    instance_id text,
 	    apid_cluster_id text,
@@ -58,9 +64,9 @@
 	);
 	`)
 	if err != nil {
+		log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
 		return err
 	}
-
 	log.Debug("Database tables created.")
 	return nil
 }
@@ -79,7 +85,7 @@
 }
 
 //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
 
 	if len(rows) == 0 {
 		return false
@@ -133,7 +139,7 @@
 	return values
 }
 
-func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
 	pkeys, err := getPkeysForTable(tableName)
 	sort.Strings(pkeys)
 	if len(pkeys) == 0 || err != nil {
@@ -200,7 +206,7 @@
 
 }
 
-func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
 	pkeys, err := getPkeysForTable(tableName)
 	if len(pkeys) == 0 || err != nil {
 		log.Errorf("UPDATE No primary keys found for table.", tableName)
@@ -426,21 +432,18 @@
 
 	log.Debugf("updateLastSequence: %s", lastSequence)
 
-	stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
+	tx, err := getDB().Begin()
+	if err != nil {
+		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return err
+	}
+	defer completeTxn(tx, err)
+	_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
 	if err != nil {
 		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
 		return err
 	}
-	defer stmt.Close()
-
-	_, err = stmt.Exec(lastSequence)
-	if err != nil {
-		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
-		return err
-	}
-
 	log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
-	log.Infof("Replication lastSequence=%s", lastSequence)
 	return nil
 }
 
@@ -456,8 +459,13 @@
 	if err != nil {
 		return
 	}
-
-	err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return
+	}
+	defer completeTxn(tx, err)
+	err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
 		Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
 	if err != nil {
 		if err != sql.ErrNoRows {
@@ -470,7 +478,7 @@
 			info.InstanceID = GenerateUUID()
 
 			log.Debugf("Inserting new apid instance id %s", info.InstanceID)
-			db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+			_, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
 				info.InstanceID, info.ClusterID, "")
 		}
 	} else if savedClusterId != info.ClusterID {
@@ -479,7 +487,7 @@
 		newInstanceID = true
 		info.InstanceID = GenerateUUID()
 
-		db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+		_, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
 			info.InstanceID, info.ClusterID, "")
 		info.LastSnapshot = ""
 	}
@@ -493,13 +501,19 @@
 	if err != nil {
 		return err
 	}
-
-	rows, err := db.Exec(`
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return err
+	}
+	defer completeTxn(tx, err)
+	rows, err := tx.Exec(`
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
 		VALUES (?,?,?)`,
 		apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
 	if err != nil {
+		log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
 		return err
 	}
 	n, err := rows.RowsAffected()
diff --git a/data_test.go b/data_test.go
index 438a1d4..0173671 100644
--- a/data_test.go
+++ b/data_test.go
@@ -21,6 +21,7 @@
 	. "github.com/onsi/gomega"
 	"sort"
 	"strconv"
+	"github.com/30x/apid-core"
 )
 
 var _ = Describe("data access tests", func() {
@@ -31,31 +32,7 @@
 		db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount))
 		Expect(err).Should(Succeed())
 		initDB(db)
-		//all tests in this file operate on the api_product table.  Create the necessary tables for this here
-		db.Exec("CREATE TABLE _transicator_tables " +
-			"(tableName varchar not null, columnName varchar not null, " +
-			"typid integer, primaryKey bool);")
-		db.Exec("DELETE from _transicator_tables")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
-		db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
-			"api_resources text,approval_type text,scopes text,proxies text, environments text," +
-			"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
-			"primary key (id,tenant_id,created_at,updated_at));")
-		db.Exec("DELETE from kms_api_product")
-
+		createBootstrapTables(db)
 		setDB(db)
 	})
 
@@ -1204,3 +1181,34 @@
 		}, 3)
 	})
 })
+
+func createBootstrapTables(db apid.DB) {
+	tx, err := db.Begin()
+	Expect(err).To(Succeed())
+	//all tests in this file operate on the api_product table.  Create the necessary tables for this here
+	tx.Exec("CREATE TABLE _transicator_tables " +
+		"(tableName varchar not null, columnName varchar not null, " +
+		"typid integer, primaryKey bool);")
+	tx.Exec("DELETE from _transicator_tables")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
+	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
+
+	tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+		"api_resources text,approval_type text,scopes text,proxies text, environments text," +
+		"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
+		"primary key (id,tenant_id,created_at,updated_at));")
+	tx.Exec("DELETE from kms_api_product")
+	err = tx.Commit()
+	Expect(err).To(Succeed())
+}
diff --git a/listener.go b/listener.go
index f7b7157..55492c3 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,7 @@
 package apidApigeeSync
 
 import (
-	"database/sql"
+	"errors"
 	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 )
@@ -59,8 +59,17 @@
 	}
 }
 
-func rollbackTxn (tx *sql.Tx) {
-	err := tx.Rollback()
+
+func completeTxn (tx apid.Tx, err error) {
+	if err == nil {
+		err = tx.Commit()
+		if err == nil {
+			log.Debugf("Transaction committed successfully")
+			return
+		}
+		log.Errorf("Transaction commit failed with error : {%v}", err)
+	}
+	err = tx.Rollback()
 	if err != nil {
 		log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
 	}
@@ -73,7 +82,7 @@
 	if err != nil {
 		log.Panicf("Unable to open DB txn: {%v}", err.Error())
 	}
-
+	defer completeTxn(tx, err)
 	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
 	if err != nil {
 		log.Panicf("Unable to read database: {%s}", err.Error())
@@ -86,17 +95,11 @@
 	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
 	if err != nil {
 		if err.Error() == "duplicate column name: last_sequence" {
-			rollbackTxn(tx)
 			return
 		} else {
 			log.Panicf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
 		}
 	}
-	err = tx.Commit()
-	if err != nil {
-		rollbackTxn(tx)
-		return
-	}
 }
 
 func processChangeList(changes *common.ChangeList) bool {
@@ -108,7 +111,7 @@
 		log.Panicf("Error processing ChangeList: %v", err)
 		return ok
 	}
-	defer tx.Rollback()
+	defer completeTxn(tx, err)
 
 	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
 
@@ -128,16 +131,11 @@
 			ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
 		}
 		if !ok {
+			err = errors.New("Sql Operation error. Operation rollbacked")
 			log.Error("Sql Operation error. Operation rollbacked")
 			return ok
 		}
 	}
 
-	err = tx.Commit()
-	if err != nil {
-		log.Panicf("Error processing ChangeList: %v", err)
-		return false
-	}
-
 	return ok
 }
diff --git a/listener_test.go b/listener_test.go
index 13ea0bf..55df0a5 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -46,7 +46,11 @@
 		if wipeDBAferTest {
 			db, err := dataService.DB()
 			Expect(err).Should(Succeed())
-			_, err = db.Exec("DELETE FROM APID")
+			tx, err := db.Begin()
+			Expect(err).Should(Succeed())
+			_, err = tx.Exec("DELETE FROM APID")
+			Expect(err).Should(Succeed())
+			err = tx.Commit()
 			Expect(err).Should(Succeed())
 		}
 		wipeDBAferTest = true