[XAPID-1087] Make sure all updates are in a Txn.
diff --git a/data.go b/data.go
index a60902c..878038d 100644
--- a/data.go
+++ b/data.go
@@ -79,7 +79,7 @@
 }
 
 //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
 
 	if len(rows) == 0 {
 		return false
@@ -133,7 +133,7 @@
 	return values
 }
 
-func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
 	pkeys, err := getPkeysForTable(tableName)
 	sort.Strings(pkeys)
 	if len(pkeys) == 0 || err != nil {
@@ -200,7 +200,7 @@
 
 }
 
-func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
 	pkeys, err := getPkeysForTable(tableName)
 	if len(pkeys) == 0 || err != nil {
 		log.Errorf("UPDATE No primary keys found for table.", tableName)
@@ -426,19 +426,29 @@
 
 	log.Debugf("updateLastSequence: %s", lastSequence)
 
-	stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
+	db, err := dataService.DB()
 	if err != nil {
-		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+		log.Errorf("updateLastSequence: Unable to get DB Err: {%v}", err)
 		return err
 	}
-	defer stmt.Close()
-
-	_, err = stmt.Exec(lastSequence)
+	tx, err := db.Begin()
 	if err != nil {
-		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+		log.Errorf("updateLastSequence: Unable to get DB tx Err: {%v}", err)
 		return err
 	}
 
+	_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;", lastSequence)
+	if err != nil {
+		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+		rollbackTxn(tx)
+		return err
+	}
+	err = tx.Commit()
+	if err != nil {
+		log.Errorf("UPDATE EDGEX_APID_CLUSTER Tx Commit err : %v", err)
+		// No rollback: a failed Commit already ends the tx, so Rollback would return ErrTxDone and panic (assumes apid.Tx wraps database/sql).
+		return err
+	}
 	log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
 	log.Infof("Replication lastSequence=%s", lastSequence)
 	return nil
@@ -456,12 +466,18 @@
 	if err != nil {
 		return
 	}
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return
+	}
 
-	err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
+	err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
 		Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
 	if err != nil {
 		if err != sql.ErrNoRows {
 			log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
+			tx.Rollback()
 			return
 		} else {
 			// first start - no row, generate a UUID and store it
@@ -470,7 +486,7 @@
 			info.InstanceID = GenerateUUID()
 
 			log.Debugf("Inserting new apid instance id %s", info.InstanceID)
-			db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+			_, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
 				info.InstanceID, info.ClusterID, "")
 		}
 	} else if savedClusterId != info.ClusterID {
@@ -479,10 +495,18 @@
 		newInstanceID = true
 		info.InstanceID = GenerateUUID()
 
-		db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
+		_, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
 			info.InstanceID, info.ClusterID, "")
 		info.LastSnapshot = ""
 	}
+	// Persist the row only when every statement above succeeded; otherwise undo.
+	if err == nil {
+		// A failed Commit already finishes the tx, so it needs no rollback.
+		err = tx.Commit()
+	} else {
+		// INSERT/REPLACE failed: drop the partial work and leave err set.
+		rollbackTxn(tx)
+	}
 	return
 }
 
@@ -493,18 +517,32 @@
 	if err != nil {
 		return err
 	}
-
-	rows, err := db.Exec(`
+	tx, err := db.Begin()
+	if err != nil {
+		log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
+		return err
+	}
+	rows, err := tx.Exec(`
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
 		VALUES (?,?,?)`,
 		apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
 	if err != nil {
+		log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
+		rollbackTxn(tx)
 		return err
 	}
 	n, err := rows.RowsAffected()
 	if err == nil && n == 0 {
 		err = errors.New("no rows affected")
+		rollbackTxn(tx)
+	} else if err == nil {
+		err = tx.Commit()
+		// No rollback on Commit failure: the tx is already finished and
+		// Rollback would return ErrTxDone, which rollbackTxn turns into a panic.
+		// (assumes apid.Tx wraps database/sql's Tx - confirm)
+	} else {
+		rollbackTxn(tx)
 	}
 
 	return err
diff --git a/listener.go b/listener.go
index d397171..8635a81 100644
--- a/listener.go
+++ b/listener.go
@@ -58,43 +58,52 @@
 	}
 }
 
+// rollbackTxn rolls back tx and panics if the rollback itself fails, because the
+// DB is then considered to be in an inconsistent state. Avoid calling it after a
+// Commit attempt: a database/sql tx is finished by then and Rollback returns ErrTxDone.
+func rollbackTxn(tx apid.Tx) {
+	err := tx.Rollback()
+	if err != nil {
+		log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
+	}
+}
+
+// processSqliteSnapshot verifies that edgex_apid_cluster holds exactly one row and
+// adds the last_sequence column to it inside a transaction. A "duplicate column
+// name" error is taken to mean the column already exists: the tx is rolled back
+// and the function returns. Failing to begin, read, or ALTER otherwise panics.
 func processSqliteSnapshot(db apid.DB) {
 
 	var numApidClusters int
 	tx, err := db.Begin()
 	if err != nil {
-		log.Panicf("Unable to open DB txn: %v", err)
+		log.Panicf("Unable to open DB txn: {%v}", err.Error())
 	}
 
 	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
 	if err != nil {
-		log.Panicf("Unable to read database: %s", err.Error())
+		log.Panicf("Unable to read database: {%s}", err.Error())
 	}
 
 	if numApidClusters != 1 {
 		log.Panic("Illegal state for apid_cluster. Must be a single row.")
 	}
 
-	prep, err := tx.Prepare("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
 	if err != nil {
 		if err.Error() == "duplicate column name: last_sequence" {
-			tx.Rollback()
+			rollbackTxn(tx)
 			return
 		} else {
-			log.Panicf("Unable to create last_sequence column on DB.  Unrecoverable error ", err)
+			log.Panicf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
 		}
 	}
-
-	_, err = prep.Exec()
+	err = tx.Commit()
 	if err != nil {
-		log.Debugf("Snapshot processing DB exec failed. Err: %v", err)
-		prep.Close()
-		tx.Rollback()
+		// A failed Commit already ends the tx; calling rollbackTxn here would panic on ErrTxDone.
+		log.Errorf("Snapshot processing DB commit failed. Err: %v", err)
 		return
 	}
-	prep.Close()
-	tx.Commit()
-
 }
 
 func processChangeList(changes *common.ChangeList) bool {