Fix deferred transaction handling: replace `defer completeTxn(tx, err)` with `defer tx.Rollback()` and an explicit `tx.Commit()`.
diff --git a/data.go b/data.go
index 4376a76..3f50df4 100644
--- a/data.go
+++ b/data.go
@@ -54,7 +54,7 @@
 		log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
 		return err
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 	_, err = tx.Exec(`
 	CREATE TABLE IF NOT EXISTS APID (
 	    instance_id text,
@@ -67,6 +67,10 @@
 		log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
 		return err
 	}
+	if err = tx.Commit(); err != nil {
+		log.Errorf("Error when initDb: %v", err)
+		return err
+	}
 	log.Debug("Database tables created.")
 	return nil
 }
@@ -437,14 +441,17 @@
 		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return err
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 	_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
 	if err != nil {
 		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
 		return err
 	}
 	log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
-	return nil
+	if err = tx.Commit(); err != nil {
+		log.Errorf("Commit error in updateLastSequence: %v", err)
+	}
+	return err
 }
 
 func getApidInstanceInfo() (info apidInstanceInfo, err error) {
@@ -464,7 +471,7 @@
 		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 	err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
 		Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
 	if err != nil {
@@ -491,6 +498,9 @@
 			info.InstanceID, info.ClusterID, "")
 		info.LastSnapshot = ""
 	}
+	if err = tx.Commit(); err != nil {
+		log.Errorf("Commit error in getApidInstanceInfo: %v", err)
+	}
 	return
 }
 
@@ -506,7 +516,7 @@
 		log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return err
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 	rows, err := tx.Exec(`
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
@@ -520,6 +530,9 @@
 	if err == nil && n == 0 {
 		err = errors.New("no rows affected")
 	}
+	if err = tx.Commit(); err != nil {
+		log.Errorf("Commit error in updateApidInstanceInfo: %v", err)
+	}
 
 	return err
 }
diff --git a/data_test.go b/data_test.go
index 53b7509..691e887 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,13 +15,13 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	"github.com/apid/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	"sort"
 	"strconv"
-	"github.com/apid/apid-core"
 )
 
 var _ = Describe("data access tests", func() {
diff --git a/listener.go b/listener.go
index e9f24ad..94befd6 100644
--- a/listener.go
+++ b/listener.go
@@ -30,7 +30,7 @@
 	var prevDb string
 	if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
 		log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
-			apidInfo.LastSnapshot , snapshot.SnapshotInfo)
+			apidInfo.LastSnapshot, snapshot.SnapshotInfo)
 		prevDb = apidInfo.LastSnapshot
 	} else {
 		log.Debugf("Process snapshot for version {%s}",
@@ -59,22 +59,6 @@
 	}
 }
 
-
-func completeTxn (tx apid.Tx, err error) {
-	if err == nil {
-		err = tx.Commit()
-		if err == nil {
-			log.Debugf("Transaction committed successfully")
-			return
-		}
-		log.Errorf("Transaction commit failed with error : {%v}", err)
-	}
-	err = tx.Rollback()
-	if err != nil {
-		log.Panicf("Unable to rollback Transaction. DB in inconsistent state. Err {%v}", err)
-	}
-}
-
 func processSqliteSnapshot(db apid.DB) {
 
 	var numApidClusters int
@@ -82,7 +66,7 @@
 	if err != nil {
 		log.Panicf("Unable to open DB txn: {%v}", err.Error())
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
 	if err != nil {
 		log.Panicf("Unable to read database: {%s}", err.Error())
@@ -100,6 +84,9 @@
 			log.Panicf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
 		}
 	}
+	if err = tx.Commit(); err != nil {
+		log.Errorf("Error when commit in processSqliteSnapshot: %v", err)
+	}
 }
 
 func processChangeList(changes *common.ChangeList) bool {
@@ -109,9 +96,8 @@
 	tx, err := getDB().Begin()
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
-		return ok
 	}
-	defer completeTxn(tx, err)
+	defer tx.Rollback()
 
 	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
 
@@ -133,9 +119,13 @@
 		if !ok {
 			err = errors.New("Sql Operation error. Operation rollbacked")
 			log.Error("Sql Operation error. Operation rollbacked")
-			return ok
+			return false
 		}
 	}
 
-	return ok
+	if err = tx.Commit(); err != nil {
+		log.Errorf("Commit error in processChangeList: %v", err)
+		return false
+	}
+	return true
 }