diff --git a/api.go b/api.go
index 73399d7..c0e75f3 100644
--- a/api.go
+++ b/api.go
@@ -24,11 +24,15 @@
 
 const tokenEndpoint = "/accesstoken"
 
-func InitAPI(services apid.Services) {
-	services.API().HandleFunc(tokenEndpoint, getAccessToken).Methods("GET")
+type ApiManager struct {
+	tokenMan tokenManager
 }
 
-func getAccessToken(w http.ResponseWriter, r *http.Request) {
+func (a *ApiManager) InitAPI(api apid.APIService) {
+	api.HandleFunc(tokenEndpoint, a.getAccessToken).Methods("GET")
+}
+
+func (a *ApiManager) getAccessToken(w http.ResponseWriter, r *http.Request) {
 	b := r.URL.Query().Get("block")
 	var timeout int
 	if b != "" {
@@ -42,14 +46,14 @@
 	log.Debugf("api timeout: %d", timeout)
 	ifNoneMatch := r.Header.Get("If-None-Match")
 
-	if apidTokenManager.getBearerToken() != ifNoneMatch {
-		w.Write([]byte(apidTokenManager.getBearerToken()))
+	if a.tokenMan.getBearerToken() != ifNoneMatch {
+		w.Write([]byte(a.tokenMan.getBearerToken()))
 		return
 	}
 
 	select {
-	case <-apidTokenManager.getTokenReadyChannel():
-		w.Write([]byte(apidTokenManager.getBearerToken()))
+	case <-a.tokenMan.getTokenReadyChannel():
+		w.Write([]byte(a.tokenMan.getBearerToken()))
 	case <-time.After(time.Duration(timeout) * time.Second):
 		w.WriteHeader(http.StatusNotModified)
 	}
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 5055f68..a53a488 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -19,7 +19,6 @@
 	. "github.com/onsi/gomega"
 
 	"io/ioutil"
-	"net/http/httptest"
 	"os"
 	"testing"
 	"time"
@@ -30,57 +29,47 @@
 )
 
 var (
-	tmpDir         string
-	testServer     *httptest.Server
-	testRouter     apid.Router
-	testMock       *MockServer
-	wipeDBAferTest bool
+	tmpDir string
 )
 
 const dummyConfigValue string = "placeholder"
 const expectedClusterId = "bootstrap"
 
 var _ = BeforeSuite(func() {
-	wipeDBAferTest = true
-})
-
-var _ = BeforeEach(func() {
 	apid.Initialize(factory.DefaultServicesFactory())
-
 	config = apid.Config()
 	dataService = apid.Data()
-	events = apid.Events()
-
+	eventService = apid.Events()
+	log = apid.Log().ForModule("apigeeSync")
 	var err error
 	tmpDir, err = ioutil.TempDir("", "api_test")
 	Expect(err).NotTo(HaveOccurred())
 	config.Set("local_storage_path", tmpDir)
-
 	config.Set(configProxyServerBaseURI, dummyConfigValue)
 	config.Set(configSnapServerBaseURI, dummyConfigValue)
 	config.Set(configChangeServerBaseURI, dummyConfigValue)
 	config.Set(configSnapshotProtocol, "sqlite")
 	config.Set(configPollInterval, 10*time.Millisecond)
 	config.Set(configDiagnosticMode, false)
-	config.Set(configName, "testhost")
-	config.Set(configApidClusterId, expectedClusterId)
 	config.Set(configConsumerKey, "XXXXXXX")
 	config.Set(configConsumerSecret, "YYYYYYY")
-
-	block = "0"
-	log = apid.Log().ForModule("apigeeSync")
+	config.Set(configApidInstanceID, "YYYYYYY")
 }, 3)
 
+var _ = BeforeEach(func() {
+	config.Set(configName, "testhost")
+	config.Set(configApidClusterId, expectedClusterId)
+	apidInfo.ClusterID = expectedClusterId
+	apidInfo.InstanceID = "YYYYYYY"
+	apidInfo.LastSnapshot = ""
+	apidInfo.IsNewInstance = true
+})
+
 var _ = AfterEach(func() {
-	apid.Events().Close()
-	lastSequence = ""
 })
 
 var _ = AfterSuite(func() {
 	apid.Events().Close()
-	if testServer != nil {
-		testServer.Close()
-	}
 	os.RemoveAll(tmpDir)
 })
 
diff --git a/apigee_sync.go b/apigee_sync.go
index 8a23079..47b9fd6 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -15,7 +15,6 @@
 package apidApigeeSync
 
 import (
-	"github.com/apid/apid-core"
 	"net/http"
 	"time"
 )
@@ -26,37 +25,6 @@
 	maxIdleConnsPerHost = 10
 )
 
-var knownTables = make(map[string]bool)
-
-/*
- *  Start from existing snapshot if possible
- *  If an existing snapshot does not exist, use the apid scope to fetch
- *  all data scopes, then get a snapshot for those data scopes
- *
- *  Then, poll for changes
- */
-func bootstrap() {
-	if isOfflineMode && apidInfo.LastSnapshot == "" {
-		log.Panic("Diagnostic mode requires existent snapshot info in default DB.")
-	}
-
-	if apidInfo.LastSnapshot != "" {
-		snapshot := apidSnapshotManager.startOnLocalSnapshot(apidInfo.LastSnapshot)
-		events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
-			apidChangeManager.pollChangeWithBackoff()
-		})
-
-		log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
-		return
-	}
-
-	apidSnapshotManager.downloadBootSnapshot()
-	apidSnapshotManager.downloadDataSnapshot()
-
-	apidChangeManager.pollChangeWithBackoff()
-
-}
-
 /*
  * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
  * for retrying on errors
@@ -103,8 +71,8 @@
 	}
 }
 
-func addHeaders(req *http.Request) {
-	req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
+func addHeaders(req *http.Request, token string) {
+	req.Header.Set("Authorization", "Bearer "+token)
 	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
deleted file mode 100644
index f7144a1..0000000
--- a/apigee_sync_test.go
+++ /dev/null
@@ -1,452 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
-	"github.com/apid/apid-core"
-	"github.com/apid/apid-core/util"
-	"github.com/apigee-labs/transicator/common"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-	"net/http"
-	"net/http/httptest"
-)
-
-var _ = Describe("Sync", func() {
-	Context("offline mode", func() {
-		var (
-			testInstanceID   = util.GenerateUUID()
-			testInstanceName = "offline-instance-name"
-			testClusterID    = "offline-cluster-id"
-			testLastSnapshot = "offline-last-snapshot"
-			testChangeMan    *dummyChangeManager
-		)
-
-		var _ = BeforeEach(func() {
-			config.Set(configDiagnosticMode, true)
-			config.Set(configApidClusterId, testClusterID)
-			_initPlugin(apid.AllServices())
-			apidSnapshotManager = &dummySnapshotManager{}
-			testChangeMan = &dummyChangeManager{
-				pollChangeWithBackoffChan: make(chan bool, 1),
-			}
-			apidChangeManager = testChangeMan
-			apidTokenManager = &dummyTokenManager{}
-			apidInfo = apidInstanceInfo{
-				InstanceID:   testInstanceID,
-				InstanceName: testInstanceName,
-				ClusterID:    testClusterID,
-				LastSnapshot: testLastSnapshot,
-			}
-			updateApidInstanceInfo()
-
-		})
-
-		var _ = AfterEach(func() {
-			config.Set(configDiagnosticMode, false)
-			if wipeDBAferTest {
-				db, err := dataService.DB()
-				Expect(err).NotTo(HaveOccurred())
-				tx, err := db.Begin()
-				_, err = tx.Exec("DELETE FROM APID")
-				Expect(err).NotTo(HaveOccurred())
-				err = tx.Commit()
-				Expect(err).NotTo(HaveOccurred())
-
-			}
-			wipeDBAferTest = true
-			newInstanceID = true
-			isOfflineMode = false
-		})
-
-		It("offline mode should bootstrap from local DB", func(done Done) {
-
-			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
-				if s, ok := event.(*common.Snapshot); ok {
-					// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
-					// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
-					Expect(s.SnapshotInfo).Should(Equal(testLastSnapshot))
-					Expect(s.Tables).To(BeNil())
-					close(done)
-				}
-			})
-			pie := apid.PluginsInitializedEvent{
-				Description: "plugins initialized",
-			}
-			pie.Plugins = append(pie.Plugins, pluginData)
-			postInitPlugins(pie)
-
-		}, 3)
-
-	})
-
-	Context("online mode", func() {
-		var _ = BeforeEach(func() {
-			_initPlugin(apid.AllServices())
-			createManagers()
-		})
-
-		var _ = AfterEach(func() {
-			if wipeDBAferTest {
-				db, err := dataService.DB()
-				Expect(err).NotTo(HaveOccurred())
-				tx, err := db.Begin()
-				_, err = tx.Exec("DELETE FROM APID")
-				Expect(err).NotTo(HaveOccurred())
-				err = tx.Commit()
-				Expect(err).NotTo(HaveOccurred())
-			}
-			wipeDBAferTest = true
-		})
-
-		const expectedDataScopeId1 = "dataScope1"
-		const expectedDataScopeId2 = "dataScope2"
-
-		var initializeContext = func() {
-			testRouter = apid.API().Router()
-			testServer = httptest.NewServer(testRouter)
-
-			// set up mock server
-			mockParms := MockParms{
-				ReliableAPI:  false,
-				ClusterID:    config.GetString(configApidClusterId),
-				TokenKey:     config.GetString(configConsumerKey),
-				TokenSecret:  config.GetString(configConsumerSecret),
-				Scope:        "ert452",
-				Organization: "att",
-				Environment:  "prod",
-			}
-			testMock = Mock(mockParms, testRouter)
-
-			config.Set(configProxyServerBaseURI, testServer.URL)
-			config.Set(configSnapServerBaseURI, testServer.URL)
-			config.Set(configChangeServerBaseURI, testServer.URL)
-		}
-
-		var restoreContext = func() {
-
-			testServer.Close()
-
-			config.Set(configProxyServerBaseURI, dummyConfigValue)
-			config.Set(configSnapServerBaseURI, dummyConfigValue)
-			config.Set(configChangeServerBaseURI, dummyConfigValue)
-
-		}
-
-		It("should succesfully bootstrap from clean slate", func(done Done) {
-			log.Info("Starting sync tests...")
-			var closeDone <-chan bool
-			initializeContext()
-			// do not wipe DB after.  Lets use it
-			wipeDBAferTest = false
-			var lastSnapshot *common.Snapshot
-
-			expectedSnapshotTables := common.ChangeList{
-				Changes: []common.Change{common.Change{Table: "kms_company"},
-					common.Change{Table: "edgex_apid_cluster"},
-					common.Change{Table: "edgex_data_scope"},
-					common.Change{Table: "kms_app_credential"},
-					common.Change{Table: "kms_app_credential_apiproduct_mapper"},
-					common.Change{Table: "kms_developer"},
-					common.Change{Table: "kms_company_developer"},
-					common.Change{Table: "kms_api_product"},
-					common.Change{Table: "kms_app"}},
-			}
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-				if s, ok := event.(*common.Snapshot); ok {
-
-					Expect(16).To(Equal(len(knownTables)))
-					Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
-
-					lastSnapshot = s
-
-					db, _ := dataService.DBVersion(s.SnapshotInfo)
-					var rowCount int
-					var id string
-
-					err := db.Ping()
-					Expect(err).NotTo(HaveOccurred())
-					numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;")
-					if err != nil {
-						Fail("Failed to get correct DB")
-					}
-					Expect(true).To(Equal(numApidClusters.Next()))
-					numApidClusters.Scan(&rowCount)
-					Expect(1).To(Equal(rowCount))
-					numApidClusters.Close()
-					apidClusters, err := db.Query("select id from edgex_apid_cluster;")
-					Expect(err).NotTo(HaveOccurred())
-					apidClusters.Next()
-					apidClusters.Scan(&id)
-					Expect(id).To(Equal(expectedClusterId))
-					apidClusters.Close()
-
-					numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;")
-					Expect(err).NotTo(HaveOccurred())
-					Expect(true).To(Equal(numDataScopes.Next()))
-					numDataScopes.Scan(&rowCount)
-					Expect(2).To(Equal(rowCount))
-					numDataScopes.Close()
-					dataScopes, err := db.Query("select id from edgex_data_scope;")
-					Expect(err).NotTo(HaveOccurred())
-					dataScopes.Next()
-					dataScopes.Scan(&id)
-					dataScopes.Next()
-
-					if id == expectedDataScopeId1 {
-						dataScopes.Scan(&id)
-						Expect(id).To(Equal(expectedDataScopeId2))
-					} else {
-						dataScopes.Scan(&id)
-						Expect(id).To(Equal(expectedDataScopeId1))
-					}
-					dataScopes.Close()
-
-				} else if cl, ok := event.(*common.ChangeList); ok {
-					closeDone = apidChangeManager.close()
-					// ensure that snapshot switched DB versions
-					Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
-					expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
-					Expect(err).NotTo(HaveOccurred())
-					Expect(getDB() == expectedDB).Should(BeTrue())
-
-					Expect(cl.Changes).To(HaveLen(6))
-
-					var tables []string
-					for _, c := range cl.Changes {
-						tables = append(tables, c.Table)
-						Expect(c.NewRow).ToNot(BeNil())
-
-						var tenantID string
-						c.NewRow.Get("tenant_id", &tenantID)
-						Expect(tenantID).To(Equal("ert452"))
-					}
-
-					Expect(tables).To(ContainElement("kms_app_credential"))
-					Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper"))
-					Expect(tables).To(ContainElement("kms_developer"))
-					Expect(tables).To(ContainElement("kms_company_developer"))
-					Expect(tables).To(ContainElement("kms_api_product"))
-					Expect(tables).To(ContainElement("kms_app"))
-
-					go func() {
-						// when close done, all handlers for the first changeList have been executed
-						<-closeDone
-						defer GinkgoRecover()
-						// allow other handler to execute to insert last_sequence
-						var seq string
-						err = getDB().
-							QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;").
-							Scan(&seq)
-						Expect(err).NotTo(HaveOccurred())
-						//}
-						Expect(seq).To(Equal(cl.LastSequence))
-
-						restoreContext()
-						close(done)
-					}()
-
-				}
-			})
-			pie := apid.PluginsInitializedEvent{
-				Description: "plugins initialized",
-			}
-			pie.Plugins = append(pie.Plugins, pluginData)
-			postInitPlugins(pie)
-		}, 5)
-
-		It("should bootstrap from local DB if present", func(done Done) {
-
-			var closeDone <-chan bool
-
-			initializeContext()
-			expectedTables := common.ChangeList{
-				Changes: []common.Change{common.Change{Table: "kms_company"},
-					common.Change{Table: "edgex_apid_cluster"},
-					common.Change{Table: "edgex_data_scope"}},
-			}
-			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
-				if s, ok := event.(*common.Snapshot); ok {
-					// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
-					// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
-					closeDone = apidChangeManager.close()
-					go func() {
-						// when close done, all handlers for the first snapshot have been executed
-						<-closeDone
-						//verify that the knownTables array has been properly populated from existing DB
-						Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
-
-						Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
-						Expect(s.Tables).To(BeNil())
-
-						restoreContext()
-						close(done)
-					}()
-
-				}
-			})
-			pie := apid.PluginsInitializedEvent{
-				Description: "plugins initialized",
-			}
-			pie.Plugins = append(pie.Plugins, pluginData)
-			postInitPlugins(pie)
-
-		}, 3)
-
-		It("should detect apid_cluster_id change in config yaml", func() {
-			Expect(apidInfo).ToNot(BeNil())
-			Expect(apidInfo.ClusterID).To(Equal("bootstrap"))
-			Expect(apidInfo.InstanceID).ToNot(BeEmpty())
-			previousInstanceId := apidInfo.InstanceID
-
-			config.Set(configApidClusterId, "new value")
-			apidInfo, err := getApidInstanceInfo()
-			Expect(err).NotTo(HaveOccurred())
-			Expect(apidInfo.LastSnapshot).To(BeEmpty())
-			Expect(apidInfo.InstanceID).ToNot(BeEmpty())
-			Expect(apidInfo.InstanceID).ToNot(Equal(previousInstanceId))
-			Expect(apidInfo.ClusterID).To(Equal("new value"))
-		})
-
-		It("should correctly identify non-proper subsets with respect to maps", func() {
-
-			//test b proper subset of a
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "b"}},
-			)).To(BeFalse())
-
-			//test a == b
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
-			)).To(BeFalse())
-
-			//test b superset of a
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
-			)).To(BeTrue())
-
-			//test b not subset of a
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "c"}},
-			)).To(BeTrue())
-
-			//test a empty
-			Expect(changesHaveNewTables(map[string]bool{},
-				[]common.Change{common.Change{Table: "a"}},
-			)).To(BeTrue())
-
-			//test b empty
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{},
-			)).To(BeFalse())
-
-			//test b nil
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse())
-
-			//test a nil
-			Expect(changesHaveNewTables(nil,
-				[]common.Change{common.Change{Table: "a"}},
-			)).To(BeTrue())
-		}, 3)
-
-		// todo: disabled for now -
-		// there is precondition I haven't been able to track down that breaks this test on occasion
-		XIt("should process a new snapshot when change server requires it", func(done Done) {
-			oldSnap := apidInfo.LastSnapshot
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-				defer GinkgoRecover()
-
-				if s, ok := event.(*common.Snapshot); ok {
-					Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
-					close(done)
-				}
-			})
-			testMock.forceNewSnapshot()
-		})
-
-		It("Verify the Sequence Number Logic works as expected", func() {
-			Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
-			Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
-			Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
-			Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
-			Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
-			Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
-		}, 3)
-
-		/*
-		 * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
-		 */
-		It("Should be able to handle duplicate snapshot during bootstrap", func() {
-			initializeContext()
-			apidTokenManager = createSimpleTokenManager()
-			apidTokenManager.start()
-			apidSnapshotManager = createSnapShotManager()
-			//events.Listen(ApigeeSyncEventSelector, &handler{})
-
-			scopes := []string{apidInfo.ClusterID}
-			snapshot := &common.Snapshot{}
-			apidSnapshotManager.downloadSnapshot(true, scopes, snapshot)
-			apidSnapshotManager.storeBootSnapshot(snapshot)
-			apidSnapshotManager.storeDataSnapshot(snapshot)
-			restoreContext()
-			<-apidSnapshotManager.close()
-			apidTokenManager.close()
-		}, 3)
-
-		It("Reuse http.Client connection for multiple concurrent requests", func() {
-			var tr *http.Transport
-			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			}))
-			tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
-			tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
-
-			var rspcnt int = 0
-			ch := make(chan *http.Response)
-			client := &http.Client{Transport: tr}
-			for i := 0; i < 2*maxIdleConnsPerHost; i++ {
-				go func(client *http.Client) {
-					req, err := http.NewRequest("GET", server.URL, nil)
-					resp, err := client.Do(req)
-					if err != nil {
-						Fail("Unable to process Client request")
-					}
-					ch <- resp
-					resp.Body.Close()
-
-				}(client)
-			}
-			for {
-				select {
-				case resp := <-ch:
-					Expect(resp.StatusCode).To(Equal(http.StatusOK))
-					if rspcnt >= 2*maxIdleConnsPerHost-1 {
-						return
-					}
-					rspcnt++
-				default:
-				}
-			}
-
-		}, 3)
-
-	})
-})
diff --git a/change_test.go b/change_test.go
index 511eb82..a467a18 100644
--- a/change_test.go
+++ b/change_test.go
@@ -16,32 +16,63 @@
 
 import (
 	"github.com/apid/apid-core"
+	"github.com/apid/apid-core/api"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
+	"net/http"
 	"net/http/httptest"
-	"os"
 	"time"
 )
 
+const (
+	expectedInstanceId = "dummy"
+)
+
 var _ = Describe("Change Agent", func() {
 
 	Context("Change Agent Unit Tests", func() {
+		testCount := 0
+		var testChangeMan *pollChangeManager
+		var dummyDbMan *dummyDbManager
+		var dummySnapMan *dummySnapshotManager
+		var dummyTokenMan *dummyTokenManager
+		var testServer *httptest.Server
+		var testRouter apid.Router
+		var testMock *MockServer
+		BeforeEach(func() {
+			testCount++
+			dummyDbMan = &dummyDbManager{
+				knownTables: map[string]bool{
+					"_transicator_metadata":                true,
+					"_transicator_tables":                  true,
+					"attributes":                           true,
+					"edgex_apid_cluster":                   true,
+					"edgex_data_scope":                     true,
+					"kms_api_product":                      true,
+					"kms_app":                              true,
+					"kms_app_credential":                   true,
+					"kms_app_credential_apiproduct_mapper": true,
+					"kms_company":                          true,
+					"kms_company_developer":                true,
+					"kms_deployment":                       true,
+					"kms_developer":                        true,
+					"kms_organization":                     true,
+				},
+				scopes: []string{"43aef41d"},
+			}
+			dummySnapMan = &dummySnapshotManager{
+				downloadCalledChan: make(chan bool, 1),
+			}
+			dummyTokenMan = &dummyTokenManager{
+				invalidateChan: make(chan bool, 1),
+			}
+			client := &http.Client{}
+			testChangeMan = createChangeManager(dummyDbMan, dummySnapMan, dummyTokenMan, client)
+			testChangeMan.block = 0
 
-		var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
-			initDb(sqlfile, "./mockdb_change.sqlite3")
-			file, err := os.Open("./mockdb_change.sqlite3")
-			Expect(err).Should(Succeed())
-			s := common.Snapshot{}
-			err = processSnapshotServerFileResponse(dbId, file, &s)
-			Expect(err).Should(Succeed())
-			return s
-		}
-
-		var initializeContext = func() {
-			testRouter = apid.API().Router()
+			// create a new API service to have a new router for testing
+			testRouter = api.CreateService().Router()
 			testServer = httptest.NewServer(testRouter)
-
 			// set up mock server
 			mockParms := MockParms{
 				ReliableAPI:  true,
@@ -52,94 +83,51 @@
 				Organization: "att",
 				Environment:  "prod",
 			}
+			apidInfo.ClusterID = expectedClusterId
+			apidInfo.InstanceID = expectedInstanceId
 			testMock = Mock(mockParms, testRouter)
-
 			config.Set(configProxyServerBaseURI, testServer.URL)
 			config.Set(configSnapServerBaseURI, testServer.URL)
 			config.Set(configChangeServerBaseURI, testServer.URL)
 			config.Set(configPollInterval, 1*time.Millisecond)
-		}
 
-		var restoreContext = func() {
+		})
 
+		AfterEach(func() {
 			testServer.Close()
+			<-testChangeMan.close()
 			config.Set(configProxyServerBaseURI, dummyConfigValue)
 			config.Set(configSnapServerBaseURI, dummyConfigValue)
 			config.Set(configChangeServerBaseURI, dummyConfigValue)
 			config.Set(configPollInterval, 10*time.Millisecond)
-		}
-
-		var _ = BeforeEach(func() {
-			_initPlugin(apid.AllServices())
-			createManagers()
-			event := createTestDb("./sql/init_mock_db.sql", "test_change")
-			processSnapshot(&event)
-			knownTables = extractTablesFromDB(getDB())
-		})
-
-		var _ = AfterEach(func() {
-			restoreContext()
-			if wipeDBAferTest {
-				db, err := dataService.DB()
-				Expect(err).Should(Succeed())
-				tx, err := db.Begin()
-				_, err = tx.Exec("DELETE FROM APID")
-				Expect(err).Should(Succeed())
-				err = tx.Commit()
-				Expect(err).Should(Succeed())
-			}
-			wipeDBAferTest = true
 		})
 
 		It("test change agent with authorization failure", func() {
 			log.Debug("test change agent with authorization failure")
-			testTokenManager := &dummyTokenManager{make(chan bool)}
-			apidTokenManager = testTokenManager
-			apidTokenManager.start()
-			apidSnapshotManager = &dummySnapshotManager{}
-			initializeContext()
 			testMock.forceAuthFail()
-			wipeDBAferTest = true
-			apidChangeManager.pollChangeWithBackoff()
+			testChangeMan.pollChangeWithBackoff()
 			// auth check fails
-			<-testTokenManager.invalidateChan
+			<-dummyTokenMan.invalidateChan
 			log.Debug("closing")
-			<-apidChangeManager.close()
 		})
 
 		It("test change agent with too old snapshot", func() {
 			log.Debug("test change agent with too old snapshot")
-			testTokenManager := &dummyTokenManager{make(chan bool)}
-			apidTokenManager = testTokenManager
-			apidTokenManager.start()
-			testSnapshotManager := &dummySnapshotManager{make(chan bool)}
-			apidSnapshotManager = testSnapshotManager
-			initializeContext()
-
 			testMock.passAuthCheck()
 			testMock.forceNewSnapshot()
-			wipeDBAferTest = true
-			apidChangeManager.pollChangeWithBackoff()
-			<-testSnapshotManager.downloadCalledChan
+			testChangeMan.pollChangeWithBackoff()
+			<-dummySnapMan.downloadCalledChan
 			log.Debug("closing")
-			<-apidChangeManager.close()
 		})
 
 		It("change agent should retry with authorization failure", func(done Done) {
 			log.Debug("change agent should retry with authorization failure")
-			testTokenManager := &dummyTokenManager{make(chan bool)}
-			apidTokenManager = testTokenManager
-			apidTokenManager.start()
-			apidSnapshotManager = &dummySnapshotManager{}
-			initializeContext()
 			testMock.forceAuthFail()
 			testMock.forceNoSnapshot()
-			wipeDBAferTest = true
-
 			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
 
 				if _, ok := event.(*common.ChangeList); ok {
-					closeDone := apidChangeManager.close()
+					closeDone := testChangeMan.close()
 					log.Debug("closing")
 					go func() {
 						// when close done, all handlers for the first snapshot have been executed
@@ -150,10 +138,11 @@
 				}
 			})
 
-			apidChangeManager.pollChangeWithBackoff()
+			testChangeMan.pollChangeWithBackoff()
 			// auth check fails
-			<-testTokenManager.invalidateChan
-		}, 2)
+			<-dummyTokenMan.invalidateChan
+			testMock.passAuthCheck()
+		}, 3)
 
 	})
 })
diff --git a/changes.go b/changes.go
index b78a1d1..9b32658 100644
--- a/changes.go
+++ b/changes.go
@@ -22,28 +22,37 @@
 	"net/url"
 	"path"
 	"sort"
+	"strconv"
 	"sync/atomic"
 	"time"
 )
 
-var lastSequence string
-var block string = "45"
-
 type pollChangeManager struct {
 	// 0 for not closed, 1 for closed
 	isClosed *int32
 	// 0 for pollChangeWithBackoff() not launched, 1 for launched
-	isLaunched *int32
-	quitChan   chan bool
+	isLaunched   *int32
+	quitChan     chan bool
+	block        int
+	lastSequence string
+	dbMan        DbManager
+	snapMan      snapShotManager
+	tokenMan     tokenManager
+	client       *http.Client
 }
 
-func createChangeManager() *pollChangeManager {
+func createChangeManager(dbMan DbManager, snapMan snapShotManager, tokenMan tokenManager, client *http.Client) *pollChangeManager {
 	isClosedInt := int32(0)
 	isLaunchedInt := int32(0)
 	return &pollChangeManager{
 		isClosed:   &isClosedInt,
 		quitChan:   make(chan bool),
 		isLaunched: &isLaunchedInt,
+		block:      45,
+		dbMan:      dbMan,
+		snapMan:    snapMan,
+		tokenMan:   tokenMan,
+		client:     client,
 	}
 }
 
@@ -68,8 +77,8 @@
 		log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
 		go func() {
 			c.quitChan <- true
-			apidTokenManager.close()
-			<-apidSnapshotManager.close()
+			c.tokenMan.close()
+			<-c.snapMan.close()
 			log.Debug("change manager closed")
 			finishChan <- false
 		}()
@@ -79,8 +88,8 @@
 	log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
 	go func() {
 		c.quitChan <- true
-		apidTokenManager.close()
-		<-apidSnapshotManager.close()
+		c.tokenMan.close()
+		<-c.snapMan.close()
 		log.Debug("change manager closed")
 		finishChan <- true
 	}()
@@ -120,43 +129,152 @@
 	 * Check to see if we have lastSequence already saved in the DB,
 	 * in which case, it has to be used to prevent re-reading same data
 	 */
-	lastSequence = getLastSequence()
-
+	c.lastSequence = c.dbMan.getLastSequence()
 	for {
 		select {
 		case <-c.quitChan:
 			log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager")
 			return quitSignalError{}
 		default:
-			err := c.getChanges(changesUri)
+			scopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID)
 			if err != nil {
-				if _, ok := err.(quitSignalError); ok {
-					log.Debug("pollChangeAgent: consuming the quit signal")
-					<-c.quitChan
-				}
+				return err
+			}
+			r, err := c.getChanges(scopes, changesUri)
+			if err != nil {
+				return err
+			}
+			cl, err := c.parseChangeResp(r)
+			if err != nil {
+				return err
+			}
+			if err = c.emitChangeList(scopes, cl); err != nil {
 				return err
 			}
 		}
 	}
 }
 
-//TODO refactor this method more, split it up
-/* Make a single request to the changeserver to get a changelist */
-func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
-	// if closed
-	if atomic.LoadInt32(c.isClosed) == int32(1) {
-		return quitSignalError{}
+func (c *pollChangeManager) parseChangeResp(r *http.Response) (*common.ChangeList, error) {
+	var err error
+	defer r.Body.Close()
+
+	if r.StatusCode != http.StatusOK {
+		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
+		switch r.StatusCode {
+		case http.StatusUnauthorized:
+			err = c.tokenMan.invalidateToken()
+			if err != nil {
+				return nil, err
+			}
+			return nil, authFailError{}
+
+		case http.StatusNotModified:
+			return nil, nil
+		case http.StatusBadRequest:
+			var apiErr changeServerError
+			var b []byte
+			b, err = ioutil.ReadAll(r.Body)
+			if err != nil {
+				log.Errorf("Unable to read response body: %v", err)
+				return nil, err
+			}
+			err = json.Unmarshal(b, &apiErr)
+			if err != nil {
+				log.Errorf("JSON Response Data not parsable: %s", string(b))
+				return nil, err
+			}
+			if apiErr.Code == "SNAPSHOT_TOO_OLD" {
+				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
+				err = apiErr
+			}
+			return nil, err
+		default:
+			log.Errorf("Unknown response code from change server: %v", r.Status)
+			return nil, nil
+		}
 	}
+
+	resp := &common.ChangeList{}
+	err = json.NewDecoder(r.Body).Decode(resp)
+	if err != nil {
+		log.Errorf("JSON Response Data not parsable: %v", err)
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (c *pollChangeManager) emitChangeList(scopes []string, cl *common.ChangeList) error {
+	var err error
+	/*
+	 * If the lastSequence is already newer or the same than what we got via
+	 * cl.LastSequence, Ignore it.
+	 */
+	if c.lastSequence != "" &&
+		getChangeStatus(c.lastSequence, cl.LastSequence) != 1 {
+		return nil
+	}
+
+	if changesRequireDDLSync(c.dbMan.getKnowTables(), cl) {
+		return changeServerError{
+			Code: "DDL changes detected; must get new snapshot",
+		}
+	}
+
+	/* If valid data present, Emit to plugins */
+	if len(cl.Changes) > 0 {
+		if err = c.dbMan.processChangeList(cl); err != nil {
+			log.Errorf("Error in processChangeList: %v", err)
+			return err
+		}
+		select {
+		case <-time.After(httpTimeout):
+			log.Panic("Timeout. Plugins failed to respond to changes.")
+		case <-eventService.Emit(ApigeeSyncEventSelector, cl):
+		}
+	} else if c.lastSequence == "" {
+		select {
+		case <-time.After(httpTimeout):
+			log.Panic("Timeout. Plugins failed to respond to changes.")
+		case <-eventService.Emit(ApigeeSyncEventSelector, cl):
+		}
+	} else {
+		log.Debugf("No Changes detected")
+	}
+
+	err = c.dbMan.updateLastSequence(cl.LastSequence)
+	if err != nil {
+		log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
+	}
+	c.lastSequence = cl.LastSequence
+
+	/*
+	 * Check to see if there was any change in scope. If found, handle it
+	 * by getting a new snapshot
+	 */
+	newScopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID)
+	if err != nil {
+		return err
+	}
+	cs := scopeChanged(newScopes, scopes)
+	if cs != nil {
+		return cs
+	}
+
+	return nil
+}
+
+/* Make a single request to the changeserver to get a changelist */
+func (c *pollChangeManager) getChanges(scopes []string, changesUri *url.URL) (*http.Response, error) {
 	log.Debug("polling...")
 
 	/* Find the scopes associated with the config id */
-	scopes := findScopesForId(apidInfo.ClusterID)
 	v := url.Values{}
 
-	blockValue := block
+	blockValue := strconv.Itoa(c.block)
 	/* Sequence added to the query if available */
-	if lastSequence != "" {
-		v.Add("since", lastSequence)
+	if c.lastSequence != "" {
+		v.Add("since", c.lastSequence)
 	} else {
 		blockValue = "0"
 	}
@@ -178,115 +296,16 @@
 
 	/* If error, break the loop, and retry after interval */
 	req, err := http.NewRequest("GET", uri, nil)
-	addHeaders(req)
-	r, err := httpclient.Do(req)
+	addHeaders(req, c.tokenMan.getBearerToken())
+	r, err := c.client.Do(req)
 	if err != nil {
 		log.Errorf("change agent comm error: %s", err)
-		// if closed
-		if atomic.LoadInt32(c.isClosed) == int32(1) {
-			return quitSignalError{}
-		}
-		return err
+		return nil, err
 	}
-	defer r.Body.Close()
-
-	// has been closed
-	if atomic.LoadInt32(c.isClosed) == int32(1) {
-		log.Debugf("getChanges: changeManager has been closed")
-		return quitSignalError{}
-	}
-
-	if r.StatusCode != http.StatusOK {
-		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
-		switch r.StatusCode {
-		case http.StatusUnauthorized:
-			err = apidTokenManager.invalidateToken()
-			if err != nil {
-				return err
-			}
-			return authFailError{}
-
-		case http.StatusNotModified:
-			return nil
-
-		case http.StatusBadRequest:
-			var apiErr changeServerError
-			var b []byte
-			b, err = ioutil.ReadAll(r.Body)
-			if err != nil {
-				log.Errorf("Unable to read response body: %v", err)
-				return err
-			}
-			err = json.Unmarshal(b, &apiErr)
-			if err != nil {
-				log.Errorf("JSON Response Data not parsable: %s", string(b))
-				return err
-			}
-			if apiErr.Code == "SNAPSHOT_TOO_OLD" {
-				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
-				err = apiErr
-			}
-			return err
-		}
-		return nil
-	}
-
-	var resp common.ChangeList
-	err = json.NewDecoder(r.Body).Decode(&resp)
-	if err != nil {
-		log.Errorf("JSON Response Data not parsable: %v", err)
-		return err
-	}
-
-	/*
-	 * If the lastSequence is already newer or the same than what we got via
-	 * resp.LastSequence, Ignore it.
-	 */
-	if lastSequence != "" &&
-		getChangeStatus(lastSequence, resp.LastSequence) != 1 {
-		return nil
-	}
-
-	if changesRequireDDLSync(resp) {
-		return changeServerError{
-			Code: "DDL changes detected; must get new snapshot",
-		}
-	}
-
-	/* If valid data present, Emit to plugins */
-	if len(resp.Changes) > 0 {
-		processChangeList(&resp)
-		select {
-		case <-time.After(httpTimeout):
-			log.Panic("Timeout. Plugins failed to respond to changes.")
-		case <-events.Emit(ApigeeSyncEventSelector, &resp):
-		}
-	} else if lastSequence == "" {
-		select {
-		case <-time.After(httpTimeout):
-			log.Panic("Timeout. Plugins failed to respond to changes.")
-		case <-events.Emit(ApigeeSyncEventSelector, &resp):
-		}
-	} else {
-		log.Debugf("No Changes detected for Scopes: %s", scopes)
-	}
-
-	updateSequence(resp.LastSequence)
-
-	/*
-	 * Check to see if there was any change in scope. If found, handle it
-	 * by getting a new snapshot
-	 */
-	newScopes := findScopesForId(apidInfo.ClusterID)
-	cs := scopeChanged(newScopes, scopes)
-	if cs != nil {
-		return cs
-	}
-
-	return nil
+	return r, nil
 }
 
-func changesRequireDDLSync(changes common.ChangeList) bool {
+func changesRequireDDLSync(knownTables map[string]bool, changes *common.ChangeList) bool {
 	return changesHaveNewTables(knownTables, changes.Changes)
 }
 
@@ -296,10 +315,12 @@
 		log.Debugf("handleChangeServerError: changeManager has been closed")
 		return
 	}
-	if c, ok := err.(changeServerError); ok {
-		log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
-		apidSnapshotManager.downloadDataSnapshot()
-	} else {
+
+	switch e := err.(type) {
+	case changeServerError:
+		log.Debugf("%s. Fetch a new snapshot to sync...", e.Code)
+		c.snapMan.downloadDataSnapshot()
+	default:
 		log.Debugf("Error connecting to changeserver: %v", err)
 	}
 }
@@ -310,7 +331,7 @@
 func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
 
 	//nil maps should not be passed in.  Making the distinction between nil map and empty map
-	if a == nil {
+	if len(a) == 0 {
 		log.Warn("Nil map passed to function changesHaveNewTables, may be bug")
 		return true
 	}
@@ -332,24 +353,15 @@
 func getChangeStatus(lastSeq string, currSeq string) int {
 	seqPrev, err := common.ParseSequence(lastSeq)
 	if err != nil {
-		log.Panic("Unable to parse previous sequence string")
+		log.Panicf("Unable to parse previous sequence string: %v", err)
 	}
 	seqCurr, err := common.ParseSequence(currSeq)
 	if err != nil {
-		log.Panic("Unable to parse current sequence string")
+		log.Panicf("Unable to parse current sequence string: %v", err)
 	}
 	return seqCurr.Compare(seqPrev)
 }
 
-func updateSequence(seq string) {
-	lastSequence = seq
-	err := updateLastSequence(seq)
-	if err != nil {
-		log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
-	}
-
-}
-
 /*
  * Returns nil if the two arrays have matching contents
  */
diff --git a/cmd/mockServer/main.go b/cmd/mockServer/main.go
index 486da41..8cfa1ef 100644
--- a/cmd/mockServer/main.go
+++ b/cmd/mockServer/main.go
@@ -19,7 +19,6 @@
 
 	"os"
 
-
 	"github.com/apid/apid-core"
 	"github.com/apid/apid-core/factory"
 	"github.com/apid/apidApigeeSync"
@@ -53,16 +52,16 @@
 	router := apid.API().Router()
 
 	params := apidApigeeSync.MockParms{
-		ReliableAPI:            *reliable,
-		ClusterID:              "cluster",
-		TokenKey:               "key",
-		TokenSecret:            "secret",
-		Scope:                  "scope",
-		Organization:           "org",
-		Environment:            "test",
-		NumDevelopers:          *numDevs,
-		NumDeployments:         *numDeps,
-		BundleURI:              *bundleURI,
+		ReliableAPI:    *reliable,
+		ClusterID:      "cluster",
+		TokenKey:       "key",
+		TokenSecret:    "secret",
+		Scope:          "scope",
+		Organization:   "org",
+		Environment:    "test",
+		NumDevelopers:  *numDevs,
+		NumDeployments: *numDeps,
+		BundleURI:      *bundleURI,
 	}
 
 	log.Printf("Params: %#v\n", params)
diff --git a/data.go b/data.go
index c77fc10..0205bb7 100644
--- a/data.go
+++ b/data.go
@@ -28,27 +28,36 @@
 )
 
 var (
-	unsafeDB apid.DB
-	dbMux    sync.RWMutex
+	dbMux sync.RWMutex
 )
 
-type dataApidCluster struct {
-	ID, Name, OrgAppName, CreatedBy, UpdatedBy, Description string
-	Updated, Created                                        string
-}
-
-type dataDataScope struct {
-	ID, ClusterID, Scope, Org, Env, CreatedBy, UpdatedBy string
-	Updated, Created                                     string
-}
-
 /*
 This plugin uses 2 databases:
 1. The default DB is used for APID table.
 2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE
 (Currently, the snapshot never changes, but this is future-proof)
 */
-func initDB(db apid.DB) error {
+
+func creatDbManager() *dbManager {
+	return &dbManager{
+		DbMux:       &sync.RWMutex{},
+		knownTables: make(map[string]bool),
+	}
+}
+
+type dbManager struct {
+	Db          apid.DB
+	DbMux       *sync.RWMutex
+	dbVersion   string
+	knownTables map[string]bool
+}
+
+// idempotent call to initialize default DB
+func (dbMan *dbManager) initDB() error {
+	db, err := dataService.DB()
+	if err != nil {
+		return err
+	}
 	tx, err := db.Begin()
 	if err != nil {
 		log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
@@ -75,24 +84,22 @@
 	return nil
 }
 
-func getDB() apid.DB {
+func (dbMan *dbManager) getDB() apid.DB {
 	dbMux.RLock()
-	db := unsafeDB
-	dbMux.RUnlock()
-	return db
+	defer dbMux.RUnlock()
+	return dbMan.Db
 }
 
-func setDB(db apid.DB) {
+func (dbMan *dbManager) setDB(db apid.DB) {
 	dbMux.Lock()
-	unsafeDB = db
-	dbMux.Unlock()
+	defer dbMux.Unlock()
+	dbMan.Db = db
 }
 
 //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
-
+func (dbMan *dbManager) insert(tableName string, rows []common.Row, txn apid.Tx) error {
 	if len(rows) == 0 {
-		return false
+		return fmt.Errorf("no rows")
 	}
 
 	var orderedColumns []string
@@ -101,12 +108,12 @@
 	}
 	sort.Strings(orderedColumns)
 
-	sql := buildInsertSql(tableName, orderedColumns, rows)
+	sql := dbMan.buildInsertSql(tableName, orderedColumns, rows)
 
 	prep, err := txn.Prepare(sql)
 	if err != nil {
 		log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err)
-		return false
+		return err
 	}
 	defer prep.Close()
 
@@ -124,14 +131,14 @@
 
 	if err != nil {
 		log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err)
-		return false
+		return err
 	}
 	log.Debugf("INSERT Success [%s] values=%v", sql, values)
 
-	return true
+	return nil
 }
 
-func getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
+func (dbMan *dbManager) getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
 	var values = make([]interface{}, len(pkeys))
 	for i, pkey := range pkeys {
 		if row[pkey] == nil {
@@ -143,52 +150,46 @@
 	return values
 }
 
-func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
-	pkeys, err := getPkeysForTable(tableName)
+func (dbMan *dbManager) delete(tableName string, rows []common.Row, txn apid.Tx) error {
+	pkeys, err := dbMan.getPkeysForTable(tableName)
 	sort.Strings(pkeys)
 	if len(pkeys) == 0 || err != nil {
-		log.Errorf("DELETE No primary keys found for table. %s", tableName)
-		return false
+		return fmt.Errorf("DELETE No primary keys found for table. %s", tableName)
 	}
 
 	if len(rows) == 0 {
-		log.Errorf("No rows found for table.", tableName)
-		return false
+		return fmt.Errorf("No rows found for table.", tableName)
 	}
 
-	sql := buildDeleteSql(tableName, rows[0], pkeys)
+	sql := dbMan.buildDeleteSql(tableName, rows[0], pkeys)
 	prep, err := txn.Prepare(sql)
 	if err != nil {
-		log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
-		return false
+		return fmt.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
 	}
 	defer prep.Close()
 	for _, row := range rows {
-		values := getValueListFromKeys(row, pkeys)
+		values := dbMan.getValueListFromKeys(row, pkeys)
 		// delete prepared statement from existing template statement
 		res, err := txn.Stmt(prep).Exec(values...)
 		if err != nil {
-			log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
 		}
 		affected, err := res.RowsAffected()
 		if err == nil && affected != 0 {
 			log.Debugf("DELETE Success [%s] values=%v", sql, values)
 		} else if err == nil && affected == 0 {
-			log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
-			return false
+			return fmt.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
 		} else {
-			log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
 		}
 
 	}
-	return true
+	return nil
 
 }
 
 // Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
-func buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
+func (dbMan *dbManager) buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
 
 	var wherePlaceholders []string
 	i := 1
@@ -210,14 +211,13 @@
 
 }
 
-func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
-	pkeys, err := getPkeysForTable(tableName)
+func (dbMan *dbManager) update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) error {
+	pkeys, err := dbMan.getPkeysForTable(tableName)
 	if len(pkeys) == 0 || err != nil {
-		log.Errorf("UPDATE No primary keys found for table.", tableName)
-		return false
+		return fmt.Errorf("UPDATE No primary keys found for table: %v, %v", tableName, err)
 	}
 	if len(oldRows) == 0 || len(newRows) == 0 {
-		return false
+		return fmt.Errorf("UPDATE No old or new rows, table: %v, %v, %v", tableName, oldRows, newRows)
 	}
 
 	var orderedColumns []string
@@ -229,11 +229,10 @@
 	sort.Strings(orderedColumns)
 
 	//build update statement, use arbitrary row as template
-	sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
+	sql := dbMan.buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
 	prep, err := txn.Prepare(sql)
 	if err != nil {
-		log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
-		return false
+		return fmt.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
 	}
 	defer prep.Close()
 
@@ -265,13 +264,11 @@
 		res, err := txn.Stmt(prep).Exec(values...)
 
 		if err != nil {
-			log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
 		}
 		numRowsAffected, err := res.RowsAffected()
 		if err != nil {
-			log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
 		}
 		//delete this once we figure out why tests are failing/not updating
 		log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
@@ -279,11 +276,11 @@
 
 	}
 
-	return true
+	return nil
 
 }
 
-func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
+func (dbMan *dbManager) buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
 	if row == nil {
 		return ""
 	}
@@ -311,7 +308,7 @@
 }
 
 //precondition: rows.length > 1000, max number of entities for sqlite
-func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
+func (dbMan *dbManager) buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
 	if len(rows) == 0 {
 		return ""
 	}
@@ -343,8 +340,8 @@
 	return sql
 }
 
-func getPkeysForTable(tableName string) ([]string, error) {
-	db := getDB()
+func (dbMan *dbManager) getPkeysForTable(tableName string) ([]string, error) {
+	db := dbMan.getDB()
 	normalizedTableName := normalizeTableName(tableName)
 	sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
 	rows, err := db.Query(sql, normalizedTableName)
@@ -358,13 +355,15 @@
 		var value string
 		err := rows.Scan(&value)
 		if err != nil {
-			log.Fatal(err)
+			log.Errorf("failed to scan column names: %v", err)
+			return nil, err
 		}
 		columnNames = append(columnNames, value)
 	}
 	err = rows.Err()
 	if err != nil {
-		log.Fatal(err)
+		log.Errorf("failed to scan column names: %v", err)
+		return nil, err
 	}
 	return columnNames, nil
 }
@@ -377,13 +376,11 @@
  * For the given apidConfigId, this function will retrieve all the distinch scopes
  * associated with it. Distinct, because scope is already a collection of the tenants.
  */
-func findScopesForId(configId string) (scopes []string) {
+func (dbMan *dbManager) findScopesForId(configId string) (scopes []string, err error) {
 
 	log.Debugf("findScopesForId: %s", configId)
-
 	var scope sql.NullString
-	db := getDB()
-
+	db := dbMan.getDB()
 	query := `
 		SELECT scope FROM edgex_data_scope WHERE apid_cluster_id = $1
 		UNION
@@ -391,7 +388,6 @@
 		UNION
 		SELECT env_scope FROM edgex_data_scope WHERE apid_cluster_id = $3
 	`
-
 	rows, err := db.Query(query, configId, configId, configId)
 	if err != nil {
 		log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
@@ -416,9 +412,9 @@
 /*
  * Retrieve SnapshotInfo for the given apidConfigId from apid_config table
  */
-func getLastSequence() (lastSequence string) {
+func (dbMan *dbManager) getLastSequence() (lastSequence string) {
 
-	err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
+	err := dbMan.getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
 	if err != nil && err != sql.ErrNoRows {
 		log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
 		return
@@ -432,11 +428,11 @@
  * Persist the last change Id each time a change has been successfully
  * processed by the plugin(s)
  */
-func updateLastSequence(lastSequence string) error {
+func (dbMan *dbManager) updateLastSequence(lastSequence string) error {
 
 	log.Debugf("updateLastSequence: %s", lastSequence)
 
-	tx, err := getDB().Begin()
+	tx, err := dbMan.getDB().Begin()
 	if err != nil {
 		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return err
@@ -454,10 +450,9 @@
 	return err
 }
 
-func getApidInstanceInfo() (info apidInstanceInfo, err error) {
+func (dbMan *dbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
 	info.InstanceName = config.GetString(configName)
 	info.ClusterID = config.GetString(configApidClusterId)
-
 	var savedClusterId string
 
 	// always use default database for this
@@ -481,7 +476,7 @@
 		} else {
 			// first start - no row, generate a UUID and store it
 			err = nil
-			newInstanceID = true
+			info.IsNewInstance = true
 			info.InstanceID = util.GenerateUUID()
 
 			log.Debugf("Inserting new apid instance id %s", info.InstanceID)
@@ -489,13 +484,13 @@
 				info.InstanceID, info.ClusterID, "")
 		}
 	} else if savedClusterId != info.ClusterID {
-		log.Debug("Detected apid cluster id change in config.  Apid will start clean")
+		log.Warn("Detected apid cluster id change in config.  Apid will start clean")
 		err = nil
-		newInstanceID = true
+		info.IsNewInstance = true
 		info.InstanceID = util.GenerateUUID()
 
-		_, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
-			info.InstanceID, info.ClusterID, "")
+		_, err = tx.Exec("DELETE FROM APID;")
+
 		info.LastSnapshot = ""
 	}
 	if err = tx.Commit(); err != nil {
@@ -504,7 +499,7 @@
 	return
 }
 
-func updateApidInstanceInfo() error {
+func (dbMan *dbManager) updateApidInstanceInfo(instanceId, clusterId, lastSnap string) error {
 
 	// always use default database for this
 	db, err := dataService.DB()
@@ -521,7 +516,7 @@
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
 		VALUES (?,?,?)`,
-		apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
+		instanceId, clusterId, lastSnap)
 	if err != nil {
 		log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
 		return err
@@ -536,3 +531,130 @@
 
 	return err
 }
+
+func (dbMan *dbManager) extractTables() (map[string]bool, error) {
+	tables := make(map[string]bool)
+	db := dbMan.getDB()
+	rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var table sql.NullString
+		if err := rows.Scan(&table); err != nil {
+			return nil, err
+		}
+		log.Debugf("Table %v found in existing db", table)
+		if table.Valid {
+			tables[table.String] = true
+		}
+	}
+	log.Debugf("Extracting table names from existing DB %v", tables)
+	return tables, nil
+}
+
+func (dbMan *dbManager) getKnowTables() map[string]bool {
+	return dbMan.knownTables
+}
+
+func (dbMan *dbManager) processChangeList(changes *common.ChangeList) error {
+
+	tx, err := dbMan.getDB().Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
+
+	for _, change := range changes.Changes {
+		if change.Table == LISTENER_TABLE_APID_CLUSTER {
+			return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
+		}
+		switch change.Operation {
+		case common.Insert:
+			err = dbMan.insert(change.Table, []common.Row{change.NewRow}, tx)
+		case common.Update:
+			if change.Table == LISTENER_TABLE_DATA_SCOPE {
+				return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
+			}
+			err = dbMan.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
+		case common.Delete:
+			err = dbMan.delete(change.Table, []common.Row{change.OldRow}, tx)
+		}
+		if err != nil {
+			return err
+		}
+
+	}
+
+	if err = tx.Commit(); err != nil {
+		return fmt.Errorf("Commit error in processChangeList: %v", err)
+	}
+	return nil
+}
+
+func (dbMan *dbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
+
+	var prevDb string
+	if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
+		log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
+			apidInfo.LastSnapshot, snapshot.SnapshotInfo)
+		prevDb = apidInfo.LastSnapshot
+	} else {
+		log.Debugf("Process snapshot for version {%s}",
+			snapshot.SnapshotInfo)
+	}
+	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+	if err != nil {
+		return fmt.Errorf("Unable to access database: %v", err)
+	}
+
+	var numApidClusters int
+	tx, err := db.Begin()
+	if err != nil {
+		return fmt.Errorf("Unable to open DB txn: {%v}", err.Error())
+	}
+	defer tx.Rollback()
+	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
+	if err != nil {
+		return fmt.Errorf("Unable to read database: {%s}", err.Error())
+	}
+
+	if numApidClusters != 1 {
+		return fmt.Errorf("Illegal state for apid_cluster. Must be a single row.")
+	}
+
+	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+	if err != nil && err.Error() != "duplicate column name: last_sequence" {
+		return fmt.Errorf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
+	}
+
+	if err = tx.Commit(); err != nil {
+		return fmt.Errorf("Error when commit in processSqliteSnapshot: %v", err)
+	}
+
+	//update apid instance info
+	apidInfo.LastSnapshot = snapshot.SnapshotInfo
+	err = dbMan.updateApidInstanceInfo(apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
+	if err != nil {
+		return fmt.Errorf("Unable to update instance info: %v", err)
+	}
+
+	dbMan.setDB(db)
+	if isDataSnapshot {
+		dbMan.knownTables, err = dbMan.extractTables()
+		if err != nil {
+			return fmt.Errorf("Unable to extract tables: %v", err)
+		}
+	}
+	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+
+	// Releases the DB, when the Connection reference count reaches 0.
+	if prevDb != "" {
+		dataService.ReleaseDB(prevDb)
+	}
+	return nil
+}
diff --git a/data_test.go b/data_test.go
index 691e887..e952168 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,32 +15,50 @@
 package apidApigeeSync
 
 import (
+	"database/sql"
 	"github.com/apid/apid-core"
-	"github.com/apid/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"io/ioutil"
 	"sort"
 	"strconv"
 )
 
 var _ = Describe("data access tests", func() {
-	testCount := 1
-
-	var _ = BeforeEach(func() {
+	testCount := 0
+	var testDbMan *dbManager
+	var dbVersion string
+	BeforeEach(func() {
+		testDbMan = creatDbManager()
 		testCount++
-		db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount))
+		dbVersion = "data_test_" + strconv.Itoa(testCount)
+		db, err := dataService.DBVersion(dbVersion)
 		Expect(err).Should(Succeed())
-		initDB(db)
-		createBootstrapTables(db)
-		setDB(db)
+		testDbMan.setDB(db)
 	})
 
-	var _ = AfterEach(func() {
-		data.Delete(data.VersionedDBID("common", "data_test_"+strconv.Itoa(testCount)))
+	AfterEach(func() {
+		dataService.ReleaseDB(dbVersion)
 	})
 
-	Context("Update processing", func() {
+	It("check scope changes", func() {
+		newScopes := []string{"foo"}
+		scopes := []string{"bar"}
+		Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+		newScopes = []string{"foo", "bar"}
+		scopes = []string{"bar"}
+		Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+		newScopes = []string{"foo"}
+		scopes = []string{"bar", "foo"}
+		Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+		newScopes = []string{"foo", "bar"}
+		scopes = []string{"bar", "foo"}
+		Expect(scopeChanged(newScopes, scopes)).To(BeNil())
+
+	})
+
+	Context("build Sql", func() {
 		It("unit test buildUpdateSql with single primary key", func() {
 			testRow := common.Row{
 				"id": {
@@ -66,7 +84,7 @@
 			}
 			sort.Strings(orderedColumns)
 
-			result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
+			result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
 			Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" +
 				" WHERE id=$6").To(Equal(result))
 		})
@@ -99,403 +117,11 @@
 			}
 			sort.Strings(orderedColumns)
 
-			result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
+			result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
 			Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" +
 				" WHERE id1=$7 AND id2=$8").To(Equal(result))
 		})
 
-		It("test update with composite primary key", func() {
-			event := &common.ChangeList{}
-
-			//this needs to match what is actually in the DB
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-		})
-
-		It("update should succeed if newrow modifies the primary key", func() {
-			event := &common.ChangeList{}
-
-			//this needs to match what is actually in the DB
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "new_id",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-		})
-
-		It("update should succeed if newrow contains fewer fields than oldrow", func() {
-			event := &common.ChangeList{}
-
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-		})
-
-		It("update should succeed if oldrow contains fewer fields than newrow", func() {
-			event := &common.ChangeList{}
-
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-		})
-	})
-
-	Context("Insert processing", func() {
 		It("Properly constructs insert sql for one row", func() {
 			newRow := common.Row{
 				"id": {
@@ -531,7 +157,7 @@
 			sort.Strings(orderedColumns)
 
 			expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)"
-			Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
+			Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
 		})
 
 		It("Properly constructs insert sql for multiple rows", func() {
@@ -595,275 +221,11 @@
 			sort.Strings(orderedColumns)
 
 			expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)"
-			Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
+			Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
 		})
 
-		It("Properly executes insert for a single rows", func() {
-			event := &common.ChangeList{}
-
-			newRow1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-			}
-
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-		})
-
-		It("Properly executed insert for multiple rows", func() {
-			event := &common.ChangeList{}
-
-			newRow1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-			newRow2 := common.Row{
-				"id": {
-					Value: "b",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow2,
-					Operation: 1,
-				},
-			}
-
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(2))
-		})
-
-		It("Fails to execute if row does not match existing table schema", func() {
-			event := &common.ChangeList{}
-
-			newRow1 := common.Row{
-				"not_and_id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-			}
-
-			ok := processChangeList(event)
-			Expect(false).To(Equal(ok))
-
-			var nRows int
-			//assert that no extraneous rows were added
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(0))
-		})
-
-		It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
-			event := &common.ChangeList{}
-			newRow1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			newRow2 := common.Row{
-				"not_and_id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow2,
-					Operation: 1,
-				},
-			}
-
-			ok := processChangeList(event)
-			Expect(false).To(Equal(ok))
-		})
-	})
-
-	Context("Delete processing", func() {
 		It("Properly constructs sql prepare for Delete", func() {
+			createBootstrapTables(testDbMan.getDB())
 			row := common.Row{
 				"id": {
 					Value: "new_id",
@@ -891,324 +253,1233 @@
 				},
 			}
 
-			pkeys, err := getPkeysForTable("kms_api_product")
+			pkeys, err := testDbMan.getPkeysForTable("kms_api_product")
 			Expect(err).Should(Succeed())
-			sql := buildDeleteSql("kms_api_product", row, pkeys)
+			sql := testDbMan.buildDeleteSql("kms_api_product", row, pkeys)
 			Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
 		})
+	})
 
-		It("Verify execute single insert & single delete works", func() {
-			event1 := &common.ChangeList{}
-			event2 := &common.ChangeList{}
-
-			Row1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event1.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row1,
-					Operation: 1,
-				},
-			}
-			event2.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row1,
-					Operation: 3,
-				},
-			}
-
-			Expect(true).To(Equal(processChangeList(event1)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			Expect(true).To(Equal(processChangeList(event2)))
-
-			// validate delete
-			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(0))
-
-			// delete again should fail - coz entry will not exist
-			Expect(false).To(Equal(processChangeList(event2)))
+	Context("Process Changelist", func() {
+		BeforeEach(func() {
+			createBootstrapTables(testDbMan.getDB())
 		})
 
-		It("verify multiple insert and single delete works", func() {
-			event1 := &common.ChangeList{}
-			event2 := &common.ChangeList{}
+		Context("Update processing", func() {
+			It("test update with composite primary key", func() {
+				event := &common.ChangeList{}
 
-			Row1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				//this needs to match what is actually in the DB
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			Row2 := common.Row{
-				"id": {
-					Value: "b",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				newRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			event1.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row1,
-					Operation: 1,
-				},
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row2,
-					Operation: 1,
-				},
-			}
-			event2.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row1,
-					Operation: 3,
-				},
-			}
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			Expect(true).To(Equal(processChangeList(event1)))
-			var nRows int
-			//verify first row
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
 
-			//verify second row
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(2))
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			Expect(true).To(Equal(processChangeList(event2)))
+			})
 
-			//verify second row still exists
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+			It("update should succeed if newrow modifies the primary key", func() {
+				event := &common.ChangeList{}
 
-			// validate delete
-			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				//this needs to match what is actually in the DB
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			// delete again should fail - coz entry will not exist
-			Expect(false).To(Equal(processChangeList(event2)))
-		}, 3)
+				newRow := common.Row{
+					"id": {
+						Value: "new_id",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-		It("verify single insert and multiple delete fails", func() {
-			event1 := &common.ChangeList{}
-			event2 := &common.ChangeList{}
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			Row1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
 
-			Row2 := common.Row{
-				"id": {
-					Value: "b",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			event1.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row1,
-					Operation: 1,
-				},
-			}
-			event2.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row1,
-					Operation: 3,
-				},
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row2,
-					Operation: 3,
-				},
-			}
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+			})
 
-			Expect(true).To(Equal(processChangeList(event1)))
-			var nRows int
-			//verify insert
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+			It("update should succeed if newrow contains fewer fields than oldrow", func() {
+				event := &common.ChangeList{}
 
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			Expect(false).To(Equal(processChangeList(event2)))
+				newRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-		}, 3)
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+			})
+
+			It("update should succeed if oldrow contains fewer fields than newrow", func() {
+				event := &common.ChangeList{}
+
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
+
+				newRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+			})
+		})
+
+		Context("Insert processing", func() {
+			It("Properly executes insert for a single rows", func() {
+				event := &common.ChangeList{}
+
+				newRow1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+			})
+
+			It("Properly executed insert for multiple rows", func() {
+				event := &common.ChangeList{}
+
+				newRow1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+				newRow2 := common.Row{
+					"id": {
+						Value: "b",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow2,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(2))
+			})
+
+			It("Fails to execute if row does not match existing table schema", func() {
+				event := &common.ChangeList{}
+
+				newRow1 := common.Row{
+					"not_and_id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed())
+
+				var nRows int
+				//assert that no extraneous rows were added
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(0))
+			})
+
+			It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
+				event := &common.ChangeList{}
+				newRow1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				newRow2 := common.Row{
+					"not_and_id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow2,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed())
+			})
+		})
+
+		Context("Delete processing", func() {
+
+			It("Verify execute single insert & single delete works", func() {
+				event1 := &common.ChangeList{}
+				event2 := &common.ChangeList{}
+
+				Row1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event1.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row1,
+						Operation: 1,
+					},
+				}
+				event2.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row1,
+						Operation: 3,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				Expect(testDbMan.processChangeList(event2)).Should(Succeed())
+
+				// validate delete
+				err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(0))
+
+				// delete again should fail - coz entry will not exist
+				Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+			})
+
+			It("verify multiple insert and single delete works", func() {
+				event1 := &common.ChangeList{}
+				event2 := &common.ChangeList{}
+
+				Row1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				Row2 := common.Row{
+					"id": {
+						Value: "b",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event1.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row1,
+						Operation: 1,
+					},
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row2,
+						Operation: 1,
+					},
+				}
+				event2.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row1,
+						Operation: 3,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+				var nRows int
+				//verify first row
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//verify second row
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(2))
+
+				Expect(testDbMan.processChangeList(event2)).Should(Succeed())
+
+				//verify second row still exists
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				// validate delete
+				err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				// delete again should fail - coz entry will not exist
+				Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+			}, 3)
+
+			It("verify single insert and multiple delete fails", func() {
+				event1 := &common.ChangeList{}
+				event2 := &common.ChangeList{}
+
+				Row1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				Row2 := common.Row{
+					"id": {
+						Value: "b",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event1.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row1,
+						Operation: 1,
+					},
+				}
+				event2.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row1,
+						Operation: 3,
+					},
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row2,
+						Operation: 3,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+				var nRows int
+				//verify insert
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+
+			}, 3)
+		})
+
+		Context("ApigeeSync change event", func() {
+
+			Context(LISTENER_TABLE_APID_CLUSTER, func() {
+
+				It("should not change LISTENER_TABLE_APID_CLUSTER", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_APID_CLUSTER,
+							},
+						},
+					}
+					Expect(testDbMan.processChangeList(&event)).NotTo(Succeed())
+
+					event = common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Update,
+								Table:     LISTENER_TABLE_APID_CLUSTER,
+							},
+						},
+					}
+
+					Expect(testDbMan.processChangeList(&event)).NotTo(Succeed())
+				})
+
+			})
+
+			Context("data scopes", func() {
+
+				It("insert event should add", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								NewRow: common.Row{
+									"id":               &common.ColumnVal{Value: "i"},
+									"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+									"scope":            &common.ColumnVal{Value: "s1"},
+									"org":              &common.ColumnVal{Value: "o"},
+									"env":              &common.ColumnVal{Value: "e"},
+									"created":          &common.ColumnVal{Value: "c"},
+									"created_by":       &common.ColumnVal{Value: "c"},
+									"updated":          &common.ColumnVal{Value: "u"},
+									"updated_by":       &common.ColumnVal{Value: "u"},
+									"_change_selector": &common.ColumnVal{Value: "cs"},
+								},
+							},
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								NewRow: common.Row{
+									"id":               &common.ColumnVal{Value: "j"},
+									"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+									"scope":            &common.ColumnVal{Value: "s2"},
+									"org":              &common.ColumnVal{Value: "o"},
+									"env":              &common.ColumnVal{Value: "e"},
+									"created":          &common.ColumnVal{Value: "c"},
+									"created_by":       &common.ColumnVal{Value: "c"},
+									"updated":          &common.ColumnVal{Value: "u"},
+									"updated_by":       &common.ColumnVal{Value: "u"},
+									"_change_selector": &common.ColumnVal{Value: "cs"},
+								},
+							},
+						},
+					}
+
+					testDbMan.processChangeList(&event)
+
+					count := 0
+					id := sql.NullString{}
+					rows, err := testDbMan.getDB().Query(`
+				SELECT scope FROM EDGEX_DATA_SCOPE`)
+					Expect(err).NotTo(HaveOccurred())
+					defer rows.Close()
+					for rows.Next() {
+						count++
+						Expect(rows.Scan(&id)).Should(Succeed())
+						Expect(id.String).Should(Equal("s" + strconv.Itoa(count)))
+					}
+
+					Expect(count).To(Equal(2))
+				})
+
+				It("delete event should delete", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								NewRow: common.Row{
+									"id":               &common.ColumnVal{Value: "i"},
+									"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+									"scope":            &common.ColumnVal{Value: "s"},
+									"org":              &common.ColumnVal{Value: "o"},
+									"env":              &common.ColumnVal{Value: "e"},
+									"created":          &common.ColumnVal{Value: "c"},
+									"created_by":       &common.ColumnVal{Value: "c"},
+									"updated":          &common.ColumnVal{Value: "u"},
+									"updated_by":       &common.ColumnVal{Value: "u"},
+									"_change_selector": &common.ColumnVal{Value: "cs"},
+								},
+							},
+						},
+					}
+
+					testDbMan.processChangeList(&event)
+
+					event = common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Delete,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								OldRow:    event.Changes[0].NewRow,
+							},
+						},
+					}
+
+					testDbMan.processChangeList(&event)
+
+					var nRows int
+					err := testDbMan.getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
+					Expect(err).NotTo(HaveOccurred())
+					Expect(nRows).To(BeZero())
+				})
+
+				It("update event should panic for data scopes table", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Update,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+							},
+						},
+					}
+
+					Expect(testDbMan.processChangeList(&event)).ToNot(Succeed())
+				})
+
+			})
+		})
+
 	})
+
+	Context("Process Snapshot", func() {
+		initTestDb := func(sqlFile string, dbMan *dbManager) common.Snapshot {
+			stmts, err := ioutil.ReadFile(sqlFile)
+			Expect(err).Should(Succeed())
+			Expect(testDbMan.getDB().Exec(string(stmts))).ShouldNot(BeNil())
+			Expect(testDbMan.initDB()).Should(Succeed())
+			return common.Snapshot{
+				SnapshotInfo: dbVersion,
+			}
+		}
+
+		AfterEach(func() {
+			dataService.ReleaseCommonDB()
+		})
+
+		It("should fail if more than one apid_cluster rows", func() {
+			event := initTestDb("./sql/init_listener_test_duplicate_apids.sql", testDbMan)
+			Expect(testDbMan.processSnapshot(&event, true)).ToNot(Succeed())
+		})
+
+		It("should process a valid Snapshot", func() {
+			config.Set(configApidClusterId, "a")
+			apidInfo.ClusterID = "a"
+			event := initTestDb("./sql/init_listener_test_valid_snapshot.sql", testDbMan)
+			Expect(testDbMan.processSnapshot(&event, true)).Should(Succeed())
+
+			info, err := testDbMan.getApidInstanceInfo()
+			Expect(err).Should(Succeed())
+			Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
+			Expect(info.IsNewInstance).To(BeFalse())
+			Expect(dataService.DBVersion(event.SnapshotInfo)).Should(Equal(testDbMan.getDB()))
+
+			// apid Cluster
+			id := &sql.NullString{}
+			Expect(testDbMan.getDB().QueryRow(`SELECT id FROM EDGEX_APID_CLUSTER`).
+				Scan(id)).Should(Succeed())
+			Expect(id.Valid).Should(BeTrue())
+			Expect(id.String).To(Equal("i"))
+
+			// Data Scope
+			env := &sql.NullString{}
+			count := 0
+			rows, err := testDbMan.getDB().Query(`SELECT env FROM EDGEX_DATA_SCOPE`)
+			Expect(err).Should(Succeed())
+			defer rows.Close()
+			for rows.Next() {
+				count++
+				rows.Scan(&env)
+				Expect(env.Valid).Should(BeTrue())
+				Expect(env.String).To(Equal("e" + strconv.Itoa(count)))
+			}
+			Expect(count).To(Equal(3))
+
+			//find scopes for Id
+			scopes, err := testDbMan.findScopesForId("a")
+			Expect(err).Should(Succeed())
+			Expect(len(scopes)).To(Equal(6))
+			expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
+			sort.Strings(scopes)
+			sort.Strings(expectedScopes)
+			Expect(scopes).Should(Equal(expectedScopes))
+		})
+
+		It("should detect clusterid change", func() {
+			Expect(testDbMan.initDB()).Should(Succeed())
+			testDbMan.updateApidInstanceInfo("a", "b", "c")
+			config.Set(configApidClusterId, "d")
+
+			info, err := testDbMan.getApidInstanceInfo()
+			Expect(err).Should(Succeed())
+			Expect(info.LastSnapshot).To(BeZero())
+			Expect(info.IsNewInstance).To(BeTrue())
+		})
+	})
+
 })
 
 func createBootstrapTables(db apid.DB) {
 	tx, err := db.Begin()
 	Expect(err).To(Succeed())
 	//all tests in this file operate on the api_product table.  Create the necessary tables for this here
-	tx.Exec("CREATE TABLE _transicator_tables " +
-		"(tableName varchar not null, columnName varchar not null, " +
-		"typid integer, primaryKey bool);")
-	tx.Exec("DELETE from _transicator_tables")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
-	tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
-		"api_resources text,approval_type text,scopes text,proxies text, environments text," +
-		"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
-		"primary key (id,tenant_id,created_at,updated_at));")
-	tx.Exec("DELETE from kms_api_product")
-	err = tx.Commit()
+	_, err = tx.Exec(`CREATE TABLE _transicator_tables
+	(tableName varchar not null, columnName varchar not null, typid integer, primaryKey bool);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0);
+	CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at));
+	`)
 	Expect(err).To(Succeed())
+	_, err = tx.Exec(`
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org_scope',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env_scope',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+	CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,org_scope text,
+	env_scope text,created blob,created_by text,updated blob,updated_by text,_change_selector text,
+	primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+	`)
+	Expect(err).To(Succeed())
+
+	Expect(tx.Commit()).To(Succeed())
 }
diff --git a/dockertests/docker_test.go b/dockertests/docker_test.go
index e8fc25d..cefd28b 100644
--- a/dockertests/docker_test.go
+++ b/dockertests/docker_test.go
@@ -335,7 +335,7 @@
 type newTableHandler struct {
 	targetTablename string
 	done            Done
-	verifyFunc      func (string, apid.DB)
+	verifyFunc      func(string, apid.DB)
 }
 
 func (n *newTableHandler) Handle(event apid.Event) {
diff --git a/init.go b/init.go
index b7f470e..dd310c0 100644
--- a/init.go
+++ b/init.go
@@ -15,7 +15,6 @@
 package apidApigeeSync
 
 import (
-	"encoding/json"
 	"fmt"
 	"net/http"
 	"os"
@@ -48,17 +47,13 @@
 
 var (
 	/* All set during plugin initialization */
-	log                 apid.LogService
-	config              apid.ConfigService
-	dataService         apid.DataService
-	events              apid.EventsService
-	apidInfo            apidInstanceInfo
-	newInstanceID       bool
-	apidTokenManager    tokenManager
-	apidChangeManager   changeManager
-	apidSnapshotManager snapShotManager
-	httpclient          *http.Client
-	isOfflineMode       bool
+	log           apid.LogService
+	config        apid.ConfigService
+	dataService   apid.DataService
+	eventService  apid.EventsService
+	apiService    apid.APIService
+	apidInfo      apidInstanceInfo
+	isOfflineMode bool
 
 	/* Set during post plugin initialization
 	 * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -68,6 +63,7 @@
 
 type apidInstanceInfo struct {
 	InstanceID, InstanceName, ClusterID, LastSnapshot string
+	IsNewInstance                                     bool
 }
 
 type pluginDetail struct {
@@ -93,58 +89,6 @@
 	log.Debugf("Using %s as display name", config.GetString(configName))
 }
 
-func initVariables() error {
-
-	var tr *http.Transport
-
-	tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
-	tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
-
-	httpclient = &http.Client{
-		Transport: tr,
-		Timeout:   httpTimeout,
-		CheckRedirect: func(req *http.Request, _ []*http.Request) error {
-			req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
-			return nil
-		},
-	}
-
-	// set up default database
-	db, err := dataService.DB()
-	if err != nil {
-		return fmt.Errorf("Unable to access DB: %v", err)
-	}
-	err = initDB(db)
-	if err != nil {
-		return fmt.Errorf("Unable to access DB: %v", err)
-	}
-	setDB(db)
-
-	apidInfo, err = getApidInstanceInfo()
-	if err != nil {
-		return fmt.Errorf("Unable to get apid instance info: %v", err)
-	}
-
-	if config.IsSet(configApidInstanceID) {
-		log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID)
-	}
-	config.Set(configApidInstanceID, apidInfo.InstanceID)
-
-	return nil
-}
-
-func createManagers() {
-	if isOfflineMode {
-		apidSnapshotManager = &offlineSnapshotManager{}
-		apidChangeManager = &offlineChangeManager{}
-	} else {
-		apidSnapshotManager = createSnapShotManager()
-		apidChangeManager = createChangeManager()
-	}
-
-	apidTokenManager = createSimpleTokenManager()
-}
-
 func checkForRequiredValues() error {
 	required := []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret}
 	if !isOfflineMode {
@@ -169,7 +113,7 @@
 }
 
 /* initialization */
-func _initPlugin(services apid.Services) error {
+func initConfigs(services apid.Services) error {
 	log.Debug("start init")
 
 	config = services.Config()
@@ -185,73 +129,87 @@
 		return err
 	}
 
-	err = initVariables()
+	return nil
+}
+
+func initManagers() error {
+	// check for forward proxy
+	var tr *http.Transport
+	tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
+	tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
+
+	apidDbManager := creatDbManager()
+	db, err := dataService.DB()
 	if err != nil {
-		return err
+		return fmt.Errorf("Unable to access DB: %v", err)
+	}
+	apidDbManager.setDB(db)
+	err = apidDbManager.initDB()
+	if err != nil {
+		return fmt.Errorf("Unable to access DB: %v", err)
 	}
 
+	apidInfo, err = apidDbManager.getApidInstanceInfo()
+	if err != nil {
+		return fmt.Errorf("Unable to get apid instance info: %v", err)
+	}
+
+	if config.IsSet(configApidInstanceID) {
+		log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID)
+	}
+	config.Set(configApidInstanceID, apidInfo.InstanceID)
+
+	apidTokenManager := createSimpleTokenManager(apidInfo.IsNewInstance)
+	var apidSnapshotManager snapShotManager
+	var apidChangeManager changeManager
+
+	if isOfflineMode {
+		apidSnapshotManager = &offlineSnapshotManager{
+			dbMan: apidDbManager,
+		}
+		apidChangeManager = &offlineChangeManager{}
+	} else {
+		httpClient := &http.Client{
+			Transport: tr,
+			Timeout:   httpTimeout,
+			CheckRedirect: func(req *http.Request, _ []*http.Request) error {
+				req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
+				return nil
+			},
+		}
+		apidSnapshotManager = createSnapShotManager(apidDbManager, apidTokenManager, httpClient)
+		apidChangeManager = createChangeManager(apidDbManager, apidSnapshotManager, apidTokenManager, httpClient)
+	}
+
+	listenerMan := &listenerManager{
+		changeMan: apidChangeManager,
+		snapMan:   apidSnapshotManager,
+		tokenMan:  apidTokenManager,
+	}
+
+	apiMan := &ApiManager{
+		tokenMan: apidTokenManager,
+	}
+
+	listenerMan.init()
+	apiMan.InitAPI(apiService)
 	return nil
 }
 
 func initPlugin(services apid.Services) (apid.PluginData, error) {
 	SetLogger(services.Log().ForModule("apigeeSync"))
 	dataService = services.Data()
-	events = services.Events()
-
-	err := _initPlugin(services)
+	eventService = services.Events()
+	apiService = services.API()
+	err := initConfigs(services)
 	if err != nil {
 		return pluginData, err
 	}
 
-	createManagers()
-
-	/* This callback function will get called once all the plugins are
-	 * initialized (not just this plugin). This is needed because,
-	 * downloadSnapshots/changes etc have to begin to be processed only
-	 * after all the plugins are initialized
-	 */
-	events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
-
-	InitAPI(services)
-	log.Debug("end init")
-
-	return pluginData, nil
-}
-
-// Plugins have all initialized, gather their info and start the ApigeeSync downloads
-func postInitPlugins(event apid.Event) {
-	var plinfoDetails []pluginDetail
-	if pie, ok := event.(apid.PluginsInitializedEvent); ok {
-		/*
-		 * Store the plugin details in the heap. Needed during
-		 * Bearer token generation request.
-		 */
-		for _, plugin := range pie.Plugins {
-			name := plugin.Name
-			version := plugin.Version
-			if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok {
-				inf := pluginDetail{
-					Name:          name,
-					SchemaVersion: schemaVersion}
-				plinfoDetails = append(plinfoDetails, inf)
-				log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion)
-			}
-		}
-		if plinfoDetails == nil {
-			log.Panicf("No Plugins registered!")
-		}
-
-		pgInfo, err := json.Marshal(plinfoDetails)
-		if err != nil {
-			log.Panicf("Unable to marshal plugin data: %v", err)
-		}
-		apidPluginDetails = string(pgInfo[:])
-
-		log.Debug("start post plugin init")
-
-		apidTokenManager.start()
-		go bootstrap()
-
-		log.Debug("Done post plugin init")
+	if err = initManagers(); err != nil {
+		return pluginData, err
 	}
+
+	log.Debug("end init")
+	return pluginData, nil
 }
diff --git a/init_test.go b/init_test.go
index 03e5450..7727ee7 100644
--- a/init_test.go
+++ b/init_test.go
@@ -18,36 +18,47 @@
 	"github.com/apid/apid-core"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"net/http"
+	"strconv"
 )
 
 var _ = Describe("init", func() {
-	var _ = BeforeEach(func() {
-		_initPlugin(apid.AllServices())
+	testCount := 0
+	BeforeEach(func() {
+		testCount++
 	})
 
 	Context("Apid Instance display name", func() {
+		AfterEach(func() {
+			eventService = apid.Events()
+			apiService = apid.API()
+		})
 
 		It("should be hostname by default", func() {
-			log.Info("Starting init tests...")
-
-			initConfigDefaults()
-			Expect(apidInfo.InstanceName).To(Equal("testhost"))
+			me := &mockEvent{
+				listenerMap: make(map[apid.EventSelector]apid.EventHandlerFunc),
+			}
+			ma := &mockApi{
+				handleMap: make(map[string]http.HandlerFunc),
+			}
+			ms := &mockService{
+				config: apid.Config(),
+				log:    apid.Log(),
+				api:    ma,
+				data:   apid.Data(),
+				events: me,
+			}
+			testname := "test_" + strconv.Itoa(testCount)
+			config.Set(configName, testname)
+			pd, err := initPlugin(ms)
+			Expect(err).Should(Succeed())
+			Expect(apidInfo.InstanceName).To(Equal(testname))
+			Expect(me.listenerMap[apid.SystemEventsSelector]).ToNot(BeNil())
+			Expect(ma.handleMap[tokenEndpoint]).ToNot(BeNil())
+			Expect(pd).Should(Equal(pluginData))
+			Expect(apidInfo.IsNewInstance).Should(BeTrue())
+			dataService.ReleaseCommonDB()
 		}, 3)
 
-		It("accept display name from config", func() {
-			config.Set(configName, "aa01")
-			initConfigDefaults()
-			var apidInfoLatest apidInstanceInfo
-			apidInfoLatest, _ = getApidInstanceInfo()
-			Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
-			Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
-		}, 3)
-
-	})
-
-	It("should put apigeesync_apid_instance_id value in config", func() {
-		instanceID := config.GetString(configApidInstanceID)
-		Expect(instanceID).NotTo(BeEmpty())
-		Expect(instanceID).To(Equal(apidInfo.InstanceID))
 	})
 })
diff --git a/listener.go b/listener.go
index 94befd6..caafe8f 100644
--- a/listener.go
+++ b/listener.go
@@ -15,9 +15,8 @@
 package apidApigeeSync
 
 import (
-	"errors"
+	"encoding/json"
 	"github.com/apid/apid-core"
-	"github.com/apigee-labs/transicator/common"
 )
 
 const (
@@ -25,107 +24,85 @@
 	LISTENER_TABLE_DATA_SCOPE   = "edgex.data_scope"
 )
 
-func processSnapshot(snapshot *common.Snapshot) {
+type listenerManager struct {
+	changeMan changeManager
+	snapMan   snapShotManager
+	tokenMan  tokenManager
+}
 
-	var prevDb string
-	if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
-		log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
-			apidInfo.LastSnapshot, snapshot.SnapshotInfo)
-		prevDb = apidInfo.LastSnapshot
-	} else {
-		log.Debugf("Process snapshot for version {%s}",
-			snapshot.SnapshotInfo)
-	}
-	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Unable to access database: %v", err)
-	}
+func (l *listenerManager) init() {
+	/* This callback function will get called once all the plugins are
+	 * initialized (not just this plugin). This is needed because,
+	 * downloadSnapshots/changes etc have to begin to be processed only
+	 * after all the plugins are initialized
+	 */
+	eventService.ListenOnceFunc(apid.SystemEventsSelector, l.postInitPlugins)
+}
 
-	processSqliteSnapshot(db)
+// Plugins have all initialized, gather their info and start the ApigeeSync downloads
+func (l *listenerManager) postInitPlugins(event apid.Event) {
+	var plinfoDetails []pluginDetail
+	if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+		/*
+		 * Store the plugin details in the heap. Needed during
+		 * Bearer token generation request.
+		 */
+		for _, plugin := range pie.Plugins {
+			name := plugin.Name
+			version := plugin.Version
+			if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok {
+				inf := pluginDetail{
+					Name:          name,
+					SchemaVersion: schemaVersion}
+				plinfoDetails = append(plinfoDetails, inf)
+				log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion)
+			}
+		}
+		if plinfoDetails == nil {
+			log.Panic("No Plugins registered!")
+		}
 
-	//update apid instance info
-	apidInfo.LastSnapshot = snapshot.SnapshotInfo
-	err = updateApidInstanceInfo()
-	if err != nil {
-		log.Panicf("Unable to update instance info: %v", err)
-	}
+		pgInfo, err := json.Marshal(plinfoDetails)
+		if err != nil {
+			log.Panicf("Unable to marshal plugin data: %v", err)
+		}
+		apidPluginDetails = string(pgInfo[:])
 
-	setDB(db)
-	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+		log.Debug("start post plugin init")
 
-	// Releases the DB, when the Connection reference count reaches 0.
-	if prevDb != "" {
-		dataService.ReleaseDB(prevDb)
+		l.tokenMan.start()
+		go l.bootstrap(apidInfo.LastSnapshot)
+
+		log.Debug("Done post plugin init")
 	}
 }
 
-func processSqliteSnapshot(db apid.DB) {
-
-	var numApidClusters int
-	tx, err := db.Begin()
-	if err != nil {
-		log.Panicf("Unable to open DB txn: {%v}", err.Error())
-	}
-	defer tx.Rollback()
-	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
-	if err != nil {
-		log.Panicf("Unable to read database: {%s}", err.Error())
+/*
+ *  Start from existing snapshot if possible
+ *  If an existing snapshot does not exist, use the apid scope to fetch
+ *  all data scopes, then get a snapshot for those data scopes
+ *
+ *  Then, poll for changes
+ */
+func (l *listenerManager) bootstrap(lastSnap string) {
+	if isOfflineMode && lastSnap == "" {
+		log.Panic("Diagnostic mode requires existent snapshot info in default DB.")
 	}
 
-	if numApidClusters != 1 {
-		log.Panic("Illegal state for apid_cluster. Must be a single row.")
-	}
-
-	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
-	if err != nil {
-		if err.Error() == "duplicate column name: last_sequence" {
+	if lastSnap != "" {
+		if err := l.snapMan.startOnDataSnapshot(lastSnap); err == nil {
+			log.Infof("Started on local snapshot: %s", lastSnap)
+			l.changeMan.pollChangeWithBackoff()
 			return
 		} else {
-			log.Panicf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
-		}
-	}
-	if err = tx.Commit(); err != nil {
-		log.Errorf("Error when commit in processSqliteSnapshot: %v", err)
-	}
-}
-
-func processChangeList(changes *common.ChangeList) bool {
-
-	ok := false
-
-	tx, err := getDB().Begin()
-	if err != nil {
-		log.Panicf("Error processing ChangeList: %v", err)
-	}
-	defer tx.Rollback()
-
-	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
-
-	for _, change := range changes.Changes {
-		if change.Table == LISTENER_TABLE_APID_CLUSTER {
-			log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-		}
-		switch change.Operation {
-		case common.Insert:
-			ok = insert(change.Table, []common.Row{change.NewRow}, tx)
-		case common.Update:
-			if change.Table == LISTENER_TABLE_DATA_SCOPE {
-				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-			}
-			ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
-		case common.Delete:
-			ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
-		}
-		if !ok {
-			err = errors.New("Sql Operation error. Operation rollbacked")
-			log.Error("Sql Operation error. Operation rollbacked")
-			return false
+			log.Errorf("Failed to bootstrap on local snapshot: %v", err)
+			log.Warn("Will get new snapshots.")
 		}
 	}
 
-	if err = tx.Commit(); err != nil {
-		log.Errorf("Commit error in processChangeList: %v", err)
-		return false
+	l.snapMan.downloadBootSnapshot()
+	if err := l.snapMan.downloadDataSnapshot(); err != nil {
+		log.Panicf("Error downloading data snapshot: %v", err)
 	}
-	return true
+	l.changeMan.pollChangeWithBackoff()
 }
diff --git a/listener_test.go b/listener_test.go
index 54974eb..5e8f072 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -15,360 +15,79 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-
-	"github.com/apid/apid-core"
-	"github.com/apigee-labs/transicator/common"
-	"os"
-	"reflect"
-	"sort"
+	"strconv"
 )
 
 var _ = Describe("listener", func() {
+	testCount := 0
+	var testListenerMan *listenerManager
+	var dummyChangeMan *dummyChangeManager
+	var dummySnapMan *dummySnapshotManager
+	var dummyTokenMan *dummyTokenManager
 
-	var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
-		initDb(sqlfile, "./mockdb.sqlite3")
-		file, err := os.Open("./mockdb.sqlite3")
-		Expect(err).ShouldNot(HaveOccurred())
-
-		s := common.Snapshot{}
-		err = processSnapshotServerFileResponse(dbId, file, &s)
-		Expect(err).ShouldNot(HaveOccurred())
-		return s
-	}
-
-	var _ = BeforeEach(func() {
-		_initPlugin(apid.AllServices())
-	})
-
-	var _ = AfterEach(func() {
-		if wipeDBAferTest {
-			db, err := dataService.DB()
-			Expect(err).Should(Succeed())
-			tx, err := db.Begin()
-			Expect(err).Should(Succeed())
-			_, err = tx.Exec("DELETE FROM APID")
-			Expect(err).Should(Succeed())
-			err = tx.Commit()
-			Expect(err).Should(Succeed())
+	BeforeEach(func() {
+		testCount++
+		dummySnapMan = &dummySnapshotManager{
+			downloadCalledChan: make(chan bool, 1),
+			startCalledChan:    make(chan bool, 1),
 		}
-		wipeDBAferTest = true
+		dummyTokenMan = &dummyTokenManager{
+			invalidateChan: make(chan bool, 1),
+		}
+		dummyChangeMan = &dummyChangeManager{
+			pollChangeWithBackoffChan: make(chan bool, 1),
+		}
+		testListenerMan = &listenerManager{
+			changeMan: dummyChangeMan,
+			snapMan:   dummySnapMan,
+			tokenMan:  dummyTokenMan,
+		}
 	})
 
-	Context("ApigeeSync snapshot event", func() {
+	AfterEach(func() {
 
-		It("should fail if more than one apid_cluster rows", func() {
-			event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
-			Expect(func() { processSnapshot(&event) }).To(Panic())
-		}, 3)
-
-		It("should fail if more than one apid_cluster rows", func() {
-			newScopes := []string{"foo"}
-			scopes := []string{"bar"}
-			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
-			newScopes = []string{"foo", "bar"}
-			scopes = []string{"bar"}
-			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
-			newScopes = []string{"foo"}
-			scopes = []string{"bar", "foo"}
-			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
-			newScopes = []string{"foo", "bar"}
-			scopes = []string{"bar", "foo"}
-			Expect(scopeChanged(newScopes, scopes)).To(BeNil())
-
-		}, 3)
-
-		It("should process a valid Snapshot", func() {
-
-			event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
-
-			processSnapshot(&event)
-
-			info, err := getApidInstanceInfo()
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
-
-			db := getDB()
-
-			expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(db == expectedDB).Should(BeTrue())
-
-			// apid Cluster
-			var dcs []dataApidCluster
-
-			rows, err := db.Query(`
-			SELECT id, name, description, umbrella_org_app_name,
-				created, created_by, updated, updated_by
-			FROM EDGEX_APID_CLUSTER`)
-			Expect(err).NotTo(HaveOccurred())
-			defer rows.Close()
-
-			c := dataApidCluster{}
-			for rows.Next() {
-				rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName,
-					&c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy)
-				dcs = append(dcs, c)
-			}
-
-			Expect(len(dcs)).To(Equal(1))
-			dc := dcs[0]
-
-			Expect(dc.ID).To(Equal("i"))
-			Expect(dc.Name).To(Equal("n"))
-			Expect(dc.Description).To(Equal("d"))
-			Expect(dc.OrgAppName).To(Equal("o"))
-			Expect(dc.Created).To(Equal("c"))
-			Expect(dc.CreatedBy).To(Equal("c"))
-			Expect(dc.Updated).To(Equal("u"))
-			Expect(dc.UpdatedBy).To(Equal("u"))
-
-			// Data Scope
-			var dds []dataDataScope
-
-			rows, err = db.Query(`
-			SELECT id, apid_cluster_id, scope, org,
-				env, created, created_by, updated,
-				updated_by
-			FROM EDGEX_DATA_SCOPE`)
-			Expect(err).NotTo(HaveOccurred())
-			defer rows.Close()
-
-			d := dataDataScope{}
-			for rows.Next() {
-				rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
-					&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
-					&d.UpdatedBy)
-				dds = append(dds, d)
-			}
-
-			Expect(len(dds)).To(Equal(3))
-			ds := dds[0]
-
-			Expect(ds.ID).To(Equal("i"))
-			Expect(ds.Org).To(Equal("o"))
-			Expect(ds.Env).To(Equal("e1"))
-			Expect(ds.Scope).To(Equal("s1"))
-			Expect(ds.Created).To(Equal("c"))
-			Expect(ds.CreatedBy).To(Equal("c"))
-			Expect(ds.Updated).To(Equal("u"))
-			Expect(ds.UpdatedBy).To(Equal("u"))
-
-			ds = dds[1]
-			Expect(ds.Env).To(Equal("e2"))
-			Expect(ds.Scope).To(Equal("s1"))
-			ds = dds[2]
-			Expect(ds.Env).To(Equal("e3"))
-			Expect(ds.Scope).To(Equal("s2"))
-
-			scopes := findScopesForId("a")
-			Expect(len(scopes)).To(Equal(6))
-			expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
-			sort.Strings(scopes)
-			sort.Strings(expectedScopes)
-			Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue())
-		}, 3)
 	})
 
-	Context("ApigeeSync change event", func() {
-
-		Context(LISTENER_TABLE_APID_CLUSTER, func() {
-
-			It("insert event should panic", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
-				processSnapshot(&ssEvent)
-
-				//save the last snapshot, so we can restore it at the end of this context
-
-				csEvent := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_APID_CLUSTER,
-						},
+	It("postInitPlugins, start cleanly", func() {
+		testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount))
+		eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins)
+		eventService.Emit(testEvent, apid.PluginsInitializedEvent{
+			Description: "test",
+			Plugins: []apid.PluginData{
+				{
+					Name:    "name",
+					Version: "0.0.1",
+					ExtraData: map[string]interface{}{
+						"schemaVersion": "0.0.1",
 					},
-				}
-
-				Expect(func() { processChangeList(&csEvent) }).To(Panic())
-			}, 3)
-
-			It("update event should panic", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Update,
-							Table:     LISTENER_TABLE_APID_CLUSTER,
-						},
-					},
-				}
-
-				Expect(func() { processChangeList(&event) }).To(Panic())
-				//restore the last snapshot
-			}, 3)
-
+				},
+			},
 		})
-
-		Context(LISTENER_TABLE_DATA_SCOPE, func() {
-
-			It("insert event should add", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "i"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s1"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "j"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s2"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-					},
-				}
-
-				processChangeList(&event)
-
-				var dds []dataDataScope
-
-				rows, err := getDB().Query(`
-				SELECT id, apid_cluster_id, scope, org,
-					env, created, created_by, updated,
-					updated_by
-				FROM EDGEX_DATA_SCOPE`)
-				Expect(err).NotTo(HaveOccurred())
-				defer rows.Close()
-
-				d := dataDataScope{}
-				for rows.Next() {
-					rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
-						&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
-						&d.UpdatedBy)
-					dds = append(dds, d)
-				}
-
-				//three already existing
-				Expect(len(dds)).To(Equal(2))
-				ds := dds[0]
-
-				Expect(ds.ID).To(Equal("i"))
-				Expect(ds.Org).To(Equal("o"))
-				Expect(ds.Env).To(Equal("e"))
-				Expect(ds.Scope).To(Equal("s1"))
-				Expect(ds.Created).To(Equal("c"))
-				Expect(ds.CreatedBy).To(Equal("c"))
-				Expect(ds.Updated).To(Equal("u"))
-				Expect(ds.UpdatedBy).To(Equal("u"))
-
-				ds = dds[1]
-				Expect(ds.Scope).To(Equal("s2"))
-
-				scopes := findScopesForId("a")
-				Expect(len(scopes)).To(Equal(2))
-				Expect(scopes[0]).To(Equal("s1"))
-				Expect(scopes[1]).To(Equal("s2"))
-
-			}, 3)
-
-			It("delete event should delete", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
-				processSnapshot(&ssEvent)
-				insert := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "i"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-					},
-				}
-
-				processChangeList(&insert)
-
-				delete := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Delete,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							OldRow:    insert.Changes[0].NewRow,
-						},
-					},
-				}
-
-				processChangeList(&delete)
-
-				var nRows int
-				err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
-				Expect(err).NotTo(HaveOccurred())
-
-				Expect(0).To(Equal(nRows))
-			}, 3)
-
-			It("update event should panic for data scopes table", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Update,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-						},
-					},
-				}
-
-				Expect(func() { processChangeList(&event) }).To(Panic())
-				//restore the last snapshot
-			}, 3)
-
-			//TODO add tests for update/insert/delete cluster
-		})
+		Expect(<-dummySnapMan.downloadCalledChan).Should(BeFalse())
+		Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue())
 	})
+
+	It("postInitPlugins, start from local db", func() {
+		apidInfo.LastSnapshot = "test_snapshot"
+		testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount))
+		eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins)
+		eventService.Emit(testEvent, apid.PluginsInitializedEvent{
+			Description: "test",
+			Plugins: []apid.PluginData{
+				{
+					Name:    "name",
+					Version: "0.0.1",
+					ExtraData: map[string]interface{}{
+						"schemaVersion": "0.0.1",
+					},
+				},
+			},
+		})
+		Expect(<-dummySnapMan.startCalledChan).Should(BeTrue())
+		Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue())
+	})
+
 })
diff --git a/managerInterfaces.go b/managerInterfaces.go
index a93e950..facc616 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -15,16 +15,14 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	"github.com/apigee-labs/transicator/common"
-	"net/url"
 )
 
 type tokenManager interface {
 	getBearerToken() string
 	invalidateToken() error
-	getToken() *OauthToken
 	close()
-	getRetrieveNewTokenClosure(*url.URL) func(chan bool) error
 	start()
 	getTokenReadyChannel() <-chan bool
 }
@@ -32,14 +30,23 @@
 type snapShotManager interface {
 	close() <-chan bool
 	downloadBootSnapshot()
-	storeBootSnapshot(snapshot *common.Snapshot)
-	downloadDataSnapshot()
-	storeDataSnapshot(snapshot *common.Snapshot)
-	downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error
-	startOnLocalSnapshot(snapshot string) *common.Snapshot
+	downloadDataSnapshot() error
+	startOnDataSnapshot(snapshot string) error
 }
 
 type changeManager interface {
 	close() <-chan bool
 	pollChangeWithBackoff()
 }
+
+type DbManager interface {
+	initDB() error
+	setDB(db apid.DB)
+	getLastSequence() (lastSequence string)
+	findScopesForId(configId string) (scopes []string, err error)
+	updateLastSequence(lastSequence string) error
+	getApidInstanceInfo() (info apidInstanceInfo, err error)
+	processChangeList(changes *common.ChangeList) error
+	processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error
+	getKnowTables() map[string]bool
+}
diff --git a/mock_server.go b/mock_server.go
index f39adb7..e201097 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -90,7 +90,7 @@
 	changeChannel   chan []byte
 	sequenceID      *int64
 	maxDevID        *int64
-	deployIDMutex   sync.RWMutex
+	deployIDMutex   *sync.RWMutex
 	minDeploymentID *int64
 	maxDeploymentID *int64
 	newSnap         *int32
@@ -118,11 +118,13 @@
 }
 
 func (m *MockServer) lastSequenceID() string {
-	return strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
+	num := strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
+	return num + "." + num + "." + num
 }
 
 func (m *MockServer) nextSequenceID() string {
-	return strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
+	num := strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
+	return num + "." + num + "." + num
 }
 
 func (m *MockServer) nextDeveloperID() string {
@@ -180,7 +182,7 @@
 	m.newSnap = new(int32)
 	m.authFail = new(int32)
 	*m.authFail = 0
-
+	m.deployIDMutex = &sync.RWMutex{}
 	initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
 	initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
 
@@ -285,7 +287,7 @@
 func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
 	defer GinkgoRecover()
 
-	val := atomic.SwapInt32(m.newSnap, 0)
+	val := atomic.LoadInt32(m.newSnap)
 	if val > 0 {
 		log.Debug("MockServer: force new snapshot")
 		w.WriteHeader(http.StatusBadRequest)
diff --git a/snapshot.go b/snapshot.go
index df4d63f..e3787bf 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -15,12 +15,12 @@
 package apidApigeeSync
 
 import (
-	"github.com/apid/apid-core"
 	"github.com/apid/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
 	"net/http"
 	"os"
 
+	"fmt"
 	"io"
 	"io/ioutil"
 	"net/url"
@@ -38,9 +38,12 @@
 	isClosed *int32
 	// make sure close() returns immediately if there's no downloading/processing snapshot
 	isDownloading *int32
+	tokenMan      tokenManager
+	dbMan         DbManager
+	client        *http.Client
 }
 
-func createSnapShotManager() *simpleSnapShotManager {
+func createSnapShotManager(dbMan DbManager, tokenMan tokenManager, client *http.Client) *simpleSnapShotManager {
 	isClosedInt := int32(0)
 	isDownloadingInt := int32(0)
 	return &simpleSnapShotManager{
@@ -48,6 +51,9 @@
 		finishChan:    make(chan bool, 1),
 		isClosed:      &isClosedInt,
 		isDownloading: &isDownloadingInt,
+		dbMan:         dbMan,
+		tokenMan:      tokenMan,
+		client:        client,
 	}
 }
 
@@ -114,11 +120,13 @@
 }
 
 func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
-	processSnapshot(snapshot)
+	if err := s.dbMan.processSnapshot(snapshot, false); err != nil {
+		log.Panic(err)
+	}
 }
 
 // use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func (s *simpleSnapShotManager) downloadDataSnapshot() {
+func (s *simpleSnapShotManager) downloadDataSnapshot() error {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
 	}
@@ -127,102 +135,45 @@
 	// has been closed
 	if atomic.LoadInt32(s.isClosed) == int32(1) {
 		log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager")
-		return
+		return nil
 	}
 
 	log.Debug("download Snapshot for data scopes")
 
-	scopes := findScopesForId(apidInfo.ClusterID)
+	scopes, err := s.dbMan.findScopesForId(apidInfo.ClusterID)
+	if err != nil {
+		return err
+	}
 	scopes = append(scopes, apidInfo.ClusterID)
 	snapshot := &common.Snapshot{}
-	err := s.downloadSnapshot(false, scopes, snapshot)
+	err = s.downloadSnapshot(false, scopes, snapshot)
 	if err != nil {
 		// this may happen during shutdown
 		if _, ok := err.(quitSignalError); ok {
 			log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error())
 		}
-		return
+		return err
 	}
-	s.storeDataSnapshot(snapshot)
-}
-
-func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
-	knownTables = extractTablesFromSnapshot(snapshot)
-
-	_, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-
-	processSnapshot(snapshot)
-	log.Info("Emitting Snapshot to plugins")
-
-	select {
-	case <-time.After(pluginTimeout):
-		log.Panic("Timeout. Plugins failed to respond to snapshot.")
-	case <-events.Emit(ApigeeSyncEventSelector, snapshot):
-		// the new snapshot has been processed
-		// if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency
-	}
-
-}
-
-func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	log.Debug("Extracting table names from snapshot")
-	//if this panic ever fires, it's a bug
-	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-	return extractTablesFromDB(db)
-}
-
-func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	log.Debug("Extracting table names from existing DB")
-	rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
-	defer rows.Close()
-
-	if err != nil {
-		log.Panicf("Error reading current set of tables: %v", err)
-	}
-
-	for rows.Next() {
-		var table string
-		if err := rows.Scan(&table); err != nil {
-			log.Panicf("Error reading current set of tables: %v", err)
-		}
-		log.Debugf("Table %s found in existing db", table)
-
-		tables[table] = true
-	}
-	return tables
+	return s.startOnDataSnapshot(snapshot.SnapshotInfo)
 }
 
 // Skip Downloading snapshot if there is already a snapshot available from previous run
-func (s *simpleSnapShotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot {
-	log.Infof("Starting on local snapshot: %s", snapshotName)
-
-	// ensure DB version will be accessible on behalf of dependant plugins
-	db, err := dataService.DBVersion(snapshotName)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-
-	knownTables = extractTablesFromDB(db)
+func (s *simpleSnapShotManager) startOnDataSnapshot(snapshotName string) error {
+	log.Infof("Processing snapshot: %s", snapshotName)
 	snapshot := &common.Snapshot{
 		SnapshotInfo: snapshotName,
 	}
-	processSnapshot(snapshot)
-
-	// allow plugins (including this one) to start immediately on existing database
-	// Note: this MUST have no tables as that is used as an indicator
-	return snapshot
+	if err := s.dbMan.processSnapshot(snapshot, true); err != nil {
+		return err
+	}
+	log.Info("Emitting Snapshot to plugins")
+	select {
+	case <-time.After(pluginTimeout):
+		return fmt.Errorf("timeout, plugins failed to respond to snapshot")
+	case <-eventService.Emit(ApigeeSyncEventSelector, snapshot):
+		// the new snapshot has been processed
+	}
+	return nil
 }
 
 // a blocking method
@@ -254,12 +205,12 @@
 
 	//pollWithBackoff only accepts function that accept a single quit channel
 	//to accommodate functions which need more parameters, wrap them in closures
-	attemptDownload := getAttemptDownloadClosure(isBoot, snapshot, uri)
+	attemptDownload := s.getAttemptDownloadClosure(isBoot, snapshot, uri)
 	pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
 	return nil
 }
 
-func getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
+func (s *simpleSnapShotManager) getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
 	return func(_ chan bool) error {
 
 		var tid string
@@ -268,7 +219,7 @@
 			// should never happen, but if it does, it's unrecoverable anyway
 			log.Panicf("Snapshotserver comm error: %v", err)
 		}
-		addHeaders(req)
+		addHeaders(req, s.tokenMan.getBearerToken())
 
 		var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error
 
@@ -280,7 +231,7 @@
 		processSnapshotResponse = processSnapshotServerFileResponse
 
 		// Issue the request to the snapshot server
-		r, err := httpclient.Do(req)
+		r, err := s.client.Do(req)
 		if err != nil {
 			log.Errorf("Snapshotserver comm error: %v", err)
 			return err
@@ -315,11 +266,20 @@
 
 func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error {
 	dbPath := data.DBPath("common/" + dbId)
+	dbDir := dbPath[0 : len(dbPath)-7]
 	log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath)
 
+	// if exists, delete the old snapshot file
+	if _, err := os.Stat(dbDir); !os.IsNotExist(err) {
+		if err = os.RemoveAll(dbDir); err != nil {
+			log.Errorf("Failed to delete old snapshot; %v", err)
+			return err
+		}
+	}
+
 	//this path includes the sqlite3 file name.  why does mkdir all stop at parent??
 	log.Infof("Creating directory with mkdirall %s", dbPath)
-	err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700)
+	err := os.MkdirAll(dbDir, 0700)
 	if err != nil {
 		log.Errorf("Error creating db path %s", err)
 	}
@@ -347,6 +307,7 @@
 }
 
 type offlineSnapshotManager struct {
+	dbMan DbManager
 }
 
 func (o *offlineSnapshotManager) close() <-chan bool {
@@ -355,33 +316,28 @@
 	return c
 }
 
-func (o *offlineSnapshotManager) downloadBootSnapshot() {}
-
-func (o *offlineSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {}
-
-func (o *offlineSnapshotManager) downloadDataSnapshot() {}
-
-func (o *offlineSnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {}
-
-func (o *offlineSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
-	return nil
+func (o *offlineSnapshotManager) downloadBootSnapshot() {
+	log.Panic("downloadBootSnapshot called for offlineSnapshotManager")
 }
-func (o *offlineSnapshotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot {
-	log.Infof("Starting on local snapshot: %s", snapshotName)
 
-	// ensure DB version will be accessible on behalf of dependant plugins
-	db, err := dataService.DBVersion(snapshotName)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
+func (o *offlineSnapshotManager) downloadDataSnapshot() error {
+	return fmt.Errorf("downloadDataSnapshot called for offlineSnapshotManager")
+}
 
-	knownTables = extractTablesFromDB(db)
+func (o *offlineSnapshotManager) startOnDataSnapshot(snapshotName string) error {
+	log.Infof("Processing snapshot: %s", snapshotName)
 	snapshot := &common.Snapshot{
 		SnapshotInfo: snapshotName,
 	}
-	processSnapshot(snapshot)
-
-	// allow plugins (including this one) to start immediately on existing database
-	// Note: this MUST have no tables as that is used as an indicator
-	return snapshot
+	if err := o.dbMan.processSnapshot(snapshot, true); err != nil {
+		return err
+	}
+	log.Info("Emitting Snapshot to plugins")
+	select {
+	case <-time.After(pluginTimeout):
+		return fmt.Errorf("timeout, plugins failed to respond to snapshot")
+	case <-eventService.Emit(ApigeeSyncEventSelector, snapshot):
+		// the new snapshot has been processed
+	}
+	return nil
 }
diff --git a/test_mock_test.go b/test_mock_test.go
index de2f673..dbb4a43 100644
--- a/test_mock_test.go
+++ b/test_mock_test.go
@@ -14,10 +14,115 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	"github.com/apigee-labs/transicator/common"
-	"net/url"
+	"math/rand"
+	"net/http"
+	"strconv"
 )
 
+type mockService struct {
+	config apid.ConfigService
+	log    apid.LogService
+	api    apid.APIService
+	data   apid.DataService
+	events apid.EventsService
+}
+
+func (s *mockService) API() apid.APIService {
+	return s.api
+}
+
+func (s *mockService) Config() apid.ConfigService {
+	return s.config
+}
+
+func (s *mockService) Data() apid.DataService {
+	return s.data
+}
+
+func (s *mockService) Events() apid.EventsService {
+	return s.events
+}
+
+func (s *mockService) Log() apid.LogService {
+	return s.log
+}
+
+type mockApi struct {
+	handleMap map[string]http.HandlerFunc
+}
+
+func (m *mockApi) Listen() error {
+	return nil
+}
+func (m *mockApi) Handle(path string, handler http.Handler) apid.Route {
+	return nil
+}
+func (m *mockApi) HandleFunc(path string, handlerFunc http.HandlerFunc) apid.Route {
+	m.handleMap[path] = handlerFunc
+	return apid.API().HandleFunc(path+strconv.Itoa(rand.Int()), handlerFunc)
+}
+func (m *mockApi) Vars(r *http.Request) map[string]string {
+	return nil
+}
+
+func (m *mockApi) Router() apid.Router {
+	return nil
+}
+
+type mockData struct {
+}
+
+func (m *mockData) DB() (apid.DB, error) {
+	return nil, nil
+}
+
+func (m *mockData) DBForID(id string) (apid.DB, error) {
+	return nil, nil
+}
+
+func (m *mockData) DBVersion(version string) (apid.DB, error) {
+	return nil, nil
+}
+func (m *mockData) DBVersionForID(id, version string) (apid.DB, error) {
+	return nil, nil
+}
+
+func (m *mockData) ReleaseDB(version string)          {}
+func (m *mockData) ReleaseCommonDB()                  {}
+func (m *mockData) ReleaseDBForID(id, version string) {}
+
+type mockEvent struct {
+	listenerMap map[apid.EventSelector]apid.EventHandlerFunc
+}
+
+func (e *mockEvent) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+	return nil
+}
+
+func (e *mockEvent) EmitWithCallback(selector apid.EventSelector, event apid.Event, handler apid.EventHandlerFunc) {
+
+}
+
+func (e *mockEvent) Listen(selector apid.EventSelector, handler apid.EventHandler) {
+
+}
+
+func (e *mockEvent) ListenFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) {
+
+}
+
+func (e *mockEvent) ListenOnceFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) {
+	e.listenerMap[selector] = handler
+}
+
+func (e *mockEvent) StopListening(selector apid.EventSelector, handler apid.EventHandler) {
+
+}
+
+func (e *mockEvent) Close() {}
+
 type dummyChangeManager struct {
 	pollChangeWithBackoffChan chan bool
 }
@@ -46,31 +151,21 @@
 
 func (t *dummyTokenManager) invalidateToken() error {
 	log.Debug("invalidateToken called")
-	testMock.passAuthCheck()
 	t.invalidateChan <- true
 	return nil
 }
 
-func (t *dummyTokenManager) getToken() *OauthToken {
-	return nil
-}
-
 func (t *dummyTokenManager) close() {
 	return
 }
 
-func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
-	return func(chan bool) error {
-		return nil
-	}
-}
-
 func (t *dummyTokenManager) start() {
 
 }
 
 type dummySnapshotManager struct {
 	downloadCalledChan chan bool
+	startCalledChan    chan bool
 }
 
 func (s *dummySnapshotManager) close() <-chan bool {
@@ -80,28 +175,54 @@
 }
 
 func (s *dummySnapshotManager) downloadBootSnapshot() {
-
+	s.downloadCalledChan <- false
 }
 
-func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (s *dummySnapshotManager) downloadDataSnapshot() {
-	log.Debug("dummySnapshotManager.downloadDataSnapshot() called")
+func (s *dummySnapshotManager) downloadDataSnapshot() error {
 	s.downloadCalledChan <- true
-}
-
-func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (s *dummySnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
 	return nil
 }
 
-func (s *dummySnapshotManager) startOnLocalSnapshot(snapshot string) *common.Snapshot {
-	return &common.Snapshot{
-		SnapshotInfo: snapshot,
-	}
+func (s *dummySnapshotManager) startOnDataSnapshot(snapshot string) error {
+	s.startCalledChan <- true
+	return nil
+}
+
+type dummyDbManager struct {
+	lastSequence string
+	knownTables  map[string]bool
+	scopes       []string
+}
+
+func (d *dummyDbManager) initDB() error {
+	return nil
+}
+func (d *dummyDbManager) setDB(db apid.DB) {
+
+}
+func (d *dummyDbManager) getLastSequence() (lastSequence string) {
+	return d.lastSequence
+}
+func (d *dummyDbManager) findScopesForId(configId string) (scopes []string, err error) {
+	return d.scopes, nil
+}
+func (d *dummyDbManager) updateLastSequence(lastSequence string) error {
+	return nil
+}
+func (d *dummyDbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
+	return apidInstanceInfo{
+		InstanceID:   "",
+		InstanceName: "",
+		ClusterID:    "",
+		LastSnapshot: "",
+	}, nil
+}
+func (d *dummyDbManager) processChangeList(changes *common.ChangeList) error {
+	return nil
+}
+func (d *dummyDbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
+	return nil
+}
+func (d *dummyDbManager) getKnowTables() map[string]bool {
+	return d.knownTables
 }
diff --git a/token.go b/token.go
index 95d4b06..8d49a8e 100644
--- a/token.go
+++ b/token.go
@@ -40,7 +40,7 @@
    man.close()
 */
 
-func createSimpleTokenManager() *simpleTokenManager {
+func createSimpleTokenManager(isNewInstance bool) *simpleTokenManager {
 	isClosedInt := int32(0)
 
 	t := &simpleTokenManager{
@@ -52,6 +52,7 @@
 		invalidateDone:      make(chan bool),
 		tokenUpdatedChan:    make(chan bool, 1),
 		isClosed:            &isClosedInt,
+		isNewInstance:       isNewInstance,
 	}
 	return t
 }
@@ -67,6 +68,7 @@
 	returnTokenChan     chan *OauthToken
 	invalidateDone      chan bool
 	tokenUpdatedChan    chan bool
+	isNewInstance       bool
 }
 
 func (t *simpleTokenManager) start() {
@@ -168,9 +170,9 @@
 		req.Header.Set("status", "ONLINE")
 		req.Header.Set("plugin_details", apidPluginDetails)
 
-		if newInstanceID {
+		if t.isNewInstance {
 			req.Header.Set("created_at_apid", time.Now().Format(time.RFC3339))
-			newInstanceID = false
+			t.isNewInstance = false
 		} else {
 			req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
 		}
diff --git a/token_test.go b/token_test.go
index 1dfaabb..1883a6f 100644
--- a/token_test.go
+++ b/token_test.go
@@ -94,7 +94,7 @@
 				w.Write(body)
 			}))
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createSimpleTokenManager(false)
 			testedTokenManager.start()
 			token := testedTokenManager.getToken()
 
@@ -123,7 +123,7 @@
 			}))
 			config.Set(configProxyServerBaseURI, ts.URL)
 
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createSimpleTokenManager(false)
 			testedTokenManager.start()
 			token := testedTokenManager.getToken()
 			Expect(token.AccessToken).ToNot(BeEmpty())
@@ -163,7 +163,7 @@
 			}))
 
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createSimpleTokenManager(false)
 			testedTokenManager.start()
 			testedTokenManager.getToken()
 
@@ -179,8 +179,6 @@
 			finished := make(chan bool, 1)
 			count := 0
 
-			newInstanceID = true
-
 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 
 				count++
@@ -202,7 +200,7 @@
 			}))
 
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createSimpleTokenManager(true)
 			testedTokenManager.start()
 			testedTokenManager.getToken()
 			testedTokenManager.invalidateToken()
