Revert "Use defer to complete/rollback txn."

This reverts commit b8a7560673f5f6bc0e7013903dba263f327da6f6.
diff --git a/data.go b/data.go
index c0f7a31..fd4111c 100644
--- a/data.go
+++ b/data.go
@@ -54,9 +54,6 @@
 		log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
 		return err
 	}
-
-	defer completeTxn(tx, err)
-
 	_, err = tx.Exec(`
 	CREATE TABLE IF NOT EXISTS APID (
 	    instance_id text,
@@ -66,9 +63,16 @@
 	);
 	`)
 	if err != nil {
+		rollbackTxn(tx)
 		log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
 		return err
 	}
+	err = tx.Commit()
+	if err != nil {
+		rollbackTxn(tx)
+		log.Errorf("initDB(): tx commit err: {%v}", err)
+		return err
+	}
 	log.Debug("Database tables created.")
 	return nil
 }
@@ -173,7 +177,6 @@
 		if err == nil && affected != 0 {
 			log.Debugf("DELETE Success [%s] values=%v", sql, values)
 		} else if err == nil && affected == 0 {
-			err = errors.New("Entry not found. Nothing to delete")
 			log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
 			return false
 		} else {
@@ -441,11 +444,16 @@
 		return err
 	}
 
-	defer completeTxn(tx, err)
-
 	_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
 	if err != nil {
 		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
+		rollbackTxn(tx)
+		return err
+	}
+	err = tx.Commit()
+	if err != nil {
+		log.Errorf("UPDATE EDGEX_APID_CLUSTER Tx Commit err : %v", err)
+		rollbackTxn(tx)
 		return err
 	}
 	log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
@@ -469,13 +477,13 @@
 		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return
 	}
-	defer completeTxn(tx, err)
 
 	err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
 		Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
 	if err != nil {
 		if err != sql.ErrNoRows {
 			log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
+			tx.Rollback()
 			return
 		} else {
 			// first start - no row, generate a UUID and store it
@@ -497,6 +505,14 @@
 			info.InstanceID, info.ClusterID, "")
 		info.LastSnapshot = ""
 	}
+	if err == nil {
+		err = tx.Commit()
+		if err != nil {
+			rollbackTxn(tx)
+		}
+	} else {
+		rollbackTxn(tx)
+	}
 	return
 }
 
@@ -512,9 +528,6 @@
 		log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return err
 	}
-
-	defer completeTxn(tx, err)
-
 	rows, err := tx.Exec(`
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
@@ -522,12 +535,20 @@
 		apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
 	if err != nil {
 		log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
+		rollbackTxn(tx)
 		return err
 	}
-
 	n, err := rows.RowsAffected()
 	if err == nil && n == 0 {
 		err = errors.New("no rows affected")
+		rollbackTxn(tx)
+	} else if err == nil {
+		err = tx.Commit()
+		if err != nil {
+			rollbackTxn(tx)
+		}
+	} else {
+		rollbackTxn(tx)
 	}
 
 	return err
diff --git a/listener.go b/listener.go
index 265ab67..8635a81 100644
--- a/listener.go
+++ b/listener.go
@@ -58,16 +58,8 @@
 	}
 }
 
-func completeTxn (tx apid.Tx, err error) {
-	if err == nil {
-		err = tx.Commit()
-		if err == nil {
-			log.Debugf("Transaction committed successfully")
-			return
-		}
-		log.Errorf("Transaction commit failed with error : {%v}", err)
-	}
-	err = tx.Rollback()
+func rollbackTxn (tx apid.Tx) {
+	err := tx.Rollback()
 	if err != nil {
 		log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
 	}
@@ -86,8 +78,6 @@
 		log.Panicf("Unable to read database: {%s}", err.Error())
 	}
 
-	defer completeTxn(tx, err)
-
 	if numApidClusters != 1 {
 		log.Panic("Illegal state for apid_cluster. Must be a single row.")
 	}
@@ -95,11 +85,17 @@
 	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
 	if err != nil {
 		if err.Error() == "duplicate column name: last_sequence" {
+			rollbackTxn(tx)
 			return
 		} else {
 			log.Panicf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
 		}
 	}
+	err = tx.Commit()
+	if err != nil {
+		rollbackTxn(tx)
+		return
+	}
 }
 
 func processChangeList(changes *common.ChangeList) bool {
@@ -111,7 +107,7 @@
 		log.Panicf("Error processing ChangeList: %v", err)
 		return ok
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 
 	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
 
@@ -136,5 +132,11 @@
 		}
 	}
 
+	err = tx.Commit()
+	if err != nil {
+		log.Panicf("Error processing ChangeList: %v", err)
+		return false
+	}
+
 	return ok
 }