reformat
diff --git a/data.go b/data.go
index eca1491..3d33902 100644
--- a/data.go
+++ b/data.go
@@ -71,6 +71,17 @@
 type dbManager struct {
 	db    apid.DB
 	dbMux sync.RWMutex
+	data  apid.DataService // source of versioned databases; setDbVersion calls its DBVersion method
+}
+
+func (dbc *dbManager) setDbVersion(version string) { // makes the DB for `version` the one held in dbc.db
+	db, err := dbc.data.DBVersion(version)
+	if err != nil {
+		log.Panicf("Unable to access database: %v", err) // the versioned DB could not be obtained
+	}
+	dbc.dbMux.Lock() // write lock is held only while the handle is swapped
+	dbc.db = db
+	dbc.dbMux.Unlock()
 }
 
 func (dbc *dbManager) getDb() apid.DB {
diff --git a/data_test.go b/data_test.go
index e1bf8dc..f6115d4 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,55 +15,67 @@
 package apidApigeeSync
 
 import (
+	"github.com/30x/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"io/ioutil"
+	"reflect"
 	"sort"
+	"strconv"
 	"sync"
 )
 
-var _ = Describe("data access tests", func() {
-	var testCount int
+var _ = Describe("DB manager tests", func() {
 	var testDbMan *dbManager
+	var testCount int
 	BeforeEach(func() {
-		testCount += 1
 		testDbMan = &dbManager{
 			dbMux: sync.RWMutex{},
+			data:  dataService,
 		}
+		testCount += 1
 
-		db := testDbMan.getDb()
-
-		//all tests in this file operate on the api_product table.  Create the necessary tables for this here
-		db.Exec("CREATE TABLE _transicator_tables " +
-			"(tableName varchar not null, columnName varchar not null, " +
-			"typid integer, primaryKey bool);")
-		db.Exec("DELETE from _transicator_tables")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
-		db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
-		db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
-			"api_resources text,approval_type text,scopes text,proxies text, environments text," +
-			"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
-			"primary key (id,tenant_id,created_at,updated_at));")
-		db.Exec("DELETE from kms_api_product")
-
-		setDB(db)
-		initDB(db)
-
+		testDbMan.setDbVersion("data_test_" + strconv.Itoa(testCount))
 	})
 
-	Context("Update processing", func() {
+	AfterEach(func() { // teardown: forget the manager, then delete the DB id built from this spec's version string
+		testDbMan = nil
+		data.Delete(data.VersionedDBID("common", "data_test_"+strconv.Itoa(testCount)))
+	})
+
+	Context("Basic Update/Insert/Delete processing", func() {
+		BeforeEach(func() {
+			db := testDbMan.getDb() // the per-spec DB selected by setDbVersion in the outer BeforeEach
+
+			//all tests in this file operate on the api_product table.  Create the necessary tables for this here
+			db.Exec("CREATE TABLE _transicator_tables " +
+				"(tableName varchar not null, columnName varchar not null, " +
+				"typid integer, primaryKey bool);")
+			db.Exec("DELETE from _transicator_tables") // start from an empty metadata table
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)") // values are (tableName, columnName, typid, primaryKey)
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
+			db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
+
+			db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+				"api_resources text,approval_type text,scopes text,proxies text, environments text," +
+				"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
+				"primary key (id,tenant_id,created_at,updated_at));")
+			db.Exec("DELETE from kms_api_product") // each spec begins with no product rows
+
+			testDbMan.initDefaultDb() // NOTE(review): defined elsewhere; presumably sets up apid's own tables in this DB — confirm
+		})
+
 		It("unit test buildUpdateSql with single primary key", func() {
 			testRow := common.Row{
 				"id": {
@@ -193,9 +205,9 @@
 				},
 			}
 			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
@@ -210,13 +222,13 @@
 			}
 
 			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+			err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
@@ -288,9 +300,9 @@
 				},
 			}
 			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
@@ -305,13 +317,13 @@
 			}
 
 			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+			err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 		})
@@ -390,9 +402,9 @@
 				},
 			}
 			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
@@ -407,13 +419,13 @@
 			}
 
 			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+			err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 		})
@@ -489,9 +501,9 @@
 				},
 			}
 			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
@@ -506,19 +518,17 @@
 			}
 
 			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+			err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 		})
-	})
 
-	Context("Insert processing", func() {
 		It("Properly constructs insert sql for one row", func() {
 			newRow := common.Row{
 				"id": {
@@ -659,16 +669,16 @@
 				},
 			}
 
-			Expect(true).To(Equal(processChangeList(event)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+			err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
@@ -743,22 +753,22 @@
 				},
 			}
 
-			Expect(true).To(Equal(processChangeList(event)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+			err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(2))
 		})
@@ -801,12 +811,12 @@
 				},
 			}
 
-			ok := processChangeList(event)
+			ok := testDbMan.writeTransaction(event)
 			Expect(false).To(Equal(ok))
 
 			var nRows int
 			//assert that no extraneous rows were added
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(0))
 		})
@@ -880,12 +890,10 @@
 				},
 			}
 
-			ok := processChangeList(event)
+			ok := testDbMan.writeTransaction(event)
 			Expect(false).To(Equal(ok))
 		})
-	})
 
-	Context("Delete processing", func() {
 		It("Properly constructs sql prepare for Delete", func() {
 			row := common.Row{
 				"id": {
@@ -914,7 +922,7 @@
 				},
 			}
 
-			pkeys, err := getPkeysForTable("kms_api_product")
+			pkeys, err := testDbMan.getPkeysForTable("kms_api_product")
 			Expect(err).Should(Succeed())
 			sql := buildDeleteSql("kms_api_product", row, pkeys)
 			Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
@@ -966,28 +974,28 @@
 				},
 			}
 
-			Expect(true).To(Equal(processChangeList(event1)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event1)))
 			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+			err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
-			Expect(true).To(Equal(processChangeList(event2)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event2)))
 
 			// validate delete
-			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(0))
 
 			// delete again should fail - coz entry will not exist
-			Expect(false).To(Equal(processChangeList(event2)))
+			Expect(false).To(Equal(testDbMan.writeTransaction(event2)))
 		})
 
 		It("verify multiple insert and single delete works", func() {
@@ -1068,43 +1076,43 @@
 				},
 			}
 
-			Expect(true).To(Equal(processChangeList(event1)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event1)))
 			var nRows int
 			//verify first row
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+			err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//verify second row
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(2))
 
-			Expect(true).To(Equal(processChangeList(event2)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event2)))
 
 			//verify second row still exists
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			// validate delete
-			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			// delete again should fail - coz entry will not exist
-			Expect(false).To(Equal(processChangeList(event2)))
+			Expect(false).To(Equal(testDbMan.writeTransaction(event2)))
 		}, 3)
 
 		It("verify single insert and multiple delete fails", func() {
@@ -1185,21 +1193,209 @@
 				},
 			}
 
-			Expect(true).To(Equal(processChangeList(event1)))
+			Expect(true).To(Equal(testDbMan.writeTransaction(event1)))
 			var nRows int
 			//verify insert
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+			err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
 				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
 				"and _change_selector='cs'").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
 			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
 			Expect(err).NotTo(HaveOccurred())
 			Expect(nRows).To(Equal(1))
 
-			Expect(false).To(Equal(processChangeList(event2)))
+			Expect(false).To(Equal(testDbMan.writeTransaction(event2)))
+
+		}, 3)
+	})
+
+	Context("ApigeeSync methods", func() {
+		// createTestDb executes the SQL script stored at sqlfile against the
+		// database currently held by testDbMan, failing the spec on any error.
+		createTestDb := func(sqlfile string) {
+			script, err := ioutil.ReadFile(sqlfile)
+			Expect(err).Should(Succeed())
+			// the whole file is handed to a single Exec call
+			_, err = testDbMan.getDb().Exec(string(script))
+			Expect(err).Should(Succeed())
+		}
+
+		It("test getClusterCount", func() {
+			createTestDb("./sql/init_listener_test_duplicate_apids.sql")
+			Expect(testDbMan.getClusterCount()).To(Equal(2)) // Gomega also requires the trailing error return to be nil
+		})
+
+		It("should process a valid Snapshot", func() {
+
+			createTestDb("./sql/init_listener_test_valid_snapshot.sql")
+
+			dbVersion := "data_test_" + strconv.Itoa(testCount) // same version string the outer BeforeEach passed to setDbVersion
+			err := testDbMan.updateApidInstanceInfo()
+			Expect(err).Should(Succeed())
+
+			info, err := testDbMan.getApidInstanceInfo()
+			Expect(err).Should(Succeed())
+			Expect(info.LastSnapshot).To(Equal(dbVersion))
+
+			db := testDbMan.getDb()
+
+			// apid Cluster
+			var dcs []dataApidCluster
+			rows, err := db.Query(`
+			SELECT id, name, description, umbrella_org_app_name,
+				created, created_by, updated, updated_by
+			FROM EDGEX_APID_CLUSTER`)
+			Expect(err).NotTo(HaveOccurred())
+			defer rows.Close()
+
+			c := dataApidCluster{}
+			for rows.Next() {
+				rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName,
+					&c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy) // NOTE(review): Scan's error is discarded
+				dcs = append(dcs, c)
+			}
+
+			Expect(len(dcs)).To(Equal(1))
+			dc := dcs[0]
+
+			Expect(dc.ID).To(Equal("i"))
+			Expect(dc.Name).To(Equal("n"))
+			Expect(dc.Description).To(Equal("d"))
+			Expect(dc.OrgAppName).To(Equal("o"))
+			Expect(dc.Created).To(Equal("c"))
+			Expect(dc.CreatedBy).To(Equal("c"))
+			Expect(dc.Updated).To(Equal("u"))
+			Expect(dc.UpdatedBy).To(Equal("u"))
+
+			// Data Scope
+			var dds []dataDataScope
+
+			rows, err = db.Query(`
+			SELECT id, apid_cluster_id, scope, org,
+				env, created, created_by, updated,
+				updated_by
+			FROM EDGEX_DATA_SCOPE`)
+			Expect(err).NotTo(HaveOccurred())
+			defer rows.Close() // closes the second result set; the first defer already captured the earlier one
+
+			d := dataDataScope{}
+			for rows.Next() {
+				rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
+					&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
+					&d.UpdatedBy) // NOTE(review): Scan's error is discarded
+				dds = append(dds, d)
+			}
+
+			Expect(len(dds)).To(Equal(3))
+			ds := dds[0] // NOTE(review): the query has no ORDER BY, so these index checks rely on the returned row order
+
+			Expect(ds.ID).To(Equal("i"))
+			Expect(ds.Org).To(Equal("o"))
+			Expect(ds.Env).To(Equal("e1"))
+			Expect(ds.Scope).To(Equal("s1"))
+			Expect(ds.Created).To(Equal("c"))
+			Expect(ds.CreatedBy).To(Equal("c"))
+			Expect(ds.Updated).To(Equal("u"))
+			Expect(ds.UpdatedBy).To(Equal("u"))
+
+			ds = dds[1]
+			Expect(ds.Env).To(Equal("e2"))
+			Expect(ds.Scope).To(Equal("s1"))
+			ds = dds[2]
+			Expect(ds.Env).To(Equal("e3"))
+			Expect(ds.Scope).To(Equal("s2"))
+
+			scopes := testDbMan.findScopesForId("a")
+			Expect(len(scopes)).To(Equal(6))
+			expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
+			sort.Strings(scopes) // both sides are sorted so the comparison ignores ordering
+			sort.Strings(expectedScopes)
+			Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue())
+		}, 3)
+
+		It("insert event should add", func() { // two DATA_SCOPE inserts in one change list must both be stored
+			createTestDb("./sql/init_listener_test_no_datascopes.sql") // helper in this Context takes only the SQL file path
+			event := common.ChangeList{
+				LastSequence: "test",
+				Changes: []common.Change{
+					{
+						Operation: common.Insert,
+						Table:     LISTENER_TABLE_DATA_SCOPE,
+						NewRow: common.Row{
+							"id":               &common.ColumnVal{Value: "i"},
+							"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+							"scope":            &common.ColumnVal{Value: "s1"},
+							"org":              &common.ColumnVal{Value: "o"},
+							"env":              &common.ColumnVal{Value: "e"},
+							"created":          &common.ColumnVal{Value: "c"},
+							"created_by":       &common.ColumnVal{Value: "c"},
+							"updated":          &common.ColumnVal{Value: "u"},
+							"updated_by":       &common.ColumnVal{Value: "u"},
+							"_change_selector": &common.ColumnVal{Value: "cs"},
+						},
+					},
+					{
+						Operation: common.Insert,
+						Table:     LISTENER_TABLE_DATA_SCOPE,
+						NewRow: common.Row{
+							"id":               &common.ColumnVal{Value: "j"},
+							"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+							"scope":            &common.ColumnVal{Value: "s2"},
+							"org":              &common.ColumnVal{Value: "o"},
+							"env":              &common.ColumnVal{Value: "e"},
+							"created":          &common.ColumnVal{Value: "c"},
+							"created_by":       &common.ColumnVal{Value: "c"},
+							"updated":          &common.ColumnVal{Value: "u"},
+							"updated_by":       &common.ColumnVal{Value: "u"},
+							"_change_selector": &common.ColumnVal{Value: "cs"},
+						},
+					},
+				},
+			}
+
+			testDbMan.writeTransaction(&event)
+
+			var dds []dataDataScope
+
+			rows, err := testDbMan.getDb().Query(`
+				SELECT id, apid_cluster_id, scope, org,
+					env, created, created_by, updated,
+					updated_by
+				FROM EDGEX_DATA_SCOPE`)
+			Expect(err).NotTo(HaveOccurred())
+			defer rows.Close()
+
+			d := dataDataScope{}
+			for rows.Next() {
+				rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
+					&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
+					&d.UpdatedBy)
+				dds = append(dds, d)
+			}
+
+			//only the two rows inserted above are expected in the table
+			Expect(len(dds)).To(Equal(2))
+			ds := dds[0]
+
+			Expect(ds.ID).To(Equal("i"))
+			Expect(ds.Org).To(Equal("o"))
+			Expect(ds.Env).To(Equal("e"))
+			Expect(ds.Scope).To(Equal("s1"))
+			Expect(ds.Created).To(Equal("c"))
+			Expect(ds.CreatedBy).To(Equal("c"))
+			Expect(ds.Updated).To(Equal("u"))
+			Expect(ds.UpdatedBy).To(Equal("u"))
+
+			ds = dds[1]
+			Expect(ds.Scope).To(Equal("s2"))
+
+			scopes := testDbMan.findScopesForId("a")
+			Expect(len(scopes)).To(Equal(2))
+			Expect(scopes[0]).To(Equal("s1"))
+			Expect(scopes[1]).To(Equal("s2"))
 
 		}, 3)
 	})
diff --git a/init.go b/init.go
index d8226de..f456953 100644
--- a/init.go
+++ b/init.go
@@ -105,6 +105,7 @@
 
 	dbMan = &dbManager{
 		dbMux: sync.RWMutex{},
+		data:  services.Data(),
 	}
 
 	// set up default database
diff --git a/listener.go b/listener.go
index 76ec295..cffeb05 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,6 @@
 package apidApigeeSync
 
 import (
-	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 )
 
@@ -31,26 +30,21 @@
 func (lm *listenerManager) processSnapshot(snapshot *common.Snapshot) {
 	log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
 
-	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Unable to access database: %v", err)
-	}
-
-	lm.processSqliteSnapshot(db)
+	lm.dbm.setDbVersion(snapshot.SnapshotInfo) // switch the DB manager to the snapshot's version before validating it
+	lm.processSqliteSnapshot()                 // validation goes through lm.dbm, so no DB handle is passed any more
 
 	//update apid instance info
 	apidInfo.LastSnapshot = snapshot.SnapshotInfo
-	err = lm.dbm.updateApidInstanceInfo()
+	err := lm.dbm.updateApidInstanceInfo() // first use of err in this function now that the DBVersion call moved into setDbVersion
 	if err != nil {
 		log.Panicf("Unable to update instance info: %v", err)
 	}
 
-	lm.dbm.setDb(db)
 	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
 
 }
 
-func (lm *listenerManager) processSqliteSnapshot(db apid.DB) {
+func (lm *listenerManager) processSqliteSnapshot() {
 
 	if count, err := lm.dbm.getClusterCount(); err != nil || count != 1 {
 		log.Panicf("Illegal state for apid_cluster. Must be a single row. %v", err)
diff --git a/listener_test.go b/listener_test.go
index d48c70e..2950288 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -20,11 +20,19 @@
 
 	"github.com/apigee-labs/transicator/common"
 	"os"
-	"reflect"
-	"sort"
 )
 
 var _ = Describe("listener", func() {
+	var testListenerMan *listenerManager
+	var mockDbMan *dummyDbMan
+	var testCount int
+	BeforeEach(func() {
+		mockDbMan = &dummyDbMan{} // fresh stub for every spec; specs set fields such as clusterCount as needed
+		testListenerMan = &listenerManager{
+			dbm: mockDbMan, // the listener under test talks only to the stub
+		}
+		testCount += 1
+	})
 
 	var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
 		initDb(sqlfile, "./mockdb.sqlite3")
@@ -40,11 +48,11 @@
 	Context("ApigeeSync snapshot event", func() {
 
 		It("should fail if more than one apid_cluster rows", func() {
-			event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
-			Expect(func() { processSnapshot(&event) }).To(Panic())
+			mockDbMan.clusterCount = 2 // presumably returned by the stub's getClusterCount; processSqliteSnapshot panics unless the count is 1
+			Expect(func() { testListenerMan.processSnapshot(&common.Snapshot{}) }).To(Panic())
 		}, 3)
 
-		It("should fail if more than one apid_cluster rows", func() {
+		It("test scope change", func() {
 			newScopes := []string{"foo"}
 			scopes := []string{"bar"}
 			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
@@ -59,99 +67,6 @@
 			Expect(scopeChanged(newScopes, scopes)).To(BeNil())
 
 		}, 3)
-
-		It("should process a valid Snapshot", func() {
-
-			event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
-
-			processSnapshot(&event)
-
-			info, err := getApidInstanceInfo()
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
-
-			db := getDB()
-
-			expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(db == expectedDB).Should(BeTrue())
-
-			// apid Cluster
-			var dcs []dataApidCluster
-
-			rows, err := db.Query(`
-			SELECT id, name, description, umbrella_org_app_name,
-				created, created_by, updated, updated_by
-			FROM EDGEX_APID_CLUSTER`)
-			Expect(err).NotTo(HaveOccurred())
-			defer rows.Close()
-
-			c := dataApidCluster{}
-			for rows.Next() {
-				rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName,
-					&c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy)
-				dcs = append(dcs, c)
-			}
-
-			Expect(len(dcs)).To(Equal(1))
-			dc := dcs[0]
-
-			Expect(dc.ID).To(Equal("i"))
-			Expect(dc.Name).To(Equal("n"))
-			Expect(dc.Description).To(Equal("d"))
-			Expect(dc.OrgAppName).To(Equal("o"))
-			Expect(dc.Created).To(Equal("c"))
-			Expect(dc.CreatedBy).To(Equal("c"))
-			Expect(dc.Updated).To(Equal("u"))
-			Expect(dc.UpdatedBy).To(Equal("u"))
-
-			// Data Scope
-			var dds []dataDataScope
-
-			rows, err = db.Query(`
-			SELECT id, apid_cluster_id, scope, org,
-				env, created, created_by, updated,
-				updated_by
-			FROM EDGEX_DATA_SCOPE`)
-			Expect(err).NotTo(HaveOccurred())
-			defer rows.Close()
-
-			d := dataDataScope{}
-			for rows.Next() {
-				rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
-					&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
-					&d.UpdatedBy)
-				dds = append(dds, d)
-			}
-
-			Expect(len(dds)).To(Equal(3))
-			ds := dds[0]
-
-			Expect(ds.ID).To(Equal("i"))
-			Expect(ds.Org).To(Equal("o"))
-			Expect(ds.Env).To(Equal("e1"))
-			Expect(ds.Scope).To(Equal("s1"))
-			Expect(ds.Created).To(Equal("c"))
-			Expect(ds.CreatedBy).To(Equal("c"))
-			Expect(ds.Updated).To(Equal("u"))
-			Expect(ds.UpdatedBy).To(Equal("u"))
-
-			ds = dds[1]
-			Expect(ds.Env).To(Equal("e2"))
-			Expect(ds.Scope).To(Equal("s1"))
-			ds = dds[2]
-			Expect(ds.Env).To(Equal("e3"))
-			Expect(ds.Scope).To(Equal("s2"))
-
-			scopes := findScopesForId("a")
-			Expect(len(scopes)).To(Equal(6))
-			expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
-			sort.Strings(scopes)
-			sort.Strings(expectedScopes)
-			Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue())
-		}, 3)
 	})
 
 	Context("ApigeeSync change event", func() {
@@ -159,11 +74,6 @@
 		Context(LISTENER_TABLE_APID_CLUSTER, func() {
 
 			It("insert event should panic", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
-				processSnapshot(&ssEvent)
-
-				//save the last snapshot, so we can restore it at the end of this context
-
 				csEvent := common.ChangeList{
 					LastSequence: "test",
 					Changes: []common.Change{
@@ -174,13 +84,10 @@
 					},
 				}
 
-				Expect(func() { processChangeList(&csEvent) }).To(Panic())
+				Expect(func() { testListenerMan.processChangeList(&csEvent) }).To(Panic())
 			}, 3)
 
 			It("update event should panic", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
-				processSnapshot(&ssEvent)
-
 				event := common.ChangeList{
 					LastSequence: "test",
 					Changes: []common.Change{
@@ -191,7 +98,7 @@
 					},
 				}
 
-				Expect(func() { processChangeList(&event) }).To(Panic())
+				Expect(func() { testListenerMan.processChangeList(&event) }).To(Panic())
 				//restore the last snapshot
 			}, 3)
 
@@ -199,91 +106,6 @@
 
 		Context(LISTENER_TABLE_DATA_SCOPE, func() {
 
-			It("insert event should add", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "i"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s1"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "j"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s2"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-					},
-				}
-
-				processChangeList(&event)
-
-				var dds []dataDataScope
-
-				rows, err := getDB().Query(`
-				SELECT id, apid_cluster_id, scope, org,
-					env, created, created_by, updated,
-					updated_by
-				FROM EDGEX_DATA_SCOPE`)
-				Expect(err).NotTo(HaveOccurred())
-				defer rows.Close()
-
-				d := dataDataScope{}
-				for rows.Next() {
-					rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
-						&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
-						&d.UpdatedBy)
-					dds = append(dds, d)
-				}
-
-				//three already existing
-				Expect(len(dds)).To(Equal(2))
-				ds := dds[0]
-
-				Expect(ds.ID).To(Equal("i"))
-				Expect(ds.Org).To(Equal("o"))
-				Expect(ds.Env).To(Equal("e"))
-				Expect(ds.Scope).To(Equal("s1"))
-				Expect(ds.Created).To(Equal("c"))
-				Expect(ds.CreatedBy).To(Equal("c"))
-				Expect(ds.Updated).To(Equal("u"))
-				Expect(ds.UpdatedBy).To(Equal("u"))
-
-				ds = dds[1]
-				Expect(ds.Scope).To(Equal("s2"))
-
-				scopes := findScopesForId("a")
-				Expect(len(scopes)).To(Equal(2))
-				Expect(scopes[0]).To(Equal("s1"))
-				Expect(scopes[1]).To(Equal("s2"))
-
-			}, 3)
-
 			It("delete event should delete", func() {
 				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
 				processSnapshot(&ssEvent)
diff --git a/managerInterfaces.go b/managerInterfaces.go
index e406784..ff29c2d 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -61,10 +61,11 @@
 	getClusterCount() (numApidClusters int, err error)
 	alterClusterTable() (err error)
 	writeTransaction(*common.ChangeList) bool
+	setDbVersion(version string)
 }
 
 type listenerManagerInterface interface {
 	processSnapshot(snapshot *common.Snapshot)
-	processSqliteSnapshot(db apid.DB)
+	processSqliteSnapshot()
 	processChangeList(changes *common.ChangeList) bool
 }
diff --git a/token_test.go b/token_test.go
index e75b54d..cb38125 100644
--- a/token_test.go
+++ b/token_test.go
@@ -216,10 +216,13 @@
 	})
 })
 
-type dummyDbMan struct{}
+type dummyDbMan struct {
+	clusterCount int
+	err          error
+}
 
 func (d *dummyDbMan) initDefaultDb() error {
-	return nil
+	return d.err
 }
 
 func (d *dummyDbMan) setDb(apid.DB) {}
@@ -241,7 +244,7 @@
 }
 
 func (d *dummyDbMan) getPkeysForTable(tableName string) ([]string, error) {
-	return nil, nil
+	return nil, d.err
 }
 
 func (d *dummyDbMan) findScopesForId(configId string) []string {
@@ -249,15 +252,15 @@
 }
 
 func (d *dummyDbMan) getDefaultDb() (apid.DB, error) {
-	return nil, nil
+	return nil, d.err
 }
 
 func (d *dummyDbMan) updateApidInstanceInfo() error {
-	return nil
+	return d.err
 }
 
-func (d *dummyDbMan) getApidInstanceInfo() (info apidInstanceInfo, err error) {
-	return
+func (d *dummyDbMan) getApidInstanceInfo() (apidInstanceInfo, error) {
+	return apidInstanceInfo{}, d.err
 }
 
 func (d *dummyDbMan) getLastSequence() string {
@@ -265,13 +268,20 @@
 }
 
 func (d *dummyDbMan) updateLastSequence(lastSequence string) error {
-	return nil
+	return d.err
 }
 
 func (d *dummyDbMan) getClusterCount() (int, error) {
-	return 1, nil
+	return d.clusterCount, d.err
 }
 
 func (d *dummyDbMan) alterClusterTable() error {
-	return nil
+	return d.err
+}
+func (d *dummyDbMan) writeTransaction(_ *common.ChangeList) bool {
+	return true
+}
+
+func (d *dummyDbMan) setDbVersion(version string) {
+
 }