Merge pull request #45 from 30x/genericsqlite

Genericsqlite
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 9e6ed94..dd6cba4 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -24,6 +24,7 @@
 )
 
 const dummyConfigValue string = "placeholder"
+const expectedClusterId = "bootstrap"
 
 var _ = BeforeSuite(func() {
 	wipeDBAferTest = true
@@ -42,11 +43,11 @@
 	config.Set(configProxyServerBaseURI, dummyConfigValue)
 	config.Set(configSnapServerBaseURI, dummyConfigValue)
 	config.Set(configChangeServerBaseURI, dummyConfigValue)
-	config.Set(configSnapshotProtocol, "json")
+	config.Set(configSnapshotProtocol, "sqlite")
 	config.Set(configPollInterval, 10*time.Millisecond)
 
 	config.Set(configName, "testhost")
-	config.Set(configApidClusterId, "bootstrap")
+	config.Set(configApidClusterId, expectedClusterId)
 	config.Set(configConsumerKey, "XXXXXXX")
 	config.Set(configConsumerSecret, "YYYYYYY")
 
@@ -54,6 +55,7 @@
 	log = apid.Log()
 
 	_initPlugin(apid.AllServices())
+	createManagers()
 	close(done)
 }, 3)
 
@@ -63,11 +65,6 @@
 	lastSequence = ""
 
 	if wipeDBAferTest {
-		_, err := getDB().Exec("DELETE FROM APID_CLUSTER")
-		Expect(err).NotTo(HaveOccurred())
-		_, err = getDB().Exec("DELETE FROM DATA_SCOPE")
-		Expect(err).NotTo(HaveOccurred())
-
 		db, err := dataService.DB()
 		Expect(err).NotTo(HaveOccurred())
 		_, err = db.Exec("DELETE FROM APID")
diff --git a/apigee_sync.go b/apigee_sync.go
index 391355b..6fc1389 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -27,17 +27,17 @@
 		snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
 
 		events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
-			changeManager.pollChangeWithBackoff()
+			apidChangeManager.pollChangeWithBackoff()
 		})
 
 		log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
 		return
 	}
 
-	snapManager.downloadBootSnapshot()
-	snapManager.downloadDataSnapshot()
+	apidSnapshotManager.downloadBootSnapshot()
+	apidSnapshotManager.downloadDataSnapshot()
 
-	changeManager.pollChangeWithBackoff()
+	apidChangeManager.pollChangeWithBackoff()
 
 }
 
@@ -88,7 +88,7 @@
 }
 
 func addHeaders(req *http.Request) {
-	req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+	req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
 	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -104,6 +104,9 @@
 type expected200Error struct {
 }
 
+type authFailError struct {
+}
+
 func (an expected200Error) Error() string {
 	return "Did not recieve OK response"
 }
@@ -115,3 +118,7 @@
 func (a changeServerError) Error() string {
 	return a.Code
 }
+
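+// Error implements the error interface for authFailError, which is returned when the change server responds with 401 Unauthorized.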
+func (a authFailError) Error() string {
+	return "Authorization failed"
+}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 4659f9c..22823a4 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -13,6 +13,9 @@
 
 	Context("Sync", func() {
 
+		const expectedDataScopeId1 = "dataScope1"
+		const expectedDataScopeId2 = "dataScope2"
+
 		var initializeContext = func() {
 			testRouter = apid.API().Router()
 			testServer = httptest.NewServer(testRouter)
@@ -53,57 +56,62 @@
 			var lastSnapshot *common.Snapshot
 
 			expectedSnapshotTables := common.ChangeList{
-				Changes: []common.Change{common.Change{Table: "kms.company"},
-					common.Change{Table: "edgex.apid_cluster"},
-					common.Change{Table: "edgex.data_scope"}},
+				Changes: []common.Change{common.Change{Table: "kms_company"},
+					common.Change{Table: "edgex_apid_cluster"},
+					common.Change{Table: "edgex_data_scope"},
+					common.Change{Table: "kms_app_credential"},
+					common.Change{Table: "kms_app_credential_apiproduct_mapper"},
+					common.Change{Table: "kms_developer"},
+					common.Change{Table: "kms_company_developer"},
+					common.Change{Table: "kms_api_product"},
+					common.Change{Table: "kms_app"}},
 			}
 
 			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
 				if s, ok := event.(*common.Snapshot); ok {
 
+					Expect(16).To(Equal(len(knownTables)))
 					Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
 
-					//add apid_cluster and data_scope since those would present if this were a real scenario
-					knownTables["kms.app_credential"] = true
-					knownTables["kms.app_credential_apiproduct_mapper"] = true
-					knownTables["kms.developer"] = true
-					knownTables["kms.company_developer"] = true
-					knownTables["kms.api_product"] = true
-					knownTables["kms.app"] = true
-
 					lastSnapshot = s
 
-					for _, t := range s.Tables {
-						switch t.Name {
+					db, _ := dataService.DBVersion(s.SnapshotInfo)
+					var rowCount int
+					var id string
 
-						case "edgex.apid_cluster":
-							Expect(t.Rows).To(HaveLen(1))
-							r := t.Rows[0]
-							var id string
-							r.Get("id", &id)
-							Expect(id).To(Equal("bootstrap"))
+					err := db.Ping()
+					Expect(err).NotTo(HaveOccurred())
+					numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;")
+					if err != nil {
+						Fail("Failed to get correct DB")
+					}
+					Expect(true).To(Equal(numApidClusters.Next()))
+					numApidClusters.Scan(&rowCount)
+					Expect(1).To(Equal(rowCount))
+					apidClusters, _ := db.Query("select id from edgex_apid_cluster;")
+					apidClusters.Next()
+					apidClusters.Scan(&id)
+					Expect(id).To(Equal(expectedClusterId))
 
-						case "edgex.data_scope":
-							Expect(t.Rows).To(HaveLen(2))
-							r := t.Rows[1] // get the non-cluster row
+					numDataScopes, _ := db.Query("select distinct count(*) from edgex_data_scope;")
+					Expect(true).To(Equal(numDataScopes.Next()))
+					numDataScopes.Scan(&rowCount)
+					Expect(2).To(Equal(rowCount))
+					dataScopes, _ := db.Query("select id from edgex_data_scope;")
+					dataScopes.Next()
+					dataScopes.Scan(&id)
+					dataScopes.Next()
 
-							var id, clusterID, env, org, scope string
-							r.Get("id", &id)
-							r.Get("apid_cluster_id", &clusterID)
-							r.Get("env", &env)
-							r.Get("org", &org)
-							r.Get("scope", &scope)
-
-							Expect(id).To(Equal("ert452"))
-							Expect(scope).To(Equal("ert452"))
-							Expect(clusterID).To(Equal("bootstrap"))
-							Expect(env).To(Equal("prod"))
-							Expect(org).To(Equal("att"))
-						}
+					if id == expectedDataScopeId1 {
+						dataScopes.Scan(&id)
+						Expect(id).To(Equal(expectedDataScopeId2))
+					} else {
+						dataScopes.Scan(&id)
+						Expect(id).To(Equal(expectedDataScopeId1))
 					}
 
 				} else if cl, ok := event.(*common.ChangeList); ok {
-					closeDone = changeManager.close()
+					closeDone = apidChangeManager.close()
 					// ensure that snapshot switched DB versions
 					Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
 					expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
@@ -122,12 +130,12 @@
 						Expect(tenantID).To(Equal("ert452"))
 					}
 
-					Expect(tables).To(ContainElement("kms.app_credential"))
-					Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
-					Expect(tables).To(ContainElement("kms.developer"))
-					Expect(tables).To(ContainElement("kms.company_developer"))
-					Expect(tables).To(ContainElement("kms.api_product"))
-					Expect(tables).To(ContainElement("kms.app"))
+					Expect(tables).To(ContainElement("kms_app_credential"))
+					Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper"))
+					Expect(tables).To(ContainElement("kms_developer"))
+					Expect(tables).To(ContainElement("kms_company_developer"))
+					Expect(tables).To(ContainElement("kms_api_product"))
+					Expect(tables).To(ContainElement("kms_app"))
 
 					go func() {
 						// when close done, all handlers for the first changeList have been executed
@@ -135,10 +143,8 @@
 						defer GinkgoRecover()
 						// allow other handler to execute to insert last_sequence
 						var seq string
-						//for seq = ""; seq == ""; {
-						//	time.Sleep(50 * time.Millisecond)
-						err := getDB().
-							QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
+						err = getDB().
+							QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;").
 							Scan(&seq)
 						Expect(err).NotTo(HaveOccurred())
 						//}
@@ -163,9 +169,9 @@
 
 			initializeContext()
 			expectedTables := common.ChangeList{
-				Changes: []common.Change{common.Change{Table: "kms.company"},
-					common.Change{Table: "edgex.apid_cluster"},
-					common.Change{Table: "edgex.data_scope"}},
+				Changes: []common.Change{common.Change{Table: "kms_company"},
+					common.Change{Table: "edgex_apid_cluster"},
+					common.Change{Table: "edgex_data_scope"}},
 			}
 			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
 
@@ -174,7 +180,7 @@
 				if s, ok := event.(*common.Snapshot); ok {
 				// In this test, apidChangeManager.pollChangeWithBackoff() has not yet been launched when apidChangeManager is closed,
 				// because the pollChangeWithBackoff() call in bootstrap() happens after this handler runs
-					closeDone = changeManager.close()
+					closeDone = apidChangeManager.close()
 					go func() {
 						// when close done, all handlers for the first snapshot have been executed
 						<-closeDone
@@ -283,18 +289,19 @@
 		 */
 		It("Should be able to handle duplicate snapshot during bootstrap", func() {
 			initializeContext()
-			tokenManager = createTokenManager()
-			snapManager = createSnapShotManager()
+			apidTokenManager = createSimpleTokenManager()
+			apidTokenManager.start()
+			apidSnapshotManager = createSnapShotManager()
 			events.Listen(ApigeeSyncEventSelector, &handler{})
 
 			scopes := []string{apidInfo.ClusterID}
 			snapshot := &common.Snapshot{}
-			snapManager.downloadSnapshot(scopes, snapshot)
-			snapManager.storeBootSnapshot(snapshot)
-			snapManager.storeDataSnapshot(snapshot)
+			apidSnapshotManager.downloadSnapshot(scopes, snapshot)
+			apidSnapshotManager.storeBootSnapshot(snapshot)
+			apidSnapshotManager.storeDataSnapshot(snapshot)
 			restoreContext()
-			<-snapManager.close()
-			tokenManager.close()
+			<-apidSnapshotManager.close()
+			apidTokenManager.close()
 		}, 3)
 
 		It("Reuse http.Client connection for multiple concurrent requests", func() {
diff --git a/change_test.go b/change_test.go
new file mode 100644
index 0000000..7a69995
--- /dev/null
+++ b/change_test.go
@@ -0,0 +1,202 @@
+package apidApigeeSync
+
+import (
+	"github.com/30x/apid-core"
+	"github.com/apigee-labs/transicator/common"
+	. "github.com/onsi/ginkgo"
+	"net/http/httptest"
+	"net/url"
+	"os"
+	"time"
+)
+
+var _ = Describe("Change Agent", func() {
+
+	Context("Change Agent Unit Tests", func() {
+		handler := handler{}
+
+		var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
+			initDb(sqlfile, "./mockdb_change.sqlite3")
+			file, err := os.Open("./mockdb_change.sqlite3")
+			if err != nil {
+				Fail("Failed to open mock db for test")
+			}
+
+			s := common.Snapshot{}
+			err = processSnapshotServerFileResponse(dbId, file, &s)
+			if err != nil {
+				Fail("Error processing test snapshots")
+			}
+			return s
+		}
+
+		BeforeEach(func() {
+			event := createTestDb("./sql/init_mock_db.sql", "test_change")
+			handler.Handle(&event)
+			knownTables = extractTablesFromDB(getDB())
+		})
+
+		var initializeContext = func() {
+			testRouter = apid.API().Router()
+			testServer = httptest.NewServer(testRouter)
+
+			// set up mock server
+			mockParms := MockParms{
+				ReliableAPI:  true,
+				ClusterID:    config.GetString(configApidClusterId),
+				TokenKey:     config.GetString(configConsumerKey),
+				TokenSecret:  config.GetString(configConsumerSecret),
+				Scope:        "ert452",
+				Organization: "att",
+				Environment:  "prod",
+			}
+			testMock = Mock(mockParms, testRouter)
+
+			config.Set(configProxyServerBaseURI, testServer.URL)
+			config.Set(configSnapServerBaseURI, testServer.URL)
+			config.Set(configChangeServerBaseURI, testServer.URL)
+			config.Set(configPollInterval, 1*time.Millisecond)
+		}
+
+		var restoreContext = func() {
+
+			testServer.Close()
+			config.Set(configProxyServerBaseURI, dummyConfigValue)
+			config.Set(configSnapServerBaseURI, dummyConfigValue)
+			config.Set(configChangeServerBaseURI, dummyConfigValue)
+			config.Set(configPollInterval, 10*time.Millisecond)
+		}
+
+		It("test change agent with authorization failure", func() {
+			log.Debug("test change agent with authorization failure")
+			testTokenManager := &dummyTokenManager{make(chan bool)}
+			apidTokenManager = testTokenManager
+			apidTokenManager.start()
+			apidSnapshotManager = &dummySnapshotManager{}
+			initializeContext()
+			testMock.forceAuthFail()
+			wipeDBAferTest = true
+			apidChangeManager.pollChangeWithBackoff()
+			// auth check fails
+			<-testTokenManager.invalidateChan
+			log.Debug("closing")
+			<-apidChangeManager.close()
+			restoreContext()
+		})
+
+		It("test change agent with too old snapshot", func() {
+			log.Debug("test change agent with too old snapshot")
+			testTokenManager := &dummyTokenManager{make(chan bool)}
+			apidTokenManager = testTokenManager
+			apidTokenManager.start()
+			testSnapshotManager := &dummySnapshotManager{make(chan bool)}
+			apidSnapshotManager = testSnapshotManager
+			initializeContext()
+
+			testMock.passAuthCheck()
+			testMock.forceNewSnapshot()
+			wipeDBAferTest = true
+			apidChangeManager.pollChangeWithBackoff()
+			<-testSnapshotManager.downloadCalledChan
+			log.Debug("closing")
+			<-apidChangeManager.close()
+			restoreContext()
+		})
+
+		It("change agent should retry with authorization failure", func(done Done) {
+			log.Debug("change agent should retry with authorization failure")
+			testTokenManager := &dummyTokenManager{make(chan bool)}
+			apidTokenManager = testTokenManager
+			apidTokenManager.start()
+			apidSnapshotManager = &dummySnapshotManager{}
+			initializeContext()
+			testMock.forceAuthFail()
+			testMock.forceNoSnapshot()
+			wipeDBAferTest = true
+
+			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+
+				if _, ok := event.(*common.ChangeList); ok {
+					closeDone := apidChangeManager.close()
+					log.Debug("closing")
+					go func() {
+						// when close done, all handlers for the first snapshot have been executed
+						<-closeDone
+						restoreContext()
+						close(done)
+					}()
+
+				}
+			})
+
+			apidChangeManager.pollChangeWithBackoff()
+			// auth check fails
+			<-testTokenManager.invalidateChan
+		})
+
+	})
+})
+
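+// dummyTokenManager is a test double for the token manager; it signals on invalidateChan whenever an auth failure forces token invalidation.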
+type dummyTokenManager struct {
+	invalidateChan chan bool
+}
+
+func (t *dummyTokenManager) getBearerToken() string {
+	return ""
+}
+
+func (t *dummyTokenManager) invalidateToken() error {
+	log.Debug("invalidateToken called")
+	testMock.passAuthCheck()
+	t.invalidateChan <- true
+	return nil
+}
+
+func (t *dummyTokenManager) getToken() *oauthToken {
+	return nil
+}
+
+func (t *dummyTokenManager) close() {
+	return
+}
+
+func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
+	return func(chan bool) error {
+		return nil
+	}
+}
+
+func (t *dummyTokenManager) start() {
+
+}
+
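+// dummySnapshotManager is a test double for the snapshot manager; downloadCalledChan signals when a data snapshot download is requested.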
+type dummySnapshotManager struct {
+	downloadCalledChan chan bool
+}
+
+func (s *dummySnapshotManager) close() <-chan bool {
+	closeChan := make(chan bool)
+	close(closeChan)
+	return closeChan
+}
+
+func (s *dummySnapshotManager) downloadBootSnapshot() {
+
+}
+
+func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+
+}
+
+func (s *dummySnapshotManager) downloadDataSnapshot() {
+	log.Debug("dummySnapshotManager.downloadDataSnapshot() called")
+	s.downloadCalledChan <- true
+}
+
+func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
+
+}
+
+func (s *dummySnapshotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+	return nil
+}
diff --git a/changes.go b/changes.go
index e9be041..9e8d170 100644
--- a/changes.go
+++ b/changes.go
@@ -54,8 +54,8 @@
 		log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
 		go func() {
 			c.quitChan <- true
-			tokenManager.close()
-			<-snapManager.close()
+			apidTokenManager.close()
+			<-apidSnapshotManager.close()
 			log.Debug("change manager closed")
 			finishChan <- false
 		}()
@@ -65,8 +65,8 @@
 	log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
 	go func() {
 		c.quitChan <- true
-		tokenManager.close()
-		<-snapManager.close()
+		apidTokenManager.close()
+		<-apidSnapshotManager.close()
 		log.Debug("change manager closed")
 		finishChan <- true
 	}()
@@ -183,8 +183,11 @@
 		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
 		switch r.StatusCode {
 		case http.StatusUnauthorized:
-			tokenManager.invalidateToken()
-			return nil
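+			// a 401 means the bearer token is stale: invalidate it and surface authFailError so the poll loop can retry with a fresh token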
+			err = apidTokenManager.invalidateToken()
+			if err != nil {
+				return err
+			}
+			return authFailError{}
 
 		case http.StatusNotModified:
 			return nil
@@ -206,7 +209,7 @@
 				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
 				err = apiErr
 			}
-			return nil
+			return err
 		}
 		return nil
 	}
@@ -271,7 +274,7 @@
 	}
 	if c, ok := err.(changeServerError); ok {
 		log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
-		snapManager.downloadDataSnapshot()
+		apidSnapshotManager.downloadDataSnapshot()
 	} else {
 		log.Debugf("Error connecting to changeserver: %v", err)
 	}
@@ -289,7 +292,7 @@
 	}
 
 	for _, change := range changes {
-		if !a[change.Table] {
+		if !a[normalizeTableName(change.Table)] {
 			log.Infof("Unable to find %s table in current known tables", change.Table)
 			return true
 		}
diff --git a/cmd/mockServer/main.go b/cmd/mockServer/main.go
index 6773606..bf4de27 100644
--- a/cmd/mockServer/main.go
+++ b/cmd/mockServer/main.go
@@ -5,7 +5,6 @@
 
 	"os"
 
-	"time"
 
 	"github.com/30x/apid-core"
 	"github.com/30x/apid-core/factory"
@@ -22,11 +21,8 @@
 	reliable := f.Bool("reliable", true, "if false, server will often send 500 errors")
 
 	numDevs := f.Int("numDevs", 2, "number of developers in snapshot")
-	addDevEach := f.Duration("addDevEach", 0*time.Second, "add a developer each duration (default 0s)")
-	upDevEach := f.Duration("upDevEach", 0*time.Second, "update a developer each duration (default 0s)")
 
 	numDeps := f.Int("numDeps", 2, "number of deployments in snapshot")
-	upDepEach := f.Duration("upDepEach", 0*time.Second, "update (replace) a deployment each duration (default 0s)")
 
 	f.Parse(os.Args[1:])
 
@@ -51,10 +47,7 @@
 		Organization:           "org",
 		Environment:            "test",
 		NumDevelopers:          *numDevs,
-		AddDeveloperEvery:      *addDevEach,
-		UpdateDeveloperEvery:   *upDevEach,
 		NumDeployments:         *numDeps,
-		ReplaceDeploymentEvery: *upDepEach,
 		BundleURI:              *bundleURI,
 	}
 
diff --git a/data.go b/data.go
index 4761373..a9b1496 100644
--- a/data.go
+++ b/data.go
@@ -8,6 +8,9 @@
 	"sync"
 
 	"github.com/30x/apid-core"
+	"github.com/apigee-labs/transicator/common"
+	"sort"
+	"strings"
 )
 
 var (
@@ -39,30 +42,6 @@
 	    last_snapshot_info text,
 	    PRIMARY KEY (instance_id)
 	);
-	CREATE TABLE IF NOT EXISTS APID_CLUSTER (
-	    id text,
-	    name text,
-	    description text,
-	    umbrella_org_app_name text,
-	    created text,
-	    created_by text,
-	    updated text,
-	    updated_by text,
-	    last_sequence text,
-	    PRIMARY KEY (id)
-	);
-	CREATE TABLE IF NOT EXISTS DATA_SCOPE (
-	    id text,
-	    apid_cluster_id text,
-	    scope text,
-	    org text,
-	    env text,
-	    created text,
-	    created_by text,
-	    updated text,
-	    updated_by text,
-	    PRIMARY KEY (id, apid_cluster_id)
-	);
 	`)
 	if err != nil {
 		return err
@@ -85,86 +64,289 @@
 	dbMux.Unlock()
 }
 
-func insertApidCluster(dac dataApidCluster, txn *sql.Tx) error {
+//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
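+// insert writes all rows into tableName using a single prepared multi-row INSERT inside txn; returns true only if every row was written.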
+func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
 
-	log.Debugf("inserting into APID_CLUSTER: %v", dac)
-
-	//replace to accomodate same snapshot txid
-	stmt, err := txn.Prepare(`
-	REPLACE INTO APID_CLUSTER
-		(id, description, name, umbrella_org_app_name,
-		created, created_by, updated, updated_by,
-		last_sequence)
-	VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9);
-	`)
-	if err != nil {
-		log.Errorf("prepare insert into APID_CLUSTER transaction Failed: %v", err)
-		return err
-	}
-	defer stmt.Close()
-
-	_, err = stmt.Exec(
-		dac.ID, dac.Description, dac.Name, dac.OrgAppName,
-		dac.Created, dac.CreatedBy, dac.Updated, dac.UpdatedBy,
-		"")
-
-	if err != nil {
-		log.Errorf("insert APID_CLUSTER failed: %v", err)
+	if len(rows) == 0 {
+		return false
 	}
 
-	return err
+	var orderedColumns []string
+	for column := range rows[0] {
+		orderedColumns = append(orderedColumns, column)
+	}
+	sort.Strings(orderedColumns)
+
+	sql := buildInsertSql(tableName, orderedColumns, rows)
+
+	prep, err := txn.Prepare(sql)
+	if err != nil {
+		log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err)
+		return false
+	}
+	defer prep.Close()
+
+	var values []interface{}
+
+	for _, row := range rows {
+		for _, columnName := range orderedColumns {
+			//use Value so that stmt exec does not complain about common.ColumnVal being a struct
+			values = append(values, row[columnName].Value)
+		}
+	}
+
+	//execute the prepared statement once with the values for all rows
+	_, err = prep.Exec(values...)
+
+	if err != nil {
+		log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err)
+		return false
+	}
+	log.Debugf("INSERT Success [%s] values=%v", sql, values)
+
+	return true
 }
 
-func insertDataScope(ds dataDataScope, txn *sql.Tx) error {
-
-	log.Debugf("insert DATA_SCOPE: %v", ds)
-
-	//replace to accomodate same snapshot txid
-	stmt, err := txn.Prepare(`
-	REPLACE INTO DATA_SCOPE
-		(id, apid_cluster_id, scope, org,
-		env, created, created_by, updated,
-		updated_by)
-	VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9);
-	`)
-	if err != nil {
-		log.Errorf("insert DATA_SCOPE failed: %v", err)
-		return err
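+// getValueListFromKeys pulls the values of the given primary-key columns out of row, in pkeys order, using nil for absent columns.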
+func getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
+	var values = make([]interface{}, len(pkeys))
+	for i, pkey := range pkeys {
+		if row[pkey] == nil {
+			values[i] = nil
+		} else {
+			values[i] = row[pkey].Value
+		}
 	}
-	defer stmt.Close()
-
-	_, err = stmt.Exec(
-		ds.ID, ds.ClusterID, ds.Scope, ds.Org,
-		ds.Env, ds.Created, ds.CreatedBy, ds.Updated,
-		ds.UpdatedBy)
-
-	if err != nil {
-		log.Errorf("insert DATA_SCOPE failed: %v", err)
-		return err
-	}
-
-	return nil
+	return values
 }
 
-func deleteDataScope(ds dataDataScope, txn *sql.Tx) error {
-
-	log.Debugf("delete DATA_SCOPE: %v", ds)
-
-	stmt, err := txn.Prepare("DELETE FROM DATA_SCOPE WHERE id=$1 and apid_cluster_id=$2")
-	if err != nil {
-		log.Errorf("update DATA_SCOPE failed: %v", err)
-		return err
-	}
-	defer stmt.Close()
-
-	_, err = stmt.Exec(ds.ID, ds.ClusterID)
-
-	if err != nil {
-		log.Errorf("delete DATA_SCOPE failed: %v", err)
-		return err
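+// _delete removes each row from tableName by its primary-key values; a delete that matches zero rows is treated as a failure.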
+func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
+	pkeys, err := getPkeysForTable(tableName)
+	sort.Strings(pkeys)
+	if len(pkeys) == 0 || err != nil {
+		log.Errorf("DELETE No primary keys found for table. %s", tableName)
+		return false
 	}
 
-	return nil
+	if len(rows) == 0 {
+		log.Errorf("No rows found for table.", tableName)
+		return false
+	}
+
+	sql := buildDeleteSql(tableName, rows[0], pkeys)
+	prep, err := txn.Prepare(sql)
+	if err != nil {
+		log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
+		return false
+	}
+	defer prep.Close()
+	for _, row := range rows {
+		values := getValueListFromKeys(row, pkeys)
+		// execute the prepared delete with this row's primary-key values
+		res, err := txn.Stmt(prep).Exec(values...)
+		if err != nil {
+			log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
+			return false
+		}
+		affected, err := res.RowsAffected()
+		if err == nil && affected != 0 {
+			log.Debugf("DELETE Success [%s] values=%v", sql, values)
+		} else if err == nil && affected == 0 {
+			log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
+			return false
+		} else {
+			log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
+			return false
+		}
+
+	}
+	return true
+
+}
+
+// Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
+func buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
+
+	var wherePlaceholders []string
+	i := 1
+	if row == nil {
+		return ""
+	}
+	normalizedTableName := normalizeTableName(tableName)
+
+	for _, pk := range pkeys {
+		wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
+		i++
+	}
+
+	sql := "DELETE FROM " + normalizedTableName
+	sql += " WHERE "
+	sql += strings.Join(wherePlaceholders, " AND ")
+
+	return sql
+
+}
+
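+// update applies each newRows[i] to tableName, matching on the primary-key values of the corresponding oldRows[i].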
+func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
+	pkeys, err := getPkeysForTable(tableName)
+	if len(pkeys) == 0 || err != nil {
+		log.Errorf("UPDATE No primary keys found for table.", tableName)
+		return false
+	}
+	if len(oldRows) == 0 || len(newRows) == 0 {
+		return false
+	}
+
+	var orderedColumns []string
+
+	//extract sorted orderedColumns
+	for columnName := range newRows[0] {
+		orderedColumns = append(orderedColumns, columnName)
+	}
+	sort.Strings(orderedColumns)
+
+	//build update statement, use arbitrary row as template
+	sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
+	prep, err := txn.Prepare(sql)
+	if err != nil {
+		log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
+		return false
+	}
+	defer prep.Close()
+
+	for i, row := range newRows {
+		var values []interface{}
+
+		for _, columnName := range orderedColumns {
+			//use Value so that stmt exec does not complain about common.ColumnVal being a struct
+			//TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping
+			//TODO right now this will only work when the column type is a string
+			if row[columnName] != nil {
+				values = append(values, row[columnName].Value)
+			} else {
+				values = append(values, nil)
+			}
+		}
+
+		//add values for where clause, use PKs of old row
+		for _, pk := range pkeys {
+			if oldRows[i][pk] != nil {
+				values = append(values, oldRows[i][pk].Value)
+			} else {
+				values = append(values, nil)
+			}
+
+		}
+
+		//execute the prepared update with the new column values plus the old primary-key values
+		res, err := txn.Stmt(prep).Exec(values...)
+
+		if err != nil {
+			log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
+			return false
+		}
+		numRowsAffected, err := res.RowsAffected()
+		if err != nil {
+			log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
+			return false
+		}
+		//delete this once we figure out why tests are failing/not updating
+		log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
+		log.Debugf("UPDATE Success [%s] values=%v", sql, values)
+
+	}
+
+	return true
+
+}
+
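+// Syntax "UPDATE Obj SET col1=$1, col2=$2 ... WHERE key1=$3 AND key2=$4 ... ;"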
+func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
+	if row == nil {
+		return ""
+	}
+	normalizedTableName := normalizeTableName(tableName)
+
+	var setPlaceholders, wherePlaceholders []string
+	i := 1
+
+	for _, columnName := range orderedColumns {
+		setPlaceholders = append(setPlaceholders, fmt.Sprintf("%s=$%v", columnName, i))
+		i++
+	}
+
+	for _, pk := range pkeys {
+		wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
+		i++
+	}
+
+	sql := "UPDATE " + normalizedTableName + " SET "
+	sql += strings.Join(setPlaceholders, ", ")
+	sql += " WHERE "
+	sql += strings.Join(wherePlaceholders, " AND ")
+
+	return sql
+}
+
+//precondition: len(rows) <= 1000, the max number of rows per multi-row insert for sqlite
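+// e.g. two rows with three columns produce "INSERT INTO t(a,b,c) VALUES ($1,$2,$3),($4,$5,$6)"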
+func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
+	if len(rows) == 0 {
+		return ""
+	}
+	normalizedTableName := normalizeTableName(tableName)
+	var values string = ""
+
+	var i, j int
+	k := 1
+	for i = 0; i < len(rows)-1; i++ {
+		values += "("
+		for j = 0; j < len(orderedColumns)-1; j++ {
+			values += fmt.Sprintf("$%d,", k)
+			k++
+		}
+		values += fmt.Sprintf("$%d),", k)
+		k++
+	}
+	values += "("
+	for j = 0; j < len(orderedColumns)-1; j++ {
+		values += fmt.Sprintf("$%d,", k)
+		k++
+	}
+	values += fmt.Sprintf("$%d)", k)
+
+	sql := "INSERT INTO " + normalizedTableName
+	sql += "(" + strings.Join(orderedColumns, ",") + ") "
+	sql += "VALUES " + values
+
+	return sql
+}
+
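+// getPkeysForTable returns the primary-key column names for tableName, read in sorted order from the _transicator_tables metadata table.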
+func getPkeysForTable(tableName string) ([]string, error) {
+	db := getDB()
+	normalizedTableName := normalizeTableName(tableName)
+	sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
+	rows, err := db.Query(sql, normalizedTableName)
+	if err != nil {
+		log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err)
+		return nil, err
+	}
+	var columnNames []string
+	defer rows.Close()
+	for rows.Next() {
+		var value string
+		err := rows.Scan(&value)
+		if err != nil {
+			log.Fatal(err)
+		}
+		columnNames = append(columnNames, value)
+	}
+	err = rows.Err()
+	if err != nil {
+		log.Fatal(err)
+	}
+	return columnNames, nil
+}
+
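+// normalizeTableName converts a transicator table name such as "edgex.data_scope" into its SQLite equivalent "edgex_data_scope" (only the first '.' is replaced).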
+func normalizeTableName(tableName string) string {
+	return strings.Replace(tableName, ".", "_", 1)
 }
 
 /*
@@ -178,9 +360,9 @@
 	var scope string
 	db := getDB()
 
-	rows, err := db.Query("select DISTINCT scope from DATA_SCOPE where apid_cluster_id = $1", configId)
+	rows, err := db.Query("select DISTINCT scope from EDGEX_DATA_SCOPE where apid_cluster_id = $1", configId)
 	if err != nil {
-		log.Errorf("Failed to query DATA_SCOPE: %v", err)
+		log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
 		return
 	}
 	defer rows.Close()
@@ -198,9 +380,9 @@
  */
 func getLastSequence() (lastSequence string) {
 
-	err := getDB().QueryRow("select last_sequence from APID_CLUSTER LIMIT 1").Scan(&lastSequence)
+	err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
 	if err != nil && err != sql.ErrNoRows {
-		log.Panicf("Failed to query APID_CLUSTER: %v", err)
+		log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
 		return
 	}
 
@@ -216,21 +398,21 @@
 
 	log.Debugf("updateLastSequence: %s", lastSequence)
 
-	stmt, err := getDB().Prepare("UPDATE APID_CLUSTER SET last_sequence=$1;")
+	stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
 	if err != nil {
-		log.Errorf("UPDATE APID_CLUSTER Failed: %v", err)
+		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
 		return err
 	}
 	defer stmt.Close()
 
 	_, err = stmt.Exec(lastSequence)
 	if err != nil {
-		log.Errorf("UPDATE APID_CLUSTER Failed: %v", err)
+		log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
 		return err
 	}
 
-	log.Infof("UPDATE APID_CLUSTER Success: %s", lastSequence)
-
+	log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
+	log.Infof("Replication lastSequence=%s", lastSequence)
 	return nil
 }
 
diff --git a/data_test.go b/data_test.go
new file mode 100644
index 0000000..a26597d
--- /dev/null
+++ b/data_test.go
@@ -0,0 +1,1185 @@
+package apidApigeeSync
+
+import (
+	"github.com/apigee-labs/transicator/common"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"sort"
+)
+
+var _ = Describe("data access tests", func() {
+
+	BeforeEach(func() {
+		db := getDB()
+
+		//all tests in this file operate on the api_product table.  Create the necessary tables for this here
+		getDB().Exec("CREATE TABLE _transicator_tables " +
+			"(tableName varchar not null, columnName varchar not null, " +
+			"typid integer, primaryKey bool);")
+		getDB().Exec("DELETE from _transicator_tables")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
+		getDB().Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
+
+		getDB().Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+			"api_resources text,approval_type text,scopes text,proxies text, environments text," +
+			"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
+			"primary key (id,tenant_id,created_at,updated_at));")
+		getDB().Exec("DELETE from kms_api_product")
+
+		setDB(db)
+		initDB(db)
+
+	})
+
+	Context("Update processing", func() {
+		It("unit test buildUpdateSql with single primary key", func() {
+			testRow := common.Row{
+				"id": {
+					Value: "ch_api_product_2",
+				},
+				"api_resources": {
+					Value: "{}",
+				},
+				"environments": {
+					Value: "{Env_0, Env_1}",
+				},
+				"tenant_id": {
+					Value: "tenant_id_0",
+				},
+				"_change_selector": {
+					Value: "test_org0",
+				},
+			}
+
+			var orderedColumns []string
+			for column := range testRow {
+				orderedColumns = append(orderedColumns, column)
+			}
+			sort.Strings(orderedColumns)
+
+			result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
+			Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" +
+				" WHERE id=$6").To(Equal(result))
+		})
+
+		It("unit test buildUpdateSql with composite primary key", func() {
+			testRow := common.Row{
+				"id1": {
+					Value: "composite-key-1",
+				},
+				"id2": {
+					Value: "composite-key-2",
+				},
+				"api_resources": {
+					Value: "{}",
+				},
+				"environments": {
+					Value: "{Env_0, Env_1}",
+				},
+				"tenant_id": {
+					Value: "tenant_id_0",
+				},
+				"_change_selector": {
+					Value: "test_org0",
+				},
+			}
+
+			var orderedColumns []string
+			for column := range testRow {
+				orderedColumns = append(orderedColumns, column)
+			}
+			sort.Strings(orderedColumns)
+
+			result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
+			Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" +
+				" WHERE id1=$7 AND id2=$8").To(Equal(result))
+		})
+
+		It("test update with composite primary key", func() {
+			event := &common.ChangeList{}
+
+			//this needs to match what is actually in the DB
+			oldRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "A product for testing Greg",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			newRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
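+			// transicator operation codes: 1 = insert, 2 = update, 3 = delete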
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    oldRow,
+					Operation: 1,
+				},
+			}
+			//insert and assert success
+			Expect(true).To(Equal(processChangeList(event)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//create update event
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    oldRow,
+					NewRow:    newRow,
+					Operation: 2,
+				},
+			}
+
+			//do the update
+			Expect(true).To(Equal(processChangeList(event)))
+			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+		})
+
+		It("update should succeed if newrow modifies the primary key", func() {
+			event := &common.ChangeList{}
+
+			//this needs to match what is actually in the DB
+			oldRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "A product for testing Greg",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			newRow := common.Row{
+				"id": {
+					Value: "new_id",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    oldRow,
+					Operation: 1,
+				},
+			}
+			//insert and assert success
+			Expect(true).To(Equal(processChangeList(event)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//create update event
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    oldRow,
+					NewRow:    newRow,
+					Operation: 2,
+				},
+			}
+
+			//do the update
+			Expect(true).To(Equal(processChangeList(event)))
+			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+		})
+
+		It("update should succeed if newrow contains fewer fields than oldrow", func() {
+			event := &common.ChangeList{}
+
+			oldRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "A product for testing Greg",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			newRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    oldRow,
+					NewRow:    newRow,
+					Operation: 2,
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    oldRow,
+					Operation: 1,
+				},
+			}
+			//insert and assert success
+			Expect(true).To(Equal(processChangeList(event)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//create update event
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    oldRow,
+					NewRow:    newRow,
+					Operation: 2,
+				},
+			}
+
+			//do the update
+			Expect(true).To(Equal(processChangeList(event)))
+			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+		})
+
+		It("update should succeed if oldrow contains fewer fields than newrow", func() {
+			event := &common.ChangeList{}
+
+			oldRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "A product for testing Greg",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			newRow := common.Row{
+				"id": {
+					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    oldRow,
+					NewRow:    newRow,
+					Operation: 2,
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    oldRow,
+					Operation: 1,
+				},
+			}
+			//insert and assert success
+			Expect(true).To(Equal(processChangeList(event)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//create update event
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    oldRow,
+					NewRow:    newRow,
+					Operation: 2,
+				},
+			}
+
+			//do the update
+			Expect(true).To(Equal(processChangeList(event)))
+			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+		})
+	})
+
+	Context("Insert processing", func() {
+		It("Properly constructs insert sql for one row", func() {
+			newRow := common.Row{
+				"id": {
+					Value: "new_id",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			var orderedColumns []string
+			for column := range newRow {
+				orderedColumns = append(orderedColumns, column)
+			}
+			sort.Strings(orderedColumns)
+
+			expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)"
+			Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
+		})
+
+		It("Properly constructs insert sql for multiple rows", func() {
+			newRow1 := common.Row{
+				"id": {
+					Value: "1",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+			newRow2 := common.Row{
+				"id": {
+					Value: "2",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			var orderedColumns []string
+			for column := range newRow1 {
+				orderedColumns = append(orderedColumns, column)
+			}
+			sort.Strings(orderedColumns)
+
+			expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)"
+			Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
+		})
+
+		It("Properly executes insert for a single rows", func() {
+			event := &common.ChangeList{}
+
+			newRow1 := common.Row{
+				"id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    newRow1,
+					Operation: 1,
+				},
+			}
+
+			Expect(true).To(Equal(processChangeList(event)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				"and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+		})
+
+		It("Properly executed insert for multiple rows", func() {
+			event := &common.ChangeList{}
+
+			newRow1 := common.Row{
+				"id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+			newRow2 := common.Row{
+				"id": {
+					Value: "b",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    newRow1,
+					Operation: 1,
+				},
+				{
+					Table:     "kms.api_product",
+					NewRow:    newRow2,
+					Operation: 1,
+				},
+			}
+
+			Expect(true).To(Equal(processChangeList(event)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				"and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				"and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(2))
+		})
+
+		It("Fails to execute if row does not match existing table schema", func() {
+			event := &common.ChangeList{}
+
+			newRow1 := common.Row{
+				"not_and_id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    newRow1,
+					Operation: 1,
+				},
+			}
+
+			ok := processChangeList(event)
+			Expect(false).To(Equal(ok))
+
+			var nRows int
+			//assert that no extraneous rows were added
+			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(0))
+		})
+
+		It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
+			event := &common.ChangeList{}
+			newRow1 := common.Row{
+				"id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			newRow2 := common.Row{
+				"not_and_id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    newRow1,
+					Operation: 1,
+				},
+				{
+					Table:     "kms.api_product",
+					NewRow:    newRow2,
+					Operation: 1,
+				},
+			}
+
+			ok := processChangeList(event)
+			Expect(false).To(Equal(ok))
+		})
+	})
+
+	Context("Delete processing", func() {
+		It("Properly constructs sql prepare for Delete", func() {
+			row := common.Row{
+				"id": {
+					Value: "new_id",
+				},
+				"api_resources": {
+					Value: "{/**}",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "43aef41d",
+				},
+				"description": {
+					Value: "new description",
+				},
+				"created_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"updated_at": {
+					Value: "2017-03-01 22:50:41.75+00:00",
+				},
+				"_change_selector": {
+					Value: "43aef41d",
+				},
+			}
+
+			pkeys, err := getPkeysForTable("kms_api_product")
+			Expect(err).Should(Succeed())
+			sql := buildDeleteSql("kms_api_product", row, pkeys)
+			Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
+		})
+
+		It("Verify execute single insert & single delete works", func() {
+			event1 := &common.ChangeList{}
+			event2 := &common.ChangeList{}
+
+			Row1 := common.Row{
+				"id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event1.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    Row1,
+					Operation: 1,
+				},
+			}
+			event2.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    Row1,
+					Operation: 3,
+				},
+			}
+
+			Expect(true).To(Equal(processChangeList(event1)))
+			var nRows int
+			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				"and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			Expect(true).To(Equal(processChangeList(event2)))
+
+			// validate delete
+			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(0))
+
+			// deleting again should fail because the entry no longer exists
+			Expect(false).To(Equal(processChangeList(event2)))
+		})
+
+		It("verify multiple insert and single delete works", func() {
+			event1 := &common.ChangeList{}
+			event2 := &common.ChangeList{}
+
+			Row1 := common.Row{
+				"id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			Row2 := common.Row{
+				"id": {
+					Value: "b",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event1.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    Row1,
+					Operation: 1,
+				},
+				{
+					Table:     "kms.api_product",
+					NewRow:    Row2,
+					Operation: 1,
+				},
+			}
+			event2.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    Row1,
+					Operation: 3,
+				},
+			}
+
+			Expect(true).To(Equal(processChangeList(event1)))
+			var nRows int
+			//verify first row
+			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+				" and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				" and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//verify second row
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+				" and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				" and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(2))
+
+			Expect(true).To(Equal(processChangeList(event2)))
+
+			//verify second row still exists
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+				" and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				" and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			// validate delete
+			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			// deleting again should fail because the entry no longer exists
+			Expect(false).To(Equal(processChangeList(event2)))
+		}, 3)
+
+		It("verify single insert and multiple delete fails", func() {
+			event1 := &common.ChangeList{}
+			event2 := &common.ChangeList{}
+
+			Row1 := common.Row{
+				"id": {
+					Value: "a",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			Row2 := common.Row{
+				"id": {
+					Value: "b",
+				},
+				"api_resources": {
+					Value: "r",
+				},
+				"environments": {
+					Value: "{test}",
+				},
+				"tenant_id": {
+					Value: "t",
+				},
+				"description": {
+					Value: "d",
+				},
+				"created_at": {
+					Value: "c",
+				},
+				"updated_at": {
+					Value: "u",
+				},
+				"_change_selector": {
+					Value: "cs",
+				},
+			}
+
+			event1.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					NewRow:    Row1,
+					Operation: 1,
+				},
+			}
+			event2.Changes = []common.Change{
+				{
+					Table:     "kms.api_product",
+					OldRow:    Row1,
+					Operation: 3,
+				},
+				{
+					Table:     "kms.api_product",
+					OldRow:    Row2,
+					Operation: 3,
+				},
+			}
+
+			Expect(true).To(Equal(processChangeList(event1)))
+			var nRows int
+			//verify insert
+			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+				" and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+				" and _change_selector='cs'").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			//assert that no extraneous rows were added
+			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(nRows).To(Equal(1))
+
+			Expect(false).To(Equal(processChangeList(event2)))
+
+		}, 3)
+	})
+})
diff --git a/init.go b/init.go
index cf90781..d9b767e 100644
--- a/init.go
+++ b/init.go
@@ -30,16 +30,16 @@
 
 var (
 	/* All set during plugin initialization */
-	log           apid.LogService
-	config        apid.ConfigService
-	dataService   apid.DataService
-	events        apid.EventsService
-	apidInfo      apidInstanceInfo
-	newInstanceID bool
-	tokenManager  *tokenMan
-	changeManager *pollChangeManager
-	snapManager   *snapShotManager
-	httpclient    *http.Client
+	log                 apid.LogService
+	config              apid.ConfigService
+	dataService         apid.DataService
+	events              apid.EventsService
+	apidInfo            apidInstanceInfo
+	newInstanceID       bool
+	apidTokenManager    tokenManager
+	apidChangeManager   changeManager
+	apidSnapshotManager snapShotManager
+	httpclient          *http.Client
 
 	/* Set during post plugin initialization
 	 * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -62,7 +62,7 @@
 
 func initConfigDefaults() {
 	config.SetDefault(configPollInterval, 120*time.Second)
-	config.SetDefault(configSnapshotProtocol, "json")
+	config.SetDefault(configSnapshotProtocol, "sqlite")
 	name, errh := os.Hostname()
 	if (errh != nil) && (len(config.GetString(configName)) == 0) {
 		log.Errorf("Not able to get hostname for kernel. Please set '%s' property in config", configName)
@@ -83,16 +83,11 @@
 		Transport: tr,
 		Timeout:   httpTimeout,
 		CheckRedirect: func(req *http.Request, _ []*http.Request) error {
-			req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+			req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
 			return nil
 		},
 	}
 
-	//TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
-	//also useful for testing
-	snapManager = createSnapShotManager()
-	changeManager = createChangeManager()
-
 	// set up default database
 	db, err := dataService.DB()
 	if err != nil {
@@ -117,6 +112,12 @@
 	return nil
 }
 
+func createManagers() {
+	apidSnapshotManager = createSnapShotManager()
+	apidChangeManager = createChangeManager()
+	apidTokenManager = createSimpleTokenManager()
+}
+
 func checkForRequiredValues() error {
 	// check for required values
 	for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
@@ -126,8 +127,8 @@
 		}
 	}
 	proto := config.GetString(configSnapshotProtocol)
-	if proto != "json" && proto != "proto" {
-		return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol)
+	if proto != "sqlite" {
+		return fmt.Errorf("Illegal value for %s. The only currently supported snapshot protocol is 'sqlite'", configSnapshotProtocol)
 	}
 
 	return nil
@@ -137,7 +138,7 @@
 	log = logger
 }
 
-/* Idempotent state initialization */
+/* initialization */
 func _initPlugin(services apid.Services) error {
 	SetLogger(services.Log().ForModule("apigeeSync"))
 	log.Debug("start init")
@@ -165,6 +166,8 @@
 		return pluginData, err
 	}
 
+	createManagers()
+
 	/* This callback function will get called once all the plugins are
 	 * initialized (not just this plugin). This is needed because,
 	 * downloadSnapshots/changes etc have to begin to be processed only
@@ -208,8 +211,7 @@
 
 		log.Debug("start post plugin init")
 
-		tokenManager = createTokenManager()
-
+		apidTokenManager.start()
 		go bootstrap()
 
 		events.Listen(ApigeeSyncEventSelector, &handler{})
diff --git a/listener.go b/listener.go
index 49e3d6b..2a8b492 100644
--- a/listener.go
+++ b/listener.go
@@ -36,11 +36,7 @@
 		log.Panicf("Unable to access database: %v", err)
 	}
 
-	if config.GetString(configSnapshotProtocol) == "json" {
-		processJsonSnapshot(snapshot, db)
-	} else if config.GetString(configSnapshotProtocol) == "sqlite" {
-		processSqliteSnapshot(snapshot, db)
-	}
+	processSqliteSnapshot(db)
 
 	//update apid instance info
 	apidInfo.LastSnapshot = snapshot.SnapshotInfo
@@ -54,128 +50,73 @@
 
 }
 
-func processSqliteSnapshot(snapshot *common.Snapshot, db apid.DB) {
-	//nothing to do as of now, here as a placeholder
-}
+func processSqliteSnapshot(db apid.DB) {
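+	// the sqlite snapshot arrives as a complete database, so only sanity
+	// checks and bookkeeping are needed here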
 
-func processJsonSnapshot(snapshot *common.Snapshot, db apid.DB) {
-
-	err := initDB(db)
+	var numApidClusters int
+	apidClusters, err := db.Query("SELECT COUNT(*) FROM edgex_apid_cluster")
 	if err != nil {
-		log.Panicf("Unable to initialize database: %v", err)
+		log.Panicf("Unable to read database: %s", err.Error())
+	}
+	defer apidClusters.Close()
+	apidClusters.Next()
+	err = apidClusters.Scan(&numApidClusters)
+	if err != nil {
+		log.Panicf("Unable to read database: %s", err.Error())
 	}
 
-	tx, err := db.Begin()
-	if err != nil {
-		log.Panicf("Error starting transaction: %v", err)
+	if numApidClusters != 1 {
+		log.Panic("Illegal state for apid_cluster. Must be a single row.")
 	}
-	defer tx.Rollback()
 
-	for _, table := range snapshot.Tables {
-
-		switch table.Name {
-		case LISTENER_TABLE_APID_CLUSTER:
-			if len(table.Rows) > 1 {
-				log.Panic("Illegal state for apid_cluster. Must be a single row.")
-			}
-			for _, row := range table.Rows {
-				ac := makeApidClusterFromRow(row)
-				err := insertApidCluster(ac, tx)
-				if err != nil {
-					log.Panicf("Snapshot update failed: %v", err)
-				}
-			}
-
-		case LISTENER_TABLE_DATA_SCOPE:
-			for _, row := range table.Rows {
-				ds := makeDataScopeFromRow(row)
-				err := insertDataScope(ds, tx)
-				if err != nil {
-					log.Panicf("Snapshot update failed: %v", err)
-				}
-			}
+	_, err = db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+	if err != nil {
+		if err.Error() == "duplicate column name: last_sequence" {
+			return
+		} else {
+			log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error: %v", err)
 		}
 	}
-
-	err = tx.Commit()
-	if err != nil {
-		log.Panicf("Error committing Snapshot change: %v", err)
-	}
 }
 
-func processChangeList(changes *common.ChangeList) {
+func processChangeList(changes *common.ChangeList) bool {
+
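+	// apply the whole change list in one transaction; any failed statement
+	// rolls everything back and returns false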
+	ok := false
 
 	tx, err := getDB().Begin()
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
+		return ok
 	}
 	defer tx.Rollback()
 
 	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
 
 	for _, change := range changes.Changes {
-		switch change.Table {
-		case "edgex.apid_cluster":
-			switch change.Operation {
-			case common.Delete:
-				// todo: shut down apid, delete databases, scorch the earth!
-				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-			default:
-				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-			}
-		case "edgex.data_scope":
-			switch change.Operation {
-			case common.Insert:
-				ds := makeDataScopeFromRow(change.NewRow)
-				err = insertDataScope(ds, tx)
-			case common.Delete:
-				ds := makeDataScopeFromRow(change.OldRow)
-				err = deleteDataScope(ds, tx)
-			default:
-				// common.Update is not allowed
-				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-			}
+		if change.Table == LISTENER_TABLE_APID_CLUSTER {
+			log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
 		}
-		if err != nil {
-			log.Panicf("Error processing ChangeList: %v", err)
+		switch change.Operation {
+		case common.Insert:
+			ok = insert(change.Table, []common.Row{change.NewRow}, tx)
+		case common.Update:
+			if change.Table == LISTENER_TABLE_DATA_SCOPE {
+				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
+			}
+			ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
+		case common.Delete:
+			ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
+		}
+		if !ok {
+			log.Error("SQL operation failed, rolling back transaction")
+			return ok
 		}
 	}
 
 	err = tx.Commit()
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
+		return false
 	}
-}
 
-func makeApidClusterFromRow(row common.Row) dataApidCluster {
-
-	dac := dataApidCluster{}
-
-	row.Get("id", &dac.ID)
-	row.Get("name", &dac.Name)
-	row.Get("umbrella_org_app_name", &dac.OrgAppName)
-	row.Get("created", &dac.Created)
-	row.Get("created_by", &dac.CreatedBy)
-	row.Get("updated", &dac.Updated)
-	row.Get("updated_by", &dac.UpdatedBy)
-	row.Get("description", &dac.Description)
-
-	return dac
-}
-
-func makeDataScopeFromRow(row common.Row) dataDataScope {
-
-	ds := dataDataScope{}
-
-	row.Get("id", &ds.ID)
-	row.Get("apid_cluster_id", &ds.ClusterID)
-	row.Get("scope", &ds.Scope)
-	row.Get("org", &ds.Org)
-	row.Get("env", &ds.Env)
-	row.Get("created", &ds.Created)
-	row.Get("created_by", &ds.CreatedBy)
-	row.Get("updated", &ds.Updated)
-	row.Get("updated_by", &ds.UpdatedBy)
-
-	return ds
+	return ok
 }
diff --git a/listener_test.go b/listener_test.go
index b5fea9b..d424a0b 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -5,48 +5,28 @@
 	. "github.com/onsi/gomega"
 
 	"github.com/apigee-labs/transicator/common"
+	"os"
 )
 
 var _ = Describe("listener", func() {
 
 	handler := handler{}
-	var saveLastSnapshot string
+
+	var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
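+		// build a sqlite snapshot db from the given fixture and wrap it in a Snapshot event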
+		initDb(sqlfile, "./mockdb.sqlite3")
+		file, err := os.Open("./mockdb.sqlite3")
+		Expect(err).ShouldNot(HaveOccurred())
+
+		s := common.Snapshot{}
+		err = processSnapshotServerFileResponse(dbId, file, &s)
+		Expect(err).ShouldNot(HaveOccurred())
+		return s
+	}
 
 	Context("ApigeeSync snapshot event", func() {
 
-		It("should set DB to appropriate version", func() {
-			log.Info("Starting listener tests...")
-
-			//save the last snapshot, so we can restore it at the end of this context
-			saveLastSnapshot = apidInfo.LastSnapshot
-
-			event := common.Snapshot{
-				SnapshotInfo: "test_snapshot",
-				Tables:       []common.Table{},
-			}
-
-			handler.Handle(&event)
-
-			Expect(apidInfo.LastSnapshot).To(Equal(event.SnapshotInfo))
-
-			expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(getDB() == expectedDB).Should(BeTrue())
-		})
-
 		It("should fail if more than one apid_cluster rows", func() {
-
-			event := common.Snapshot{
-				SnapshotInfo: "test_snapshot_fail",
-				Tables: []common.Table{
-					{
-						Name: LISTENER_TABLE_APID_CLUSTER,
-						Rows: []common.Row{{}, {}},
-					},
-				},
-			}
-
+			event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
 			Expect(func() { handler.Handle(&event) }).To(Panic())
 		}, 3)
 
@@ -68,74 +48,7 @@
 
 		It("should process a valid Snapshot", func() {
 
-			event := common.Snapshot{
-				SnapshotInfo: "test_snapshot_valid",
-				Tables: []common.Table{
-					{
-						Name: LISTENER_TABLE_APID_CLUSTER,
-						Rows: []common.Row{
-							{
-								"id":                    &common.ColumnVal{Value: "i"},
-								"name":                  &common.ColumnVal{Value: "n"},
-								"umbrella_org_app_name": &common.ColumnVal{Value: "o"},
-								"created":               &common.ColumnVal{Value: "c"},
-								"created_by":            &common.ColumnVal{Value: "c"},
-								"updated":               &common.ColumnVal{Value: "u"},
-								"updated_by":            &common.ColumnVal{Value: "u"},
-								"description":           &common.ColumnVal{Value: "d"},
-							},
-						},
-					},
-					{
-						Name: LISTENER_TABLE_DATA_SCOPE,
-						Rows: []common.Row{
-							{
-								"id":              &common.ColumnVal{Value: "i"},
-								"apid_cluster_id": &common.ColumnVal{Value: "a"},
-								"scope":           &common.ColumnVal{Value: "s1"},
-								"org":             &common.ColumnVal{Value: "o"},
-								"env":             &common.ColumnVal{Value: "e1"},
-								"created":         &common.ColumnVal{Value: "c"},
-								"created_by":      &common.ColumnVal{Value: "c"},
-								"updated":         &common.ColumnVal{Value: "u"},
-								"updated_by":      &common.ColumnVal{Value: "u"},
-							},
-						},
-					},
-					{
-						Name: LISTENER_TABLE_DATA_SCOPE,
-						Rows: []common.Row{
-							{
-								"id":              &common.ColumnVal{Value: "j"},
-								"apid_cluster_id": &common.ColumnVal{Value: "a"},
-								"scope":           &common.ColumnVal{Value: "s1"},
-								"org":             &common.ColumnVal{Value: "o"},
-								"env":             &common.ColumnVal{Value: "e2"},
-								"created":         &common.ColumnVal{Value: "c"},
-								"created_by":      &common.ColumnVal{Value: "c"},
-								"updated":         &common.ColumnVal{Value: "u"},
-								"updated_by":      &common.ColumnVal{Value: "u"},
-							},
-						},
-					},
-					{
-						Name: LISTENER_TABLE_DATA_SCOPE,
-						Rows: []common.Row{
-							{
-								"id":              &common.ColumnVal{Value: "k"},
-								"apid_cluster_id": &common.ColumnVal{Value: "a"},
-								"scope":           &common.ColumnVal{Value: "s2"},
-								"org":             &common.ColumnVal{Value: "o"},
-								"env":             &common.ColumnVal{Value: "e3"},
-								"created":         &common.ColumnVal{Value: "c"},
-								"created_by":      &common.ColumnVal{Value: "c"},
-								"updated":         &common.ColumnVal{Value: "u"},
-								"updated_by":      &common.ColumnVal{Value: "u"},
-							},
-						},
-					},
-				},
-			}
+			event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
 
 			handler.Handle(&event)
 
@@ -146,13 +59,18 @@
 
 			db := getDB()
 
+			expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
+			Expect(err).NotTo(HaveOccurred())
+
+			Expect(db == expectedDB).Should(BeTrue())
+
 			// apid Cluster
 			var dcs []dataApidCluster
 
 			rows, err := db.Query(`
 			SELECT id, name, description, umbrella_org_app_name,
 				created, created_by, updated, updated_by
-			FROM APID_CLUSTER`)
+			FROM EDGEX_APID_CLUSTER`)
 			Expect(err).NotTo(HaveOccurred())
 			defer rows.Close()
 
@@ -182,7 +100,7 @@
 			SELECT id, apid_cluster_id, scope, org,
 				env, created, created_by, updated,
 				updated_by
-			FROM DATA_SCOPE`)
+			FROM EDGEX_DATA_SCOPE`)
 			Expect(err).NotTo(HaveOccurred())
 			defer rows.Close()
 
@@ -219,7 +137,6 @@
 			Expect(scopes[1]).To(Equal("s2"))
 
 			//restore the last snapshot
-			apidInfo.LastSnapshot = saveLastSnapshot
 		}, 3)
 	})
 
@@ -228,10 +145,12 @@
 		Context(LISTENER_TABLE_APID_CLUSTER, func() {
 
 			It("insert event should panic", func() {
-				//save the last snapshot, so we can restore it at the end of this context
-				saveLastSnapshot = apidInfo.LastSnapshot
+				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
+				handler.Handle(&ssEvent)
 
-				event := common.ChangeList{
+				//save the last snapshot, so we can restore it at the end of this context
+
+				csEvent := common.ChangeList{
 					LastSequence: "test",
 					Changes: []common.Change{
 						{
@@ -241,10 +160,12 @@
 					},
 				}
 
-				Expect(func() { handler.Handle(&event) }).To(Panic())
+				Expect(func() { handler.Handle(&csEvent) }).To(Panic())
 			}, 3)
 
 			It("update event should panic", func() {
+				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
+				handler.Handle(&ssEvent)
 
 				event := common.ChangeList{
 					LastSequence: "test",
@@ -258,17 +179,15 @@
 
 				Expect(func() { handler.Handle(&event) }).To(Panic())
 				//restore the last snapshot
-				apidInfo.LastSnapshot = saveLastSnapshot
 			}, 3)
 
-			PIt("delete event should kill all the things!")
 		})
 
 		Context(LISTENER_TABLE_DATA_SCOPE, func() {
 
 			It("insert event should add", func() {
-				//save the last snapshot, so we can restore it at the end of this context
-				saveLastSnapshot = apidInfo.LastSnapshot
+				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
+				handler.Handle(&ssEvent)
 
 				event := common.ChangeList{
 					LastSequence: "test",
@@ -277,30 +196,32 @@
 							Operation: common.Insert,
 							Table:     LISTENER_TABLE_DATA_SCOPE,
 							NewRow: common.Row{
-								"id":              &common.ColumnVal{Value: "i"},
-								"apid_cluster_id": &common.ColumnVal{Value: "a"},
-								"scope":           &common.ColumnVal{Value: "s1"},
-								"org":             &common.ColumnVal{Value: "o"},
-								"env":             &common.ColumnVal{Value: "e"},
-								"created":         &common.ColumnVal{Value: "c"},
-								"created_by":      &common.ColumnVal{Value: "c"},
-								"updated":         &common.ColumnVal{Value: "u"},
-								"updated_by":      &common.ColumnVal{Value: "u"},
+								"id":               &common.ColumnVal{Value: "i"},
+								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+								"scope":            &common.ColumnVal{Value: "s1"},
+								"org":              &common.ColumnVal{Value: "o"},
+								"env":              &common.ColumnVal{Value: "e"},
+								"created":          &common.ColumnVal{Value: "c"},
+								"created_by":       &common.ColumnVal{Value: "c"},
+								"updated":          &common.ColumnVal{Value: "u"},
+								"updated_by":       &common.ColumnVal{Value: "u"},
+								"_change_selector": &common.ColumnVal{Value: "cs"},
 							},
 						},
 						{
 							Operation: common.Insert,
 							Table:     LISTENER_TABLE_DATA_SCOPE,
 							NewRow: common.Row{
-								"id":              &common.ColumnVal{Value: "j"},
-								"apid_cluster_id": &common.ColumnVal{Value: "a"},
-								"scope":           &common.ColumnVal{Value: "s2"},
-								"org":             &common.ColumnVal{Value: "o"},
-								"env":             &common.ColumnVal{Value: "e"},
-								"created":         &common.ColumnVal{Value: "c"},
-								"created_by":      &common.ColumnVal{Value: "c"},
-								"updated":         &common.ColumnVal{Value: "u"},
-								"updated_by":      &common.ColumnVal{Value: "u"},
+								"id":               &common.ColumnVal{Value: "j"},
+								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+								"scope":            &common.ColumnVal{Value: "s2"},
+								"org":              &common.ColumnVal{Value: "o"},
+								"env":              &common.ColumnVal{Value: "e"},
+								"created":          &common.ColumnVal{Value: "c"},
+								"created_by":       &common.ColumnVal{Value: "c"},
+								"updated":          &common.ColumnVal{Value: "u"},
+								"updated_by":       &common.ColumnVal{Value: "u"},
+								"_change_selector": &common.ColumnVal{Value: "cs"},
 							},
 						},
 					},
@@ -314,7 +235,7 @@
 				SELECT id, apid_cluster_id, scope, org,
 					env, created, created_by, updated,
 					updated_by
-				FROM DATA_SCOPE`)
+				FROM EDGEX_DATA_SCOPE`)
 				Expect(err).NotTo(HaveOccurred())
 				defer rows.Close()
 
@@ -326,6 +247,7 @@
 					dds = append(dds, d)
 				}
 
+				// the no-datascopes snapshot starts empty; both rows come from the change event
 				Expect(len(dds)).To(Equal(2))
 				ds := dds[0]
 
@@ -349,6 +271,8 @@
 			}, 3)
 
 			It("delete event should delete", func() {
+				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
+				handler.Handle(&ssEvent)
 				insert := common.ChangeList{
 					LastSequence: "test",
 					Changes: []common.Change{
@@ -356,15 +280,16 @@
 							Operation: common.Insert,
 							Table:     LISTENER_TABLE_DATA_SCOPE,
 							NewRow: common.Row{
-								"id":              &common.ColumnVal{Value: "i"},
-								"apid_cluster_id": &common.ColumnVal{Value: "a"},
-								"scope":           &common.ColumnVal{Value: "s"},
-								"org":             &common.ColumnVal{Value: "o"},
-								"env":             &common.ColumnVal{Value: "e"},
-								"created":         &common.ColumnVal{Value: "c"},
-								"created_by":      &common.ColumnVal{Value: "c"},
-								"updated":         &common.ColumnVal{Value: "u"},
-								"updated_by":      &common.ColumnVal{Value: "u"},
+								"id":               &common.ColumnVal{Value: "i"},
+								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+								"scope":            &common.ColumnVal{Value: "s"},
+								"org":              &common.ColumnVal{Value: "o"},
+								"env":              &common.ColumnVal{Value: "e"},
+								"created":          &common.ColumnVal{Value: "c"},
+								"created_by":       &common.ColumnVal{Value: "c"},
+								"updated":          &common.ColumnVal{Value: "u"},
+								"updated_by":       &common.ColumnVal{Value: "u"},
+								"_change_selector": &common.ColumnVal{Value: "cs"},
 							},
 						},
 					},
@@ -386,13 +311,15 @@
 				handler.Handle(&delete)
 
 				var nRows int
-				err := getDB().QueryRow("SELECT count(id) FROM DATA_SCOPE").Scan(&nRows)
+				err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
 				Expect(err).NotTo(HaveOccurred())
 
-				Expect(nRows).To(Equal(0))
+				Expect(0).To(Equal(nRows))
 			}, 3)
 
-			It("update event should panic", func() {
+			It("update event should panic for data scopes table", func() {
+				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
+				handler.Handle(&ssEvent)
 
 				event := common.ChangeList{
 					LastSequence: "test",
@@ -406,10 +333,9 @@
 
 				Expect(func() { handler.Handle(&event) }).To(Panic())
 				//restore the last snapshot
-				apidInfo.LastSnapshot = saveLastSnapshot
 			}, 3)
 
+			//TODO add tests for update/insert/delete cluster
 		})
-
 	})
 })
diff --git a/managerInterfaces.go b/managerInterfaces.go
new file mode 100644
index 0000000..20bbf6f
--- /dev/null
+++ b/managerInterfaces.go
@@ -0,0 +1,29 @@
+package apidApigeeSync
+
+import (
+	"github.com/apigee-labs/transicator/common"
+	"net/url"
+)
+
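+// tokenManager supplies and refreshes the OAuth bearer token used on all sync requests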
+type tokenManager interface {
+	getBearerToken() string
+	invalidateToken() error
+	getToken() *oauthToken
+	close()
+	getRetrieveNewTokenClosure(*url.URL) func(chan bool) error
+	start()
+}
+
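+// snapShotManager downloads boot and data snapshots and hands them off for processing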
+type snapShotManager interface {
+	close() <-chan bool
+	downloadBootSnapshot()
+	storeBootSnapshot(snapshot *common.Snapshot)
+	downloadDataSnapshot()
+	storeDataSnapshot(snapshot *common.Snapshot)
+	downloadSnapshot(scopes []string, snapshot *common.Snapshot) error
+}
+
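+// changeManager polls the change server, backing off between failed attempts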
+type changeManager interface {
+	close() <-chan bool
+	pollChangeWithBackoff()
+}
diff --git a/mock_server.go b/mock_server.go
index f7b3600..8349131 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -8,17 +8,20 @@
 	"math/rand"
 	"net/http"
 	"net/url"
-	"strconv"
 	"sync"
 	"sync/atomic"
-	"time"
 
 	"net"
 
+	"database/sql"
 	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"io"
+	"io/ioutil"
+	"os"
+	"strconv"
 )
 
 /*
@@ -41,19 +44,16 @@
 const oauthExpiresIn = 2 * 60 // 2 minutes
 
 type MockParms struct {
-	ReliableAPI            bool
-	ClusterID              string
-	TokenKey               string
-	TokenSecret            string
-	Scope                  string
-	Organization           string
-	Environment            string
-	NumDevelopers          int
-	AddDeveloperEvery      time.Duration
-	UpdateDeveloperEvery   time.Duration
-	NumDeployments         int
-	ReplaceDeploymentEvery time.Duration
-	BundleURI              string
+	ReliableAPI    bool
+	ClusterID      string
+	TokenKey       string
+	TokenSecret    string
+	Scope          string
+	Organization   string
+	Environment    string
+	NumDevelopers  int
+	NumDeployments int
+	BundleURI      string
 }
 
 func Mock(params MockParms, router apid.Router) *MockServer {
@@ -72,7 +72,6 @@
 	params          MockParms
 	oauthToken      string
 	snapshotID      string
-	snapshotTables  map[string][]common.Table // key = scopeID
 	changeChannel   chan []byte
 	sequenceID      *int64
 	maxDevID        *int64
@@ -80,10 +79,27 @@
 	minDeploymentID *int64
 	maxDeploymentID *int64
 	newSnap         *int32
+	authFail        *int32
+}
+
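+// authFail states: 0 = verify the Authorization header, 1 = always reject, 2 = always accept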
+func (m *MockServer) forceAuthFail() {
+	atomic.StoreInt32(m.authFail, 1)
+}
+
+func (m *MockServer) normalAuthCheck() {
+	atomic.StoreInt32(m.authFail, 0)
+}
+
+func (m *MockServer) passAuthCheck() {
+	atomic.StoreInt32(m.authFail, 2)
 }
 
 func (m *MockServer) forceNewSnapshot() {
-	atomic.SwapInt32(m.newSnap, 1)
+	atomic.StoreInt32(m.newSnap, 1)
+}
+
+func (m *MockServer) forceNoSnapshot() {
+	atomic.StoreInt32(m.newSnap, 0)
 }
 
 func (m *MockServer) lastSequenceID() string {
@@ -111,6 +127,28 @@
 	return strconv.FormatInt(newMinID-1, 10)
 }
 
+func initDb(statements, path string) {
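+	// builds a mock sqlite db at path by executing the statements read from the given SQL file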
+
+	f, err := os.Create(path)
+	if err != nil {
+		log.Panicf("Could not create mock db file %s: %v", path, err)
+	}
+	f.Close()
+
+	db, err := sql.Open("sqlite3", path)
+	if err != nil {
+		log.Panicf("Could not instantiate mock db: %v", err)
+	}
+	defer db.Close()
+	sqlStatementsBuffer, err := ioutil.ReadFile(statements)
+	if err != nil {
+		log.Panicf("Could not instantiate mock db: %v", err)
+	}
+	sqlStatementsString := string(sqlStatementsBuffer)
+	_, err = db.Exec(sqlStatementsString)
+	if err != nil {
+		log.Panicf("Could not instantiate mock db: %v", err)
+	}
+
+}
+
 func (m *MockServer) init() {
 	defer GinkgoRecover()
 	RegisterFailHandler(func(message string, callerSkip ...int) {
@@ -125,79 +163,12 @@
 	*m.minDeploymentID = 1
 	m.maxDeploymentID = new(int64)
 	m.newSnap = new(int32)
+	m.authFail = new(int32)
+	*m.authFail = 0
 
-	go m.developerGenerator()
-	go m.developerUpdater()
-	go m.deploymentReplacer()
+	initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
+	initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
 
-	// cluster "scope"
-	cluster := m.newRow(map[string]string{
-		"id":               m.params.ClusterID,
-		"_change_selector": m.params.ClusterID,
-	})
-
-	// data scopes
-	var dataScopes []common.Row
-	dataScopes = append(dataScopes, cluster)
-	dataScopes = append(dataScopes, m.newRow(map[string]string{
-		"id":               m.params.Scope,
-		"scope":            m.params.Scope,
-		"org":              m.params.Organization,
-		"env":              m.params.Environment,
-		"apid_cluster_id":  m.params.ClusterID,
-		"_change_selector": m.params.Scope,
-	}))
-
-	// cluster & data_scope snapshot tables
-	m.snapshotTables = map[string][]common.Table{}
-	m.snapshotTables[m.params.ClusterID] = []common.Table{
-		{
-			Name: "edgex.apid_cluster",
-			Rows: []common.Row{cluster},
-		},
-		{
-			Name: "edgex.data_scope",
-			Rows: dataScopes,
-		},
-	}
-
-	var snapshotTableRows []tableRowMap
-
-	// generate one company
-	companyID := m.params.Organization
-	tenantID := m.params.Scope
-	changeSelector := m.params.Scope
-	company := tableRowMap{
-		"kms.company": m.newRow(map[string]string{
-			"id":               companyID,
-			"status":           "Active",
-			"tenant_id":        tenantID,
-			"name":             companyID,
-			"display_name":     companyID,
-			"_change_selector": changeSelector,
-		}),
-	}
-	snapshotTableRows = append(snapshotTableRows, company)
-
-	// generate snapshot developers
-	for i := 0; i < m.params.NumDevelopers; i++ {
-		developer := m.createDeveloperWithProductAndApp()
-		snapshotTableRows = append(snapshotTableRows, developer)
-	}
-	log.Infof("created %d developers", m.params.NumDevelopers)
-
-	// generate snapshot deployments
-	for i := 0; i < m.params.NumDeployments; i++ {
-		deployment := m.createDeployment()
-		snapshotTableRows = append(snapshotTableRows, deployment)
-	}
-	log.Infof("created %d deployments", m.params.NumDeployments)
-
-	m.snapshotTables[m.params.Scope] = m.concatTableRowMaps(snapshotTableRows...)
-
-	if m.params.NumDevelopers < 10 && m.params.NumDeployments < 10 {
-		log.Debugf("snapshotTables: %v", m.snapshotTables[m.params.Scope])
-	}
 }
 
 // developer, product, application, credential will have the same ID (developerID)
@@ -281,28 +252,19 @@
 
 	Expect(scopes).To(ContainElement(m.params.ClusterID))
 
-	m.snapshotID = generateUUID()
-	snapshot := &common.Snapshot{
-		SnapshotInfo: m.snapshotID,
+	w.Header().Set("Transicator-Snapshot-TXID", generateUUID())
+
+	if len(scopes) == 1 {
+		//send bootstrap db
+		err := streamFile("./mockdb_boot.sqlite3", w)
+		Expect(err).NotTo(HaveOccurred())
+		return
+	} else {
+		//send data db
+		err := streamFile("./mockdb.sqlite3", w)
+		Expect(err).NotTo(HaveOccurred())
+		return
 	}
-
-	// Note: if/when we support multiple scopes, we'd have to do a merge of table rows
-	for _, scope := range scopes {
-		tables := m.snapshotTables[scope]
-		for _, table := range tables {
-			snapshot.AddTables(table)
-		}
-	}
-
-	body, err := json.Marshal(snapshot)
-	Expect(err).NotTo(HaveOccurred())
-
-	log.Infof("sending snapshot: %s", m.snapshotID)
-	if len(body) < 10000 {
-		log.Debugf("snapshot: %#v", string(body))
-	}
-
-	w.Write(body)
 }
 
 func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
@@ -310,6 +272,7 @@
 
 	val := atomic.SwapInt32(m.newSnap, 0)
 	if val > 0 {
+		log.Debug("MockServer: force new snapshot")
 		w.WriteHeader(http.StatusBadRequest)
 		apiErr := changeServerError{
 			Code: "SNAPSHOT_TOO_OLD",
@@ -320,12 +283,14 @@
 		return
 	}
 
+	log.Debug("mock server sending change list")
+
 	q := req.URL.Query()
 
 	scopes := q["scope"]
-	block, err := strconv.Atoi(q.Get("block"))
+	_, err := strconv.Atoi(q.Get("block"))
 	Expect(err).NotTo(HaveOccurred())
-	since := q.Get("since")
 
 	Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID))
 	//Expect(q.Get("snapshot")).To(Equal(m.snapshotID))
@@ -333,11 +298,6 @@
 	Expect(scopes).To(ContainElement(m.params.ClusterID))
 	//Expect(scopes).To(ContainElement(m.params.Scope))
 
-	if since != "" {
-		m.sendChange(w, time.Duration(block)*time.Second)
-		return
-	}
-
 	// todo: the following is just legacy for the existing test in apigeeSync_suite_test
 	developer := m.createDeveloperWithProductAndApp()
 	changeList := m.createInsertChange(developer)
@@ -348,93 +308,6 @@
 	w.Write(body)
 }
 
-// generate developers w/ product and app
-func (m *MockServer) developerGenerator() {
-
-	for range time.Tick(m.params.AddDeveloperEvery) {
-
-		developer := m.createDeveloperWithProductAndApp()
-		changeList := m.createInsertChange(developer)
-
-		body, err := json.Marshal(changeList)
-		if err != nil {
-			log.Errorf("Error adding developer: %v", err)
-		}
-
-		log.Info("adding developer")
-		log.Debugf("body: %#v", string(body))
-		m.changeChannel <- body
-	}
-}
-
-// update random developers - set username
-func (m *MockServer) developerUpdater() {
-
-	for range time.Tick(m.params.UpdateDeveloperEvery) {
-
-		developerID := m.randomDeveloperID()
-
-		oldDev := m.createDeveloper(developerID)
-		delete(oldDev, "kms.company_developer")
-		newDev := m.createDeveloper(developerID)
-		delete(newDev, "kms.company_developer")
-
-		newRow := newDev["kms.developer"]
-		newRow["username"] = m.stringColumnVal("i_am_not_a_number")
-
-		changeList := m.createUpdateChange(oldDev, newDev)
-
-		body, err := json.Marshal(changeList)
-		if err != nil {
-			log.Errorf("Error updating developer: %v", err)
-		}
-
-		log.Info("updating developer")
-		log.Debugf("body: %#v", string(body))
-		m.changeChannel <- body
-	}
-}
-
-func (m *MockServer) deploymentReplacer() {
-
-	for range time.Tick(m.params.ReplaceDeploymentEvery) {
-
-		// delete
-		oldDep := tableRowMap{}
-		oldDep["edgex.deployment"] = m.newRow(map[string]string{
-			"id": m.popDeploymentID(),
-		})
-		deleteChange := m.createDeleteChange(oldDep)
-
-		// insert
-		newDep := m.createDeployment()
-		insertChange := m.createInsertChange(newDep)
-
-		changeList := m.concatChangeLists(deleteChange, insertChange)
-
-		body, err := json.Marshal(changeList)
-		if err != nil {
-			log.Errorf("Error replacing deployment: %v", err)
-		}
-
-		log.Info("replacing deployment")
-		log.Debugf("body: %#v", string(body))
-		m.changeChannel <- body
-	}
-}
-
-// todo: we could debounce this if necessary
-func (m *MockServer) sendChange(w http.ResponseWriter, timeout time.Duration) {
-	select {
-	case change := <-m.changeChannel:
-		log.Info("sending change to client")
-		w.Write(change)
-	case <-time.After(timeout):
-		log.Info("change request timeout")
-		w.WriteHeader(http.StatusNotModified)
-	}
-}
-
 // enables GoMega handling
 func (m *MockServer) gomega(target http.HandlerFunc) http.HandlerFunc {
 	return func(w http.ResponseWriter, req *http.Request) {
@@ -452,15 +325,29 @@
 // enforces handler auth
 func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc {
 	return func(w http.ResponseWriter, req *http.Request) {
-		auth := req.Header.Get("Authorization")
 
+		// force failing auth check
+		if atomic.LoadInt32(m.authFail) == 1 {
+			w.WriteHeader(http.StatusUnauthorized)
+			w.Write([]byte("Force fail: bad auth token."))
+			return
+		}
+
+		// force passing auth check
+		if atomic.LoadInt32(m.authFail) == 2 {
+			target(w, req)
+			return
+		}
+
+		// check auth header
+		auth := req.Header.Get("Authorization")
 		expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
 		if auth != expectedAuth {
-			w.WriteHeader(http.StatusBadRequest)
+			w.WriteHeader(http.StatusUnauthorized)
 			w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth)))
-		} else {
-			target(w, req)
+			return
 		}
+		target(w, req)
 	}
 }
 
@@ -537,7 +424,7 @@
 	Expect(err).ShouldNot(HaveOccurred())
 
 	rows := tableRowMap{}
-	rows["edgex.deployment"] = m.newRow(map[string]string{
+	rows["kms_deployment"] = m.newRow(map[string]string{
 		"id":                 deploymentID,
 		"bundle_config_id":   bundleID,
 		"apid_cluster_id":    m.params.ClusterID,
@@ -556,15 +443,14 @@
 
 	rows := tableRowMap{}
 
-	rows["kms.developer"] = m.newRow(map[string]string{
+	rows["kms_developer"] = m.newRow(map[string]string{
 		"id":        developerID,
 		"status":    "Active",
 		"tenant_id": tenantID,
 	})
 
 	// map developer onto to existing company
-	rows["kms.company_developer"] = m.newRow(map[string]string{
-		"id":           developerID,
+	rows["kms_company_developer"] = m.newRow(map[string]string{
 		"tenant_id":    tenantID,
 		"company_id":   companyID,
 		"developer_id": developerID,
@@ -581,7 +467,7 @@
 	resources := fmt.Sprintf("{%s}", "/") // todo: what should be here?
 
 	rows := tableRowMap{}
-	rows["kms.api_product"] = m.newRow(map[string]string{
+	rows["kms_api_product"] = m.newRow(map[string]string{
 		"id":            productID,
 		"api_resources": resources,
 		"environments":  environments,
@@ -596,21 +482,21 @@
 
 	rows := tableRowMap{}
 
-	rows["kms.app"] = m.newRow(map[string]string{
+	rows["kms_app"] = m.newRow(map[string]string{
 		"id":           applicationID,
 		"developer_id": developerID,
 		"status":       "Approved",
 		"tenant_id":    tenantID,
 	})
 
-	rows["kms.app_credential"] = m.newRow(map[string]string{
+	rows["kms_app_credential"] = m.newRow(map[string]string{
 		"id":        credentialID,
 		"app_id":    applicationID,
 		"tenant_id": tenantID,
 		"status":    "Approved",
 	})
 
-	rows["kms.app_credential_apiproduct_mapper"] = m.newRow(map[string]string{
+	rows["kms_app_credential_apiproduct_mapper"] = m.newRow(map[string]string{
 		"apiprdt_id": productID,
 		"app_id":     applicationID,
 		"appcred_id": credentialID,
@@ -688,26 +574,6 @@
 }
 
 // create []common.Table from array of tableRowMaps
-func (m *MockServer) concatTableRowMaps(maps ...tableRowMap) []common.Table {
-	tableMap := map[string]*common.Table{}
-	for _, m := range maps {
-		for name, row := range m {
-			if _, ok := tableMap[name]; !ok {
-				tableMap[name] = &common.Table{
-					Name: name,
-				}
-			}
-			tableMap[name].AddRowstoTable(row)
-		}
-	}
-	result := []common.Table{}
-	for _, v := range tableMap {
-		result = append(result, *v)
-	}
-	return result
-}
-
-// create []common.Table from array of tableRowMaps
 func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList {
 	result := common.ChangeList{}
 	if len(changeLists) > 0 {
@@ -721,3 +587,16 @@
 	}
 	return result
 }
+
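+// streamFile copies a local sqlite snapshot into the HTTP response, standing in for the snapshot server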
+func streamFile(srcFile string, w http.ResponseWriter) error {
+	inFile, err := os.Open(srcFile)
+	if err != nil {
+		return err
+	}
+	defer inFile.Close()
+
+	w.Header().Set("Content-Type", "application/transicator+sqlite")
+
+	_, err = io.Copy(w, inFile)
+	return err
+}
diff --git a/snapshot.go b/snapshot.go
index 3b288a2..f4cb8bd 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -1,7 +1,6 @@
 package apidApigeeSync
 
 import (
-	"encoding/json"
 	"github.com/30x/apid-core"
 	"github.com/30x/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
@@ -16,7 +15,7 @@
 	"time"
 )
 
-type snapShotManager struct {
+type simpleSnapShotManager struct {
 	// to send quit signal to the downloading thread
 	quitChan chan bool
 	// to mark the graceful close of snapshotManager
@@ -27,10 +26,10 @@
 	isDownloading *int32
 }
 
-func createSnapShotManager() *snapShotManager {
+func createSnapShotManager() *simpleSnapShotManager {
 	isClosedInt := int32(0)
 	isDownloadingInt := int32(0)
-	return &snapShotManager{
+	return &simpleSnapShotManager{
 		quitChan:      make(chan bool, 1),
 		finishChan:    make(chan bool, 1),
 		isClosed:      &isClosedInt,
@@ -44,7 +43,7 @@
  * use <- close() for blocking close
  * should only be called by pollChangeManager, because pollChangeManager is dependent on it
  */
-func (s *snapShotManager) close() <-chan bool {
+func (s *simpleSnapShotManager) close() <-chan bool {
 	//has been closed before
 	if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
 		log.Error("snapShotManager: close() called on a closed snapShotManager!")
@@ -64,7 +63,7 @@
 }
 
 // retrieve boot information: apid_config and apid_config_scope
-func (s *snapShotManager) downloadBootSnapshot() {
+func (s *simpleSnapShotManager) downloadBootSnapshot() {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
 	}
@@ -100,12 +99,12 @@
 	s.storeBootSnapshot(snapshot)
 }
 
-func (s *snapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
 	processSnapshot(snapshot)
 }
 
 // use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func (s *snapShotManager) downloadDataSnapshot() {
+func (s *simpleSnapShotManager) downloadDataSnapshot() {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
 	}
@@ -133,21 +132,14 @@
 	s.storeDataSnapshot(snapshot)
 }
 
-func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
+func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
 	knownTables = extractTablesFromSnapshot(snapshot)
 
-	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+	_, err := dataService.DBVersion(snapshot.SnapshotInfo)
 	if err != nil {
 		log.Panicf("Database inaccessible: %v", err)
 	}
 
-	// if closed
-	if atomic.LoadInt32(s.isClosed) == int32(1) {
-		log.Warn("Trying to persistKnownTablesToDB with a closed snapShotManager")
-		return
-	}
-	persistKnownTablesToDB(knownTables, db)
-
 	log.Info("Emitting Snapshot to plugins")
 
 	select {
@@ -167,14 +159,31 @@
 	log.Debug("Extracting table names from snapshot")
 	if snapshot.Tables == nil {
-		//if this panic ever fires, it's a bug
-		log.Panicf("Attempt to extract known tables from snapshot without tables failed")
+		// no table list in the snapshot; read the names from the sqlite file itself
-	}
+		db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+		if err != nil {
+			log.Panicf("Database inaccessible: %v", err)
+		}
+		rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
+		if err != nil {
+			log.Panicf("Unable to read known snapshot tables from sqlite file: %v", err)
+		}
+		defer rows.Close()
+		for rows.Next() {
+			var tableName string
+			err = rows.Scan(&tableName)
+			if err != nil {
+				log.Panic("Error scanning tableNames from _transicator_tables")
+			}
+			tables[tableName] = true
+		}
 
-	for _, table := range snapshot.Tables {
-		tables[table.Name] = true
-	}
+	} else {
 
+		for _, table := range snapshot.Tables {
+			tables[table.Name] = true
+		}
+	}
 	return tables
 }
 
 func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
@@ -182,7 +191,7 @@
 	tables = make(map[string]bool)
 
 	log.Debug("Extracting table names from existing DB")
-	rows, err := db.Query("SELECT name FROM _known_tables;")
+	rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
 	defer rows.Close()
 
 	if err != nil {
@@ -223,7 +232,7 @@
 // a blocking method
 // will keep retrying with backoff until success
 
-func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
+func (s *simpleSnapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
 	// if closed
 	if atomic.LoadInt32(s.isClosed) == int32(1) {
 		log.Warn("Trying to download snapshot with a closed snapShotManager")
@@ -263,16 +272,14 @@
 		}
 		addHeaders(req)
 
-		var processSnapshotResponse func(*http.Response, *common.Snapshot) error
+		var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error
 
-		// Set the transport protocol type based on conf file input
-		if config.GetString(configSnapshotProtocol) == "json" {
-			req.Header.Set("Accept", "application/json")
-			processSnapshotResponse = processSnapshotServerJsonResponse
-		} else if config.GetString(configSnapshotProtocol) == "sqlite" {
-			req.Header.Set("Accept", "application/transicator+sqlite")
-			processSnapshotResponse = processSnapshotServerFileResponse
+		if config.GetString(configSnapshotProtocol) != "sqlite" {
+			log.Panic("The only currently supported snapshot protocol is sqlite")
 		}
+		req.Header.Set("Accept", "application/transicator+sqlite")
+		processSnapshotResponse = processSnapshotServerFileResponse
 
 		// Issue the request to the snapshot server
 		r, err := httpclient.Do(req)
@@ -290,7 +297,7 @@
 		}
 
 		// Decode the Snapshot server response
-		err = processSnapshotResponse(r, snapshot)
+		err = processSnapshotResponse(r.Header.Get("Transicator-Snapshot-TXID"), r.Body, snapshot)
 		if err != nil {
 			log.Errorf("Snapshot server response Data not parsable: %v", err)
 			return err
@@ -300,50 +307,24 @@
 	}
 }
 
-func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
-	log.Debugf("Inserting table names found in snapshot into db")
+func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error {
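+	// persist the streamed sqlite db under the data directory, keyed by the snapshot TXID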
+	dbPath := data.DBPath("common/" + dbId)
+	log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath)
 
-	tx, err := db.Begin()
+	// dbPath ends with the sqlite3 file name; strip it so MkdirAll creates only the parent directory
+	log.Infof("Creating parent directory for %s", dbPath)
+	err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700)
 	if err != nil {
-		log.Panicf("Error starting transaction: %v", err)
+		log.Errorf("Error creating db path %s", err)
 	}
-	defer tx.Rollback()
-
-	_, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));")
-	if err != nil {
-		log.Panicf("Could not create _known_tables table: %s", err)
-	}
-
-	for name := range tables {
-		log.Debugf("Inserting %s into _known_tables", name)
-		_, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name)
-		if err != nil {
-			log.Panicf("Error encountered inserting into known tables ", err)
-		}
-
-	}
-
-	err = tx.Commit()
-	if err != nil {
-		log.Panicf("Error committing transaction: %v", err)
-
-	}
-}
-
-func processSnapshotServerJsonResponse(r *http.Response, snapshot *common.Snapshot) error {
-	return json.NewDecoder(r.Body).Decode(snapshot)
-}
-
-func processSnapshotServerFileResponse(r *http.Response, snapshot *common.Snapshot) error {
-	dbId := r.Header.Get("Transicator-Snapshot-TXID")
-	out, err := os.Create(data.DBPath(dbId))
+	out, err := os.Create(dbPath)
 	if err != nil {
 		return err
 	}
 	defer out.Close()
 
 	//stream response to DB
-	_, err = io.Copy(out, r.Body)
+	_, err = io.Copy(out, body)
 
 	if err != nil {
 		return err
diff --git a/sql/init_listener_test_duplicate_apids.sql b/sql/init_listener_test_duplicate_apids.sql
new file mode 100644
index 0000000..21bd615
--- /dev/null
+++ b/sql/init_listener_test_duplicate_apids.sql
@@ -0,0 +1,37 @@
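+-- fixture: two edgex_apid_cluster rows, used to exercise the "must be a single row" panic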
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap2','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap');
+COMMIT;
diff --git a/sql/init_listener_test_no_datascopes.sql b/sql/init_listener_test_no_datascopes.sql
new file mode 100644
index 0000000..6f5030a
--- /dev/null
+++ b/sql/init_listener_test_no_datascopes.sql
@@ -0,0 +1,37 @@
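+-- fixture: a single edgex_apid_cluster row and an empty edgex_data_scope table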
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('i','n','d','o', 'c', 'c', 'u','u', 'i');
+
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+
+COMMIT;
diff --git a/sql/init_listener_test_valid_snapshot.sql b/sql/init_listener_test_valid_snapshot.sql
new file mode 100644
index 0000000..4e7e7e5
--- /dev/null
+++ b/sql/init_listener_test_valid_snapshot.sql
@@ -0,0 +1,40 @@
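+-- fixture: a single edgex_apid_cluster row plus three edgex_data_scope rows (a valid snapshot)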
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','unused_in_listener_unit_tests');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('i','n','d','o', 'c', 'c', 'u','u', 'i');
+
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('i','a','s1','o','e1','c','c','u','u','a');
+INSERT INTO "edgex_data_scope" VALUES('i','a','s1','o','e2','c','c','u','u','a');
+INSERT INTO "edgex_data_scope" VALUES('k','a','s2','o','e3','c','c','u','u','a');
+
+COMMIT;
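
These .sql files are ordinary SQLite dumps, so a listener unit test can rebuild
the snapshot database from scratch before each spec. A sketch of one way to do
that, assuming the mattn/go-sqlite3 driver; loadFixture is illustrative, not
the plugin's actual API:

    package main

    import (
    	"database/sql"
    	"io/ioutil"

    	_ "github.com/mattn/go-sqlite3"
    )

    // loadFixture executes a dump such as init_listener_test_valid_snapshot.sql
    // against a fresh in-memory database and returns the populated handle.
    func loadFixture(path string) (*sql.DB, error) {
    	db, err := sql.Open("sqlite3", ":memory:")
    	if err != nil {
    		return nil, err
    	}
    	script, err := ioutil.ReadFile(path)
    	if err != nil {
    		db.Close()
    		return nil, err
    	}
    	if _, err := db.Exec(string(script)); err != nil {
    		db.Close()
    		return nil, err
    	}
    	return db, nil
    }
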
diff --git a/sql/init_mock_boot_db.sql b/sql/init_mock_boot_db.sql
new file mode 100644
index 0000000..ed66799
--- /dev/null
+++ b/sql/init_mock_boot_db.sql
@@ -0,0 +1,36 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','1142790:1142790:');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,created_by,created_by,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap');
+COMMIT;
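
The boot snapshot seeds a single apid cluster ('bootstrap') owning the two
data_scope rows dataScope1 and dataScope2 above. A hedged sketch of the kind of
check a sync test can make once the fixture is loaded, reusing the database/sql
import from the earlier sketch; dataScopeIDs is a hypothetical helper:

    // dataScopeIDs returns the scope ids seeded for one cluster; against this
    // fixture it should yield dataScope1 and dataScope2.
    func dataScopeIDs(db *sql.DB, clusterID string) ([]string, error) {
    	rows, err := db.Query(
    		`SELECT id FROM edgex_data_scope WHERE apid_cluster_id = ?`, clusterID)
    	if err != nil {
    		return nil, err
    	}
    	defer rows.Close()
    	var ids []string
    	for rows.Next() {
    		var id string
    		if err := rows.Scan(&id); err != nil {
    			return nil, err
    		}
    		ids = append(ids, id)
    	}
    	return ids, rows.Err()
    }
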
diff --git a/sql/init_mock_db.sql b/sql/init_mock_db.sql
new file mode 100644
index 0000000..dc14fda
--- /dev/null
+++ b/sql/init_mock_db.sql
@@ -0,0 +1,240 @@
+PRAGMA foreign_keys=OFF;
+BEGIN TRANSACTION;
+CREATE TABLE _transicator_metadata
+(key varchar primary key,
+value varchar);
+INSERT INTO "_transicator_metadata" VALUES('snapshot','1142790:1142790:');
+CREATE TABLE _transicator_tables
+(tableName varchar not null,
+columnName varchar not null,
+typid integer,
+primaryKey bool);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','data_scope_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','uri',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksumtype',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','checksum',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_bundle_config','crc',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','data_scope_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','bundle_config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','entity_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','cust_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','org_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','dev_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','comp_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','apiprdt_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','app_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','appcred_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','name',1043,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','type',19701,1);
+INSERT INTO "_transicator_tables" VALUES('attributes','value',1043,0);
+INSERT INTO "_transicator_tables" VALUES('attributes','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','name',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','status',19564,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','customer_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','description',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_organization','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','description',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','umbrella_org_app_name',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','created_by',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_apid_cluster','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','ext_ref_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','description',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_deployment_history','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('configuration','body',25,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('configuration','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','consumer_secret',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','app_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','method_type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','status',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','issued_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','expires_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','app_status',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','scopes',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','created_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','updated_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','company_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','developer_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','roles',1015,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_at',1114,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_company_developer','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','deployment_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','action',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','apid_cluster_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','data_scope_id',1043,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','bundle_config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','config_json',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','created_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated',1114,0);
+INSERT INTO "_transicator_tables" VALUES('edgex_deployment_history','updated_by',25,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','name',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','display_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','access_type',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','callback_url',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','status',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','app_family',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','company_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','developer_id',2950,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','parent_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','type',19625,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','appcred_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','app_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','apiprdt_id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','status',19670,0);
+INSERT INTO "_transicator_tables" VALUES('kms_app_credential_apiproduct_mapper','_change_selector',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','id',2950,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','tenant_id',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','username',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','first_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','last_name',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','password',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','email',1043,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','status',19564,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','encrypted_password',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','salt',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','created_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','created_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','updated_at',1114,1);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','updated_by',1043,0);
+INSERT INTO "_transicator_tables" VALUES('kms_developer','_change_selector',1043,0);
+CREATE TABLE "kms_deployment" (id text,bundle_config_id text,apid_cluster_id text,data_scope_id text,bundle_config_name text,bundle_config_json text,config_json text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,bundle_config_id,apid_cluster_id,data_scope_id,_change_selector));
+INSERT INTO "kms_deployment" VALUES('321e443b-9db9-4043-b987-1599e0cdd029','1b6a5e15-4bb8-4c8e-ae3d-63e6efb9ba85','bootstrap','dataScope1','gcp-test-bundle','{"id":"1b6a5e15-4bb8-4c8e-ae3d-63e6efb9ba85","created":"2017-02-27T07:40:57.810Z","createdBy":"fierrom@google.com","updated":"2017-02-27T07:40:57.810Z","updatedBy":"fierrom@google.com","name":"gcp-test-bundle","uri":"https://gist.github.com/mdobson/f9d537c5192a660f692affc294266df2/archive/234c7cbf227d769278bee9b06ace51d6062fe96b.zip","checksumType":"md5","checksum":"06fde116f0270b3734a48653d0cfb495"}','{}','2017-02-27 07:41:33.888+00:00','fierrom@google.com','2017-02-27 07:41:33.888+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at));
+INSERT INTO "kms_api_product" VALUES('f5f07319-5104-471c-9df3-64b1842dbe00','43aef41d','test','test','','{/}','AUTO','{""}','{helloworld}','{test}','','',NULL,'2017-02-27 07:32:49.897+00:00','vbhangale@apigee.com','2017-02-27 07:32:49.897+00:00','vbhangale@apigee.com','43aef41d');
+INSERT INTO "kms_api_product" VALUES('87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','43aef41d','GregProduct','Greg''s Product','A product for testing Greg','{/**}','AUTO','{""}','{}','{test}','','',NULL,'2017-03-01 22:50:41.75+00:00','greg@google.com','2017-03-01 22:50:41.75+00:00','greg@google.com','43aef41d');
+CREATE TABLE attributes (tenant_id text,entity_id text,cust_id text,org_id text,dev_id text,comp_id text,apiprdt_id text,app_id text,appcred_id text,name text,type text,value text,_change_selector text, primary key (tenant_id,tenant_id,entity_id,entity_id,name,type,type));
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isSmbOrganization','ORGANIZATION','true','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.mgmtGroup','ORGANIZATION','management-edgex','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isEdgexEnabled','ORGANIZATION','true','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','ff0b5496-c674-4531-9443-ace334504f59','','ff0b5496-c674-4531-9443-ace334504f59','','','','','','features.isCpsEnabled','ORGANIZATION','true','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','f5f07319-5104-471c-9df3-64b1842dbe00','','','','','f5f07319-5104-471c-9df3-64b1842dbe00','','','access','APIPRODUCT','internal','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','f5f07319-5104-471c-9df3-64b1842dbe00','','','','','f5f07319-5104-471c-9df3-64b1842dbe00','','','test','APIPRODUCT','v1','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','','','','','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','DisplayName','APP','MitchTestApp','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','','','','','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','Notes','APP','','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','87c20a31-a504-4ed5-89a5-700adfbb0142','','','','','','87c20a31-a504-4ed5-89a5-700adfbb0142','','DisplayName','APP','MitchTestApp2','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','87c20a31-a504-4ed5-89a5-700adfbb0142','','','','','','87c20a31-a504-4ed5-89a5-700adfbb0142','','Notes','APP','','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','','','','','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','','','access','APIPRODUCT','public','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','be902c0c-c54b-4a65-85d6-358ff8639586','','','','','','be902c0c-c54b-4a65-85d6-358ff8639586','','DisplayName','APP','Greg''s Test App','43aef41d');
+INSERT INTO "attributes" VALUES('43aef41d','be902c0c-c54b-4a65-85d6-358ff8639586','','','','','','be902c0c-c54b-4a65-85d6-358ff8639586','','Notes','APP','','43aef41d');
+CREATE TABLE "kms_company" (id text,tenant_id text,name text,display_name text,status text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,name,created_at,updated_at));
+CREATE TABLE "kms_organization" (id text,name text,display_name text,type text,tenant_id text,customer_id text,description text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,customer_id,created_at,updated_at));
+INSERT INTO "kms_organization" VALUES('ff0b5496-c674-4531-9443-ace334504f59','edgex_gcp1','edgex_gcp1','paid','43aef41d','307eadd7-c6d7-4ec1-b433-59bcd22cd06d','','2017-02-25 00:17:58.159+00:00','vbhangale@apigee.com','2017-02-25 00:18:14.729+00:00','vbhangale@apigee.com','43aef41d');
+CREATE TABLE "edgex_apid_cluster" (id text,name text,description text,umbrella_org_app_name text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,created_by,created_by,created_by,created_by,_change_selector));
+INSERT INTO "edgex_apid_cluster" VALUES('bootstrap','mitch-gcp-cluster','','X-5NF3iDkQLtQt6uPp4ELYhuOkzL5BbSMgf3Gx','2017-02-27 07:39:22.179+00:00','fierrom@google.com','2017-02-27 07:39:22.179+00:00','fierrom@google.com','bootstrap');
+CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,created blob,created_by text,updated blob,updated_by text,_change_selector text, primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+INSERT INTO "edgex_data_scope" VALUES('dataScope1','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','fierrom@google.com','bootstrap');
+INSERT INTO "edgex_data_scope" VALUES('dataScope2','bootstrap','43aef41d','edgex_gcp1','test','2017-02-27 07:40:25.094+00:00','fierrom@google.com','2017-02-27 07:40:25.094+00:00','sendtofierro@gmail.com','bootstrap');
+CREATE TABLE "kms_app_credential" (id text,tenant_id text,consumer_secret text,app_id text,method_type text,status text,issued_at blob,expires_at blob,app_status text,scopes text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,app_id,issued_at,expires_at));
+INSERT INTO "kms_app_credential" VALUES('xA9QylNTGQxKGYtHXwvmx8ldDaIJMAEx','43aef41d','lscGO3lfs3zh8iQ','87c20a31-a504-4ed5-89a5-700adfbb0142','','APPROVED','2017-02-27 07:45:22.774+00:00','','','{}','2017-02-27 07:45:22.774+00:00','-NA-','2017-02-27 07:45:22.877+00:00','-NA-','43aef41d');
+INSERT INTO "kms_app_credential" VALUES('ds986MejQqoWRSSeC0UTIPSJ3rtaG2xv','43aef41d','5EBOSSQrLOLO9siN','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','','APPROVED','2017-02-27 07:43:23.263+00:00','','','{}','2017-02-27 07:43:23.263+00:00','-NA-','2017-02-27 07:48:16.717+00:00','-NA-','43aef41d');
+INSERT INTO "kms_app_credential" VALUES('DMh0uQOPA5rbhl4YTnGvBAzGzOGuMH3A','43aef41d','MTfK8xscShhnops','be902c0c-c54b-4a65-85d6-358ff8639586','','APPROVED','2017-03-01 22:52:28.019+00:00','','','{}','2017-03-01 22:52:28.019+00:00','-NA-','2017-03-01 22:52:28.022+00:00','-NA-','43aef41d');
+CREATE TABLE "kms_company_developer" (tenant_id text,company_id text,developer_id text,roles text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (tenant_id,company_id,developer_id));
+CREATE TABLE "kms_app" (id text,tenant_id text,name text,display_name text,access_type text,callback_url text,status text,app_family text,company_id text,developer_id text,parent_id text,type text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,name,name,app_family,parent_id,created_at,updated_at));
+INSERT INTO "kms_app" VALUES('87c20a31-a504-4ed5-89a5-700adfbb0142','43aef41d','MitchTestApp2','','','','APPROVED','default','','8a350848-0aba-4dcc-aa60-97903efb42ef','8a350848-0aba-4dcc-aa60-97903efb42ef','DEVELOPER','2017-02-27 07:45:21.586+00:00','fierrom@google.com' ||
+ '','2017-02-27 07:45:21.586+00:00','fierrom@google.com' ||
+  '','43aef41d');
+INSERT INTO "kms_app" VALUES('8f5c9b86-0783-439c-b8e6-7ab9549e30e8','43aef41d','MitchTestApp','','','','APPROVED','default','','8a350848-0aba-4dcc-aa60-97903efb42ef','8a350848-0aba-4dcc-aa60-97903efb42ef','DEVELOPER','2017-02-27 07:43:22.301+00:00','fierrom@google.com' ||
+ '','2017-02-27 07:48:18.964+00:00','fierrom@google.com' ||
+  '','43aef41d');
+INSERT INTO "kms_app" VALUES('be902c0c-c54b-4a65-85d6-358ff8639586','43aef41d','GregTestApp','','','','APPROVED','default','','046974c2-9ae5-4452-a42f-bb6657e6cdbe','046974c2-9ae5-4452-a42f-bb6657e6cdbe','DEVELOPER','2017-03-01 22:52:27.615+00:00','greg@google.com','2017-03-01 22:52:27.615+00:00','greg@google.com','43aef41d');
+CREATE TABLE "kms_app_credential_apiproduct_mapper" (tenant_id text,appcred_id text,app_id text,apiprdt_id text,status text,_change_selector text, primary key (tenant_id,appcred_id,app_id,apiprdt_id));
+INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','ds986MejQqoWRSSeC0UTIPSJ3rtaG2xv','8f5c9b86-0783-439c-b8e6-7ab9549e30e8','f5f07319-5104-471c-9df3-64b1842dbe00','APPROVED','43aef41d');
+INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','xA9QylNTGQxKGYtHXwvmx8ldDaIJMAEx','87c20a31-a504-4ed5-89a5-700adfbb0142','f5f07319-5104-471c-9df3-64b1842dbe00','APPROVED','43aef41d');
+INSERT INTO "kms_app_credential_apiproduct_mapper" VALUES('43aef41d','DMh0uQOPA5rbhl4YTnGvBAzGzOGuMH3A','be902c0c-c54b-4a65-85d6-358ff8639586','87a4bfaa-b3c4-47cd-b6c5-378cdb68610c','APPROVED','43aef41d');
+CREATE TABLE "kms_developer" (id text,tenant_id text,username text,first_name text,last_name text,password text,email text,status text,encrypted_password text,salt text,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,tenant_id,username,email,created_at,updated_at));
+INSERT INTO "kms_developer" VALUES('8a350848-0aba-4dcc-aa60-97903efb42ef','43aef41d','mitchfierro','Mitch','Fierro','','fierrom@google.com','ACTIVE','','','2017-02-27 07:43:00.281+00:00','fierrom@google.com' ||
+ '','2017-02-27 07:43:00.281+00:00','fierrom@google.com' ||
+  '','43aef41d');
+INSERT INTO "kms_developer" VALUES('6ab21170-6bac-481d-9be1-9fda02bdd1da','43aef41d','adikgcp','gcp','dev','','adikancherla@gcp.com','ACTIVE','','','2017-02-27 23:50:24.426+00:00','akancherla@apigee.com','2017-02-27 23:50:24.426+00:00','akancherla@apigee.com','43aef41d');
+INSERT INTO "kms_developer" VALUES('046974c2-9ae5-4452-a42f-bb6657e6cdbe','43aef41d','gregbrail','Greg','Brail','','gregbrail@google.com','ACTIVE','','','2017-03-01 22:51:40.602+00:00','greg@google.com','2017-03-01 22:51:40.602+00:00','greg@google.com','43aef41d');
+COMMIT;
\ No newline at end of file
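
Beyond the edgex_ bootstrap tables, this data snapshot exercises the kms_
tables (products, apps, credentials, developers) under tenant 43aef41d, all
scoped to org edgex_gcp1 and env test. A sketch of resolving that scope the
way a consumer of the snapshot might, again assuming database/sql; scopeFor is
a hypothetical lookup, not part of this change:

    // scopeFor resolves the tenant scope recorded for an (org, env) pair;
    // against this fixture, ("edgex_gcp1", "test") yields "43aef41d".
    func scopeFor(db *sql.DB, org, env string) (string, error) {
    	var scope string
    	err := db.QueryRow(
    		`SELECT scope FROM edgex_data_scope WHERE org = ? AND env = ?`,
    		org, env).Scan(&scope)
    	return scope, err
    }
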
diff --git a/token.go b/token.go
index 424c8d4..56d6676 100644
--- a/token.go
+++ b/token.go
@@ -3,6 +3,7 @@
 import (
 	"bytes"
 	"encoding/json"
+	"errors"
 	"io/ioutil"
 	"net/http"
 	"net/url"
@@ -24,10 +25,10 @@
    man.close()
 */
 
-func createTokenManager() *tokenMan {
+func createSimpleTokenManager() *simpleTokenManager {
 	isClosedInt := int32(0)
 
-	t := &tokenMan{
+	t := &simpleTokenManager{
 		quitPollingForToken: make(chan bool, 1),
 		closed:              make(chan bool),
 		getTokenChan:        make(chan bool),
@@ -36,14 +37,10 @@
 		invalidateDone:      make(chan bool),
 		isClosed:            &isClosedInt,
 	}
-
-	t.retrieveNewToken()
-	t.refreshTimer = time.After(t.token.refreshIn())
-	go t.maintainToken()
 	return t
 }
 
-type tokenMan struct {
+type simpleTokenManager struct {
 	token               *oauthToken
 	isClosed            *int32
 	quitPollingForToken chan bool
@@ -55,11 +52,17 @@
 	invalidateDone      chan bool
 }
 
-func (t *tokenMan) getBearerToken() string {
+func (t *simpleTokenManager) start() {
+	t.retrieveNewToken()
+	t.refreshTimer = time.After(t.token.refreshIn())
+	go t.maintainToken()
+}
+
+func (t *simpleTokenManager) getBearerToken() string {
 	return t.getToken().AccessToken
 }
 
-func (t *tokenMan) maintainToken() {
+func (t *simpleTokenManager) maintainToken() {
 	for {
 		select {
 		case <-t.closed:
@@ -80,18 +83,19 @@
 }
 
 // will block until valid
-func (t *tokenMan) invalidateToken() {
+func (t *simpleTokenManager) invalidateToken() error {
 	//has been closed
 	if atomic.LoadInt32(t.isClosed) == int32(1) {
 		log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
-		return
+		return errors.New("invalidateToken() called on closed tokenManager")
 	}
 	log.Debug("invalidating token")
 	t.invalidateTokenChan <- true
 	<-t.invalidateDone
+	return nil
 }
 
-func (t *tokenMan) getToken() *oauthToken {
+func (t *simpleTokenManager) getToken() *oauthToken {
 	//has been closed
 	if atomic.LoadInt32(t.isClosed) == int32(1) {
 		log.Debug("TokenManager: getToken() called on closed tokenManager")
@@ -105,7 +109,7 @@
- * blocking close() of tokenMan
+ * blocking close() of simpleTokenManager
  */
 
-func (t *tokenMan) close() {
+func (t *simpleTokenManager) close() {
 	//has been closed
 	if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
 		log.Panic("TokenManager: close() has been called before!")
@@ -120,7 +124,7 @@
 }
 
 // don't call externally. will block until success.
-func (t *tokenMan) retrieveNewToken() {
+func (t *simpleTokenManager) retrieveNewToken() {
 
 	log.Debug("Getting OAuth token...")
 	uriString := config.GetString(configProxyServerBaseURI)
@@ -133,7 +137,7 @@
-	pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) })
+	pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token: %v", err) })
 }
 
-func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
 	return func(_ chan bool) error {
 		form := url.Values{}
 		form.Set("grant_type", "client_credentials")
diff --git a/token_test.go b/token_test.go
index c8ec7ba..22ff425 100644
--- a/token_test.go
+++ b/token_test.go
@@ -80,7 +80,8 @@
 				w.Write(body)
 			}))
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createTokenManager()
+			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager.start()
 			token := testedTokenManager.getToken()
 
 			Expect(token.AccessToken).ToNot(BeEmpty())
@@ -108,7 +109,8 @@
 			}))
 			config.Set(configProxyServerBaseURI, ts.URL)
 
-			testedTokenManager := createTokenManager()
+			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager.start()
 			token := testedTokenManager.getToken()
 			Expect(token.AccessToken).ToNot(BeEmpty())
 
@@ -147,8 +149,8 @@
 			}))
 
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createTokenManager()
-
+			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager.start()
 			testedTokenManager.getToken()
 
 			<-finished
@@ -188,8 +190,8 @@
 			}))
 
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createTokenManager()
-
+			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager.start()
 			testedTokenManager.getToken()
 			testedTokenManager.invalidateToken()
 			testedTokenManager.getToken()
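
Because invalidateToken() now returns an error instead of silently ignoring a
call on a closed manager, callers that might race with close() can surface the
misuse; the specs above may drop the return value, which Go permits. A short
sketch of checking it:

    // invalidateToken reports misuse on a closed manager rather than
    // silently returning, so a cautious caller can log it:
    if err := testedTokenManager.invalidateToken(); err != nil {
    	log.Errorf("token invalidation skipped: %v", err)
    }
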