Issue 69568832 refactor code, fix bugs, add tests (#76)

* [ISSUE-69568832] refactor

* [ISSUE-69568832] refactor code, fix bugs, add tests

* [ISSUE-69568832] fix docker tests

* [ISSUE-69568832] format code

* [ISSUE-69568832] address comments
diff --git a/api.go b/api.go
index 73399d7..1fceebb 100644
--- a/api.go
+++ b/api.go
@@ -24,32 +24,45 @@
 
 const tokenEndpoint = "/accesstoken"
 
-func InitAPI(services apid.Services) {
-	services.API().HandleFunc(tokenEndpoint, getAccessToken).Methods("GET")
+const (
+	// long-polling timeout from http header
+	parBlock = "block"
+	// long-polling tag used for comparision
+	// if tag fails to match, new token is returned immediately
+	parTag = "If-None-Match"
+)
+
+type ApiManager struct {
+	tokenMan tokenManager
+	endpoint string
 }
 
-func getAccessToken(w http.ResponseWriter, r *http.Request) {
-	b := r.URL.Query().Get("block")
+func (a *ApiManager) InitAPI(api apid.APIService) {
+	api.HandleFunc(a.endpoint, a.getAccessToken).Methods("GET")
+}
+
+func (a *ApiManager) getAccessToken(w http.ResponseWriter, r *http.Request) {
+	b := r.URL.Query().Get(parBlock)
 	var timeout int
 	if b != "" {
 		var err error
 		timeout, err = strconv.Atoi(b)
-		if err != nil {
+		if err != nil || timeout < 0 {
 			writeError(w, http.StatusBadRequest, "bad block value, must be number of seconds")
 			return
 		}
 	}
 	log.Debugf("api timeout: %d", timeout)
-	ifNoneMatch := r.Header.Get("If-None-Match")
-
-	if apidTokenManager.getBearerToken() != ifNoneMatch {
-		w.Write([]byte(apidTokenManager.getBearerToken()))
+	ifNoneMatch := r.Header.Get(parTag)
+	log.Debugf("ifNoneMatch: %s", ifNoneMatch)
+	if a.tokenMan.getBearerToken() != ifNoneMatch {
+		w.Write([]byte(a.tokenMan.getBearerToken()))
 		return
 	}
 
 	select {
-	case <-apidTokenManager.getTokenReadyChannel():
-		w.Write([]byte(apidTokenManager.getBearerToken()))
+	case <-a.tokenMan.getTokenReadyChannel():
+		w.Write([]byte(a.tokenMan.getBearerToken()))
 	case <-time.After(time.Duration(timeout) * time.Second):
 		w.WriteHeader(http.StatusNotModified)
 	}
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 0000000..d46ac4c
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,125 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apidApigeeSync
+
+import (
+	"fmt"
+	"github.com/apid/apid-core"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"strconv"
+	"time"
+)
+
+const (
+	apiTestUrl = "http://127.0.0.1:9000"
+)
+
+var _ = Describe("API Manager", func() {
+	testCount := 0
+	var testApiMan *ApiManager
+	var dummyTokenMan *dummyTokenManager
+	var client *http.Client
+	BeforeEach(func() {
+		testCount++
+		dummyTokenMan = &dummyTokenManager{
+			token:          fmt.Sprintf("test_token_%d", testCount),
+			tokenReadyChan: make(chan bool, 1),
+		}
+		testApiMan = &ApiManager{
+			endpoint: tokenEndpoint + strconv.Itoa(testCount),
+			tokenMan: dummyTokenMan,
+		}
+		testApiMan.InitAPI(apid.API())
+		time.Sleep(100 * time.Millisecond)
+		client = &http.Client{}
+	})
+
+	clientGet := func(path string, pars map[string][]string, header map[string][]string) (int, []byte) {
+		uri, err := url.Parse(apiTestUrl + path)
+		Expect(err).Should(Succeed())
+		query := url.Values(pars)
+		uri.RawQuery = query.Encode()
+		httpReq, err := http.NewRequest("GET", uri.String(), nil)
+		httpReq.Header = http.Header(header)
+		Expect(err).Should(Succeed())
+		res, err := client.Do(httpReq)
+		Expect(err).Should(Succeed())
+		defer res.Body.Close()
+		responseBody, err := ioutil.ReadAll(res.Body)
+		Expect(err).Should(Succeed())
+		return res.StatusCode, responseBody
+	}
+
+	It("should get token without long-polling", func() {
+		code, res := clientGet(testApiMan.endpoint, nil, nil)
+		Expect(code).Should(Equal(http.StatusOK))
+		Expect(string(res)).Should(Equal(dummyTokenMan.token))
+	})
+
+	It("should get bad request for invalid timeout", func() {
+		code, _ := clientGet(testApiMan.endpoint, map[string][]string{
+			parBlock: {"invalid"},
+		}, map[string][]string{
+			parTag: {dummyTokenMan.getBearerToken()},
+		})
+		Expect(code).Should(Equal(http.StatusBadRequest))
+
+		code, _ = clientGet(testApiMan.endpoint, map[string][]string{
+			parBlock: {"-1"},
+		}, map[string][]string{
+			parTag: {dummyTokenMan.getBearerToken()},
+		})
+		Expect(code).Should(Equal(http.StatusBadRequest))
+	})
+
+	It("should get token immediately if mismatch", func() {
+		code, res := clientGet(testApiMan.endpoint, map[string][]string{
+			parBlock: {"10"},
+		}, map[string][]string{
+			parTag: {"mismatch"},
+		})
+		Expect(code).Should(Equal(http.StatusOK))
+		Expect(string(res)).Should(Equal(dummyTokenMan.token))
+	}, 3)
+
+	It("should get StatusNotModified if timeout", func() {
+		code, _ := clientGet(testApiMan.endpoint, map[string][]string{
+			parBlock: {"1"},
+		}, map[string][]string{
+			parTag: {dummyTokenMan.getBearerToken()},
+		})
+		Expect(code).Should(Equal(http.StatusNotModified))
+	}, 3)
+
+	It("should do long-polling", func() {
+		go func() {
+			time.Sleep(1)
+			dummyTokenMan.token = "new_token"
+			dummyTokenMan.tokenReadyChan <- true
+		}()
+		code, res := clientGet(testApiMan.endpoint, map[string][]string{
+			parBlock: {"10"},
+		}, map[string][]string{
+			parTag: {dummyTokenMan.getBearerToken()},
+		})
+		Expect(code).Should(Equal(http.StatusOK))
+		Expect(string(res)).Should(Equal(dummyTokenMan.getBearerToken()))
+	}, 3)
+
+})
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 5055f68..47c5498 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -19,72 +19,70 @@
 	. "github.com/onsi/gomega"
 
 	"io/ioutil"
-	"net/http/httptest"
 	"os"
 	"testing"
 	"time"
 
 	"github.com/apid/apid-core"
-
+	"github.com/apid/apid-core/events"
 	"github.com/apid/apid-core/factory"
 )
 
-var (
-	tmpDir         string
-	testServer     *httptest.Server
-	testRouter     apid.Router
-	testMock       *MockServer
-	wipeDBAferTest bool
-)
-
 const dummyConfigValue string = "placeholder"
 const expectedClusterId = "bootstrap"
 
+var tmpDir string
+
 var _ = BeforeSuite(func() {
-	wipeDBAferTest = true
-})
-
-var _ = BeforeEach(func() {
 	apid.Initialize(factory.DefaultServicesFactory())
-
-	config = apid.Config()
 	dataService = apid.Data()
-	events = apid.Events()
-
+	config = apid.Config()
+	apiService = apid.API()
+	go apiService.Listen()
+	//dataService = apid.Data()
+	log = apid.Log().ForModule("apigeeSync")
 	var err error
-	tmpDir, err = ioutil.TempDir("", "api_test")
+	tmpDir, err = ioutil.TempDir("", "apid_test")
 	Expect(err).NotTo(HaveOccurred())
-	config.Set("local_storage_path", tmpDir)
-
+	config.Set(configLocalStoragePath, tmpDir)
 	config.Set(configProxyServerBaseURI, dummyConfigValue)
 	config.Set(configSnapServerBaseURI, dummyConfigValue)
 	config.Set(configChangeServerBaseURI, dummyConfigValue)
 	config.Set(configSnapshotProtocol, "sqlite")
 	config.Set(configPollInterval, 10*time.Millisecond)
 	config.Set(configDiagnosticMode, false)
-	config.Set(configName, "testhost")
-	config.Set(configApidClusterId, expectedClusterId)
 	config.Set(configConsumerKey, "XXXXXXX")
 	config.Set(configConsumerSecret, "YYYYYYY")
-
-	block = "0"
-	log = apid.Log().ForModule("apigeeSync")
+	config.Set(configApidInstanceID, "YYYYYYY")
 }, 3)
 
+var _ = BeforeEach(func() {
+	eventService = events.CreateService()
+	config.Set(configName, "testhost")
+	config.Set(configApidClusterId, expectedClusterId)
+	apidInfo.ClusterID = expectedClusterId
+	apidInfo.InstanceID = "YYYYYYY"
+	apidInfo.LastSnapshot = ""
+	apidInfo.IsNewInstance = true
+})
+
 var _ = AfterEach(func() {
-	apid.Events().Close()
-	lastSequence = ""
+	cleanCommonDb()
+	eventService.Close()
 })
 
 var _ = AfterSuite(func() {
-	apid.Events().Close()
-	if testServer != nil {
-		testServer.Close()
-	}
-	os.RemoveAll(tmpDir)
+	Expect(os.RemoveAll(tmpDir)).Should(Succeed())
 })
 
 func TestApigeeSync(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "ApigeeSync Suite")
 }
+
+func cleanCommonDb() {
+	db, err := dataService.DB()
+	Expect(err).Should(Succeed())
+	_, err = db.Exec(`DROP TABLE IF EXISTS APID;`)
+	Expect(err).Should(Succeed())
+}
diff --git a/apigee_sync.go b/apigee_sync.go
deleted file mode 100644
index 8a23079..0000000
--- a/apigee_sync.go
+++ /dev/null
@@ -1,140 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
-	"github.com/apid/apid-core"
-	"net/http"
-	"time"
-)
-
-const (
-	httpTimeout         = time.Minute
-	pluginTimeout       = time.Minute
-	maxIdleConnsPerHost = 10
-)
-
-var knownTables = make(map[string]bool)
-
-/*
- *  Start from existing snapshot if possible
- *  If an existing snapshot does not exist, use the apid scope to fetch
- *  all data scopes, then get a snapshot for those data scopes
- *
- *  Then, poll for changes
- */
-func bootstrap() {
-	if isOfflineMode && apidInfo.LastSnapshot == "" {
-		log.Panic("Diagnostic mode requires existent snapshot info in default DB.")
-	}
-
-	if apidInfo.LastSnapshot != "" {
-		snapshot := apidSnapshotManager.startOnLocalSnapshot(apidInfo.LastSnapshot)
-		events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
-			apidChangeManager.pollChangeWithBackoff()
-		})
-
-		log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
-		return
-	}
-
-	apidSnapshotManager.downloadBootSnapshot()
-	apidSnapshotManager.downloadDataSnapshot()
-
-	apidChangeManager.pollChangeWithBackoff()
-
-}
-
-/*
- * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
- * for retrying on errors
- */
-func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
-
-	backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true)
-
-	//inintialize the retry channel to start first attempt immediately
-	retry := time.After(0 * time.Millisecond)
-
-	for {
-		select {
-		case <-quit:
-			log.Info("Quit signal recieved.  Returning")
-			return
-		case <-retry:
-			start := time.Now()
-
-			err := toExecute(quit)
-			if err == nil {
-				return
-			}
-
-			if _, ok := err.(quitSignalError); ok {
-				return
-			}
-
-			end := time.Now()
-			//error encountered, since we would have returned above otherwise
-			handleError(err)
-
-			/* TODO keep this around? Imagine an immediately erroring service,
-			 *  causing many sequential requests which could pollute logs
-			 */
-			//only backoff if the request took less than one second
-			if end.After(start.Add(time.Second)) {
-				backoff.Reset()
-				retry = time.After(0 * time.Millisecond)
-			} else {
-				retry = time.After(backoff.Duration())
-			}
-		}
-	}
-}
-
-func addHeaders(req *http.Request) {
-	req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
-	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
-	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
-	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
-}
-
-type changeServerError struct {
-	Code string `json:"code"`
-}
-
-type quitSignalError struct {
-}
-
-type expected200Error struct {
-}
-
-type authFailError struct {
-}
-
-func (an expected200Error) Error() string {
-	return "Did not recieve OK response"
-}
-
-func (a quitSignalError) Error() string {
-	return "Signal to quit encountered"
-}
-
-func (a changeServerError) Error() string {
-	return a.Code
-}
-
-func (a authFailError) Error() string {
-	return "Authorization failed"
-}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
deleted file mode 100644
index f7144a1..0000000
--- a/apigee_sync_test.go
+++ /dev/null
@@ -1,452 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
-	"github.com/apid/apid-core"
-	"github.com/apid/apid-core/util"
-	"github.com/apigee-labs/transicator/common"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-	"net/http"
-	"net/http/httptest"
-)
-
-var _ = Describe("Sync", func() {
-	Context("offline mode", func() {
-		var (
-			testInstanceID   = util.GenerateUUID()
-			testInstanceName = "offline-instance-name"
-			testClusterID    = "offline-cluster-id"
-			testLastSnapshot = "offline-last-snapshot"
-			testChangeMan    *dummyChangeManager
-		)
-
-		var _ = BeforeEach(func() {
-			config.Set(configDiagnosticMode, true)
-			config.Set(configApidClusterId, testClusterID)
-			_initPlugin(apid.AllServices())
-			apidSnapshotManager = &dummySnapshotManager{}
-			testChangeMan = &dummyChangeManager{
-				pollChangeWithBackoffChan: make(chan bool, 1),
-			}
-			apidChangeManager = testChangeMan
-			apidTokenManager = &dummyTokenManager{}
-			apidInfo = apidInstanceInfo{
-				InstanceID:   testInstanceID,
-				InstanceName: testInstanceName,
-				ClusterID:    testClusterID,
-				LastSnapshot: testLastSnapshot,
-			}
-			updateApidInstanceInfo()
-
-		})
-
-		var _ = AfterEach(func() {
-			config.Set(configDiagnosticMode, false)
-			if wipeDBAferTest {
-				db, err := dataService.DB()
-				Expect(err).NotTo(HaveOccurred())
-				tx, err := db.Begin()
-				_, err = tx.Exec("DELETE FROM APID")
-				Expect(err).NotTo(HaveOccurred())
-				err = tx.Commit()
-				Expect(err).NotTo(HaveOccurred())
-
-			}
-			wipeDBAferTest = true
-			newInstanceID = true
-			isOfflineMode = false
-		})
-
-		It("offline mode should bootstrap from local DB", func(done Done) {
-
-			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
-				if s, ok := event.(*common.Snapshot); ok {
-					// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
-					// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
-					Expect(s.SnapshotInfo).Should(Equal(testLastSnapshot))
-					Expect(s.Tables).To(BeNil())
-					close(done)
-				}
-			})
-			pie := apid.PluginsInitializedEvent{
-				Description: "plugins initialized",
-			}
-			pie.Plugins = append(pie.Plugins, pluginData)
-			postInitPlugins(pie)
-
-		}, 3)
-
-	})
-
-	Context("online mode", func() {
-		var _ = BeforeEach(func() {
-			_initPlugin(apid.AllServices())
-			createManagers()
-		})
-
-		var _ = AfterEach(func() {
-			if wipeDBAferTest {
-				db, err := dataService.DB()
-				Expect(err).NotTo(HaveOccurred())
-				tx, err := db.Begin()
-				_, err = tx.Exec("DELETE FROM APID")
-				Expect(err).NotTo(HaveOccurred())
-				err = tx.Commit()
-				Expect(err).NotTo(HaveOccurred())
-			}
-			wipeDBAferTest = true
-		})
-
-		const expectedDataScopeId1 = "dataScope1"
-		const expectedDataScopeId2 = "dataScope2"
-
-		var initializeContext = func() {
-			testRouter = apid.API().Router()
-			testServer = httptest.NewServer(testRouter)
-
-			// set up mock server
-			mockParms := MockParms{
-				ReliableAPI:  false,
-				ClusterID:    config.GetString(configApidClusterId),
-				TokenKey:     config.GetString(configConsumerKey),
-				TokenSecret:  config.GetString(configConsumerSecret),
-				Scope:        "ert452",
-				Organization: "att",
-				Environment:  "prod",
-			}
-			testMock = Mock(mockParms, testRouter)
-
-			config.Set(configProxyServerBaseURI, testServer.URL)
-			config.Set(configSnapServerBaseURI, testServer.URL)
-			config.Set(configChangeServerBaseURI, testServer.URL)
-		}
-
-		var restoreContext = func() {
-
-			testServer.Close()
-
-			config.Set(configProxyServerBaseURI, dummyConfigValue)
-			config.Set(configSnapServerBaseURI, dummyConfigValue)
-			config.Set(configChangeServerBaseURI, dummyConfigValue)
-
-		}
-
-		It("should succesfully bootstrap from clean slate", func(done Done) {
-			log.Info("Starting sync tests...")
-			var closeDone <-chan bool
-			initializeContext()
-			// do not wipe DB after.  Lets use it
-			wipeDBAferTest = false
-			var lastSnapshot *common.Snapshot
-
-			expectedSnapshotTables := common.ChangeList{
-				Changes: []common.Change{common.Change{Table: "kms_company"},
-					common.Change{Table: "edgex_apid_cluster"},
-					common.Change{Table: "edgex_data_scope"},
-					common.Change{Table: "kms_app_credential"},
-					common.Change{Table: "kms_app_credential_apiproduct_mapper"},
-					common.Change{Table: "kms_developer"},
-					common.Change{Table: "kms_company_developer"},
-					common.Change{Table: "kms_api_product"},
-					common.Change{Table: "kms_app"}},
-			}
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-				if s, ok := event.(*common.Snapshot); ok {
-
-					Expect(16).To(Equal(len(knownTables)))
-					Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
-
-					lastSnapshot = s
-
-					db, _ := dataService.DBVersion(s.SnapshotInfo)
-					var rowCount int
-					var id string
-
-					err := db.Ping()
-					Expect(err).NotTo(HaveOccurred())
-					numApidClusters, err := db.Query("select distinct count(*) from edgex_apid_cluster;")
-					if err != nil {
-						Fail("Failed to get correct DB")
-					}
-					Expect(true).To(Equal(numApidClusters.Next()))
-					numApidClusters.Scan(&rowCount)
-					Expect(1).To(Equal(rowCount))
-					numApidClusters.Close()
-					apidClusters, err := db.Query("select id from edgex_apid_cluster;")
-					Expect(err).NotTo(HaveOccurred())
-					apidClusters.Next()
-					apidClusters.Scan(&id)
-					Expect(id).To(Equal(expectedClusterId))
-					apidClusters.Close()
-
-					numDataScopes, err := db.Query("select distinct count(*) from edgex_data_scope;")
-					Expect(err).NotTo(HaveOccurred())
-					Expect(true).To(Equal(numDataScopes.Next()))
-					numDataScopes.Scan(&rowCount)
-					Expect(2).To(Equal(rowCount))
-					numDataScopes.Close()
-					dataScopes, err := db.Query("select id from edgex_data_scope;")
-					Expect(err).NotTo(HaveOccurred())
-					dataScopes.Next()
-					dataScopes.Scan(&id)
-					dataScopes.Next()
-
-					if id == expectedDataScopeId1 {
-						dataScopes.Scan(&id)
-						Expect(id).To(Equal(expectedDataScopeId2))
-					} else {
-						dataScopes.Scan(&id)
-						Expect(id).To(Equal(expectedDataScopeId1))
-					}
-					dataScopes.Close()
-
-				} else if cl, ok := event.(*common.ChangeList); ok {
-					closeDone = apidChangeManager.close()
-					// ensure that snapshot switched DB versions
-					Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
-					expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
-					Expect(err).NotTo(HaveOccurred())
-					Expect(getDB() == expectedDB).Should(BeTrue())
-
-					Expect(cl.Changes).To(HaveLen(6))
-
-					var tables []string
-					for _, c := range cl.Changes {
-						tables = append(tables, c.Table)
-						Expect(c.NewRow).ToNot(BeNil())
-
-						var tenantID string
-						c.NewRow.Get("tenant_id", &tenantID)
-						Expect(tenantID).To(Equal("ert452"))
-					}
-
-					Expect(tables).To(ContainElement("kms_app_credential"))
-					Expect(tables).To(ContainElement("kms_app_credential_apiproduct_mapper"))
-					Expect(tables).To(ContainElement("kms_developer"))
-					Expect(tables).To(ContainElement("kms_company_developer"))
-					Expect(tables).To(ContainElement("kms_api_product"))
-					Expect(tables).To(ContainElement("kms_app"))
-
-					go func() {
-						// when close done, all handlers for the first changeList have been executed
-						<-closeDone
-						defer GinkgoRecover()
-						// allow other handler to execute to insert last_sequence
-						var seq string
-						err = getDB().
-							QueryRow("SELECT last_sequence FROM EDGEX_APID_CLUSTER LIMIT 1;").
-							Scan(&seq)
-						Expect(err).NotTo(HaveOccurred())
-						//}
-						Expect(seq).To(Equal(cl.LastSequence))
-
-						restoreContext()
-						close(done)
-					}()
-
-				}
-			})
-			pie := apid.PluginsInitializedEvent{
-				Description: "plugins initialized",
-			}
-			pie.Plugins = append(pie.Plugins, pluginData)
-			postInitPlugins(pie)
-		}, 5)
-
-		It("should bootstrap from local DB if present", func(done Done) {
-
-			var closeDone <-chan bool
-
-			initializeContext()
-			expectedTables := common.ChangeList{
-				Changes: []common.Change{common.Change{Table: "kms_company"},
-					common.Change{Table: "edgex_apid_cluster"},
-					common.Change{Table: "edgex_data_scope"}},
-			}
-			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
-				if s, ok := event.(*common.Snapshot); ok {
-					// In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
-					// This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
-					closeDone = apidChangeManager.close()
-					go func() {
-						// when close done, all handlers for the first snapshot have been executed
-						<-closeDone
-						//verify that the knownTables array has been properly populated from existing DB
-						Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
-
-						Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
-						Expect(s.Tables).To(BeNil())
-
-						restoreContext()
-						close(done)
-					}()
-
-				}
-			})
-			pie := apid.PluginsInitializedEvent{
-				Description: "plugins initialized",
-			}
-			pie.Plugins = append(pie.Plugins, pluginData)
-			postInitPlugins(pie)
-
-		}, 3)
-
-		It("should detect apid_cluster_id change in config yaml", func() {
-			Expect(apidInfo).ToNot(BeNil())
-			Expect(apidInfo.ClusterID).To(Equal("bootstrap"))
-			Expect(apidInfo.InstanceID).ToNot(BeEmpty())
-			previousInstanceId := apidInfo.InstanceID
-
-			config.Set(configApidClusterId, "new value")
-			apidInfo, err := getApidInstanceInfo()
-			Expect(err).NotTo(HaveOccurred())
-			Expect(apidInfo.LastSnapshot).To(BeEmpty())
-			Expect(apidInfo.InstanceID).ToNot(BeEmpty())
-			Expect(apidInfo.InstanceID).ToNot(Equal(previousInstanceId))
-			Expect(apidInfo.ClusterID).To(Equal("new value"))
-		})
-
-		It("should correctly identify non-proper subsets with respect to maps", func() {
-
-			//test b proper subset of a
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "b"}},
-			)).To(BeFalse())
-
-			//test a == b
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
-			)).To(BeFalse())
-
-			//test b superset of a
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
-			)).To(BeTrue())
-
-			//test b not subset of a
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{common.Change{Table: "c"}},
-			)).To(BeTrue())
-
-			//test a empty
-			Expect(changesHaveNewTables(map[string]bool{},
-				[]common.Change{common.Change{Table: "a"}},
-			)).To(BeTrue())
-
-			//test b empty
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
-				[]common.Change{},
-			)).To(BeFalse())
-
-			//test b nil
-			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse())
-
-			//test a nil
-			Expect(changesHaveNewTables(nil,
-				[]common.Change{common.Change{Table: "a"}},
-			)).To(BeTrue())
-		}, 3)
-
-		// todo: disabled for now -
-		// there is precondition I haven't been able to track down that breaks this test on occasion
-		XIt("should process a new snapshot when change server requires it", func(done Done) {
-			oldSnap := apidInfo.LastSnapshot
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-				defer GinkgoRecover()
-
-				if s, ok := event.(*common.Snapshot); ok {
-					Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
-					close(done)
-				}
-			})
-			testMock.forceNewSnapshot()
-		})
-
-		It("Verify the Sequence Number Logic works as expected", func() {
-			Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
-			Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
-			Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
-			Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
-			Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
-			Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
-		}, 3)
-
-		/*
-		 * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
-		 */
-		It("Should be able to handle duplicate snapshot during bootstrap", func() {
-			initializeContext()
-			apidTokenManager = createSimpleTokenManager()
-			apidTokenManager.start()
-			apidSnapshotManager = createSnapShotManager()
-			//events.Listen(ApigeeSyncEventSelector, &handler{})
-
-			scopes := []string{apidInfo.ClusterID}
-			snapshot := &common.Snapshot{}
-			apidSnapshotManager.downloadSnapshot(true, scopes, snapshot)
-			apidSnapshotManager.storeBootSnapshot(snapshot)
-			apidSnapshotManager.storeDataSnapshot(snapshot)
-			restoreContext()
-			<-apidSnapshotManager.close()
-			apidTokenManager.close()
-		}, 3)
-
-		It("Reuse http.Client connection for multiple concurrent requests", func() {
-			var tr *http.Transport
-			server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			}))
-			tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
-			tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
-
-			var rspcnt int = 0
-			ch := make(chan *http.Response)
-			client := &http.Client{Transport: tr}
-			for i := 0; i < 2*maxIdleConnsPerHost; i++ {
-				go func(client *http.Client) {
-					req, err := http.NewRequest("GET", server.URL, nil)
-					resp, err := client.Do(req)
-					if err != nil {
-						Fail("Unable to process Client request")
-					}
-					ch <- resp
-					resp.Body.Close()
-
-				}(client)
-			}
-			for {
-				select {
-				case resp := <-ch:
-					Expect(resp.StatusCode).To(Equal(http.StatusOK))
-					if rspcnt >= 2*maxIdleConnsPerHost-1 {
-						return
-					}
-					rspcnt++
-				default:
-				}
-			}
-
-		}, 3)
-
-	})
-})
diff --git a/backoff.go b/backoff.go
deleted file mode 100644
index bad8077..0000000
--- a/backoff.go
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2017 Google Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package apidApigeeSync
-
-import (
-	"math"
-	"math/rand"
-	"time"
-)
-
-const defaultInitial time.Duration = 200 * time.Millisecond
-const defaultMax time.Duration = 10 * time.Second
-const defaultFactor float64 = 2
-
-type Backoff struct {
-	attempt         int
-	initial, max    time.Duration
-	jitter          bool
-	backoffStrategy func() time.Duration
-}
-
-type ExponentialBackoff struct {
-	Backoff
-	factor float64
-}
-
-func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
-	backoff := &ExponentialBackoff{}
-
-	if initial <= 0 {
-		initial = defaultInitial
-	}
-	if max <= 0 {
-		max = defaultMax
-	}
-
-	if factor <= 0 {
-		factor = defaultFactor
-	}
-
-	backoff.initial = initial
-	backoff.max = max
-	backoff.attempt = 0
-	backoff.factor = factor
-	backoff.jitter = jitter
-	backoff.backoffStrategy = backoff.exponentialBackoffStrategy
-
-	return backoff
-}
-
-func (b *Backoff) Duration() time.Duration {
-	d := b.backoffStrategy()
-	b.attempt++
-	return d
-}
-
-func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
-
-	initial := float64(b.Backoff.initial)
-	attempt := float64(b.Backoff.attempt)
-	duration := initial * math.Pow(b.factor, attempt)
-
-	if duration > math.MaxInt64 {
-		return b.max
-	}
-	dur := time.Duration(duration)
-
-	if b.jitter {
-		duration = (rand.Float64()*(duration-initial) + initial)
-	}
-
-	if dur > b.max {
-		return b.max
-	}
-
-	log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
-	return dur
-}
-
-func (b *Backoff) Reset() {
-	b.attempt = 0
-}
-
-func (b *Backoff) Attempt() int {
-	return b.attempt
-}
diff --git a/change_test.go b/change_test.go
index 511eb82..97d5fb3 100644
--- a/change_test.go
+++ b/change_test.go
@@ -16,144 +16,196 @@
 
 import (
 	"github.com/apid/apid-core"
+	"github.com/apid/apid-core/api"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"net/http"
 	"net/http/httptest"
-	"os"
+	"strconv"
 	"time"
 )
 
+const (
+	expectedInstanceId = "dummy"
+)
+
 var _ = Describe("Change Agent", func() {
 
 	Context("Change Agent Unit Tests", func() {
 
-		var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
-			initDb(sqlfile, "./mockdb_change.sqlite3")
-			file, err := os.Open("./mockdb_change.sqlite3")
-			Expect(err).Should(Succeed())
-			s := common.Snapshot{}
-			err = processSnapshotServerFileResponse(dbId, file, &s)
-			Expect(err).Should(Succeed())
-			return s
-		}
+		Context("utils", func() {
 
-		var initializeContext = func() {
-			testRouter = apid.API().Router()
-			testServer = httptest.NewServer(testRouter)
+			It("should correctly identify non-proper subsets with respect to maps", func() {
+				//test b proper subset of a
+				Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+					[]common.Change{{Table: "b"}},
+				)).To(BeFalse())
 
-			// set up mock server
-			mockParms := MockParms{
-				ReliableAPI:  true,
-				ClusterID:    config.GetString(configApidClusterId),
-				TokenKey:     config.GetString(configConsumerKey),
-				TokenSecret:  config.GetString(configConsumerSecret),
-				Scope:        "ert452",
-				Organization: "att",
-				Environment:  "prod",
-			}
-			testMock = Mock(mockParms, testRouter)
+				//test a == b
+				Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+					[]common.Change{{Table: "a"}, {Table: "b"}},
+				)).To(BeFalse())
 
-			config.Set(configProxyServerBaseURI, testServer.URL)
-			config.Set(configSnapServerBaseURI, testServer.URL)
-			config.Set(configChangeServerBaseURI, testServer.URL)
-			config.Set(configPollInterval, 1*time.Millisecond)
-		}
+				//test b superset of a
+				Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+					[]common.Change{{Table: "a"}, {Table: "b"}, {Table: "c"}},
+				)).To(BeTrue())
 
-		var restoreContext = func() {
+				//test b not subset of a
+				Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+					[]common.Change{{Table: "c"}},
+				)).To(BeTrue())
 
-			testServer.Close()
-			config.Set(configProxyServerBaseURI, dummyConfigValue)
-			config.Set(configSnapServerBaseURI, dummyConfigValue)
-			config.Set(configChangeServerBaseURI, dummyConfigValue)
-			config.Set(configPollInterval, 10*time.Millisecond)
-		}
+				//test a empty
+				Expect(changesHaveNewTables(map[string]bool{},
+					[]common.Change{{Table: "a"}},
+				)).To(BeTrue())
 
-		var _ = BeforeEach(func() {
-			_initPlugin(apid.AllServices())
-			createManagers()
-			event := createTestDb("./sql/init_mock_db.sql", "test_change")
-			processSnapshot(&event)
-			knownTables = extractTablesFromDB(getDB())
-		})
+				//test b empty
+				Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+					[]common.Change{},
+				)).To(BeFalse())
 
-		var _ = AfterEach(func() {
-			restoreContext()
-			if wipeDBAferTest {
-				db, err := dataService.DB()
-				Expect(err).Should(Succeed())
-				tx, err := db.Begin()
-				_, err = tx.Exec("DELETE FROM APID")
-				Expect(err).Should(Succeed())
-				err = tx.Commit()
-				Expect(err).Should(Succeed())
-			}
-			wipeDBAferTest = true
-		})
+				//test b nil
+				Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeFalse())
 
-		It("test change agent with authorization failure", func() {
-			log.Debug("test change agent with authorization failure")
-			testTokenManager := &dummyTokenManager{make(chan bool)}
-			apidTokenManager = testTokenManager
-			apidTokenManager.start()
-			apidSnapshotManager = &dummySnapshotManager{}
-			initializeContext()
-			testMock.forceAuthFail()
-			wipeDBAferTest = true
-			apidChangeManager.pollChangeWithBackoff()
-			// auth check fails
-			<-testTokenManager.invalidateChan
-			log.Debug("closing")
-			<-apidChangeManager.close()
-		})
-
-		It("test change agent with too old snapshot", func() {
-			log.Debug("test change agent with too old snapshot")
-			testTokenManager := &dummyTokenManager{make(chan bool)}
-			apidTokenManager = testTokenManager
-			apidTokenManager.start()
-			testSnapshotManager := &dummySnapshotManager{make(chan bool)}
-			apidSnapshotManager = testSnapshotManager
-			initializeContext()
-
-			testMock.passAuthCheck()
-			testMock.forceNewSnapshot()
-			wipeDBAferTest = true
-			apidChangeManager.pollChangeWithBackoff()
-			<-testSnapshotManager.downloadCalledChan
-			log.Debug("closing")
-			<-apidChangeManager.close()
-		})
-
-		It("change agent should retry with authorization failure", func(done Done) {
-			log.Debug("change agent should retry with authorization failure")
-			testTokenManager := &dummyTokenManager{make(chan bool)}
-			apidTokenManager = testTokenManager
-			apidTokenManager.start()
-			apidSnapshotManager = &dummySnapshotManager{}
-			initializeContext()
-			testMock.forceAuthFail()
-			testMock.forceNoSnapshot()
-			wipeDBAferTest = true
-
-			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-
-				if _, ok := event.(*common.ChangeList); ok {
-					closeDone := apidChangeManager.close()
-					log.Debug("closing")
-					go func() {
-						// when close done, all handlers for the first snapshot have been executed
-						<-closeDone
-						close(done)
-					}()
-
-				}
+				//test a nil
+				Expect(changesHaveNewTables(nil,
+					[]common.Change{{Table: "a"}},
+				)).To(BeTrue())
 			})
 
-			apidChangeManager.pollChangeWithBackoff()
-			// auth check fails
-			<-testTokenManager.invalidateChan
-		}, 2)
+			It("Compare Sequence Number", func() {
+				Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
+				Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
+				Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
+				Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
+				Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
+				Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
+			})
 
+		})
+
+		Context("changeManager", func() {
+			testCount := 0
+			var testChangeMan *pollChangeManager
+			var dummyDbMan *dummyDbManager
+			var dummySnapMan *dummySnapshotManager
+			var dummyTokenMan *dummyTokenManager
+			var testServer *httptest.Server
+			var testRouter apid.Router
+			var testMock *MockServer
+			BeforeEach(func() {
+				testCount++
+				dummyDbMan = &dummyDbManager{
+					knownTables: map[string]bool{
+						"_transicator_metadata":                true,
+						"_transicator_tables":                  true,
+						"attributes":                           true,
+						"edgex_apid_cluster":                   true,
+						"edgex_data_scope":                     true,
+						"kms_api_product":                      true,
+						"kms_app":                              true,
+						"kms_app_credential":                   true,
+						"kms_app_credential_apiproduct_mapper": true,
+						"kms_company":                          true,
+						"kms_company_developer":                true,
+						"kms_deployment":                       true,
+						"kms_developer":                        true,
+						"kms_organization":                     true,
+					},
+					scopes:         []string{"43aef41d"},
+					lastSeqUpdated: make(chan string, 1),
+				}
+				dummySnapMan = &dummySnapshotManager{
+					downloadCalledChan: make(chan bool, 1),
+				}
+				dummyTokenMan = &dummyTokenManager{
+					invalidateChan: make(chan bool, 1),
+				}
+				client := &http.Client{}
+				testChangeMan = createChangeManager(dummyDbMan, dummySnapMan, dummyTokenMan, client)
+				testChangeMan.block = 0
+
+				// create a new API service to have a new router for testing
+				testRouter = api.CreateService().Router()
+				testServer = httptest.NewServer(testRouter)
+				// set up mock server
+				mockParms := MockParms{
+					ReliableAPI:  true,
+					ClusterID:    config.GetString(configApidClusterId),
+					TokenKey:     config.GetString(configConsumerKey),
+					TokenSecret:  config.GetString(configConsumerSecret),
+					Scope:        "",
+					Organization: "att",
+					Environment:  "prod",
+				}
+				apidInfo.ClusterID = expectedClusterId
+				apidInfo.InstanceID = expectedInstanceId
+				testMock = Mock(mockParms, testRouter)
+				config.Set(configProxyServerBaseURI, testServer.URL)
+				config.Set(configSnapServerBaseURI, testServer.URL)
+				config.Set(configChangeServerBaseURI, testServer.URL)
+				config.Set(configPollInterval, 1*time.Millisecond)
+
+				initialBackoffInterval = time.Millisecond
+				testMock.oauthToken = "test_token_" + strconv.Itoa(testCount)
+				dummyTokenMan.token = testMock.oauthToken
+
+			})
+
+			AfterEach(func() {
+				testServer.Close()
+				<-testChangeMan.close()
+				config.Set(configProxyServerBaseURI, dummyConfigValue)
+				config.Set(configSnapServerBaseURI, dummyConfigValue)
+				config.Set(configChangeServerBaseURI, dummyConfigValue)
+				config.Set(configPollInterval, 10*time.Millisecond)
+			})
+
+			It("test change agent with authorization failure", func() {
+				log.Debug("test change agent with authorization failure")
+				testMock.forceAuthFailOnce()
+				testChangeMan.pollChangeWithBackoff()
+				// auth check fails
+				<-dummyTokenMan.invalidateChan
+				log.Debug("closing")
+			})
+
+			It("test change agent with too old snapshot", func() {
+				log.Debug("test change agent with too old snapshot")
+				testMock.passAuthCheck()
+				testMock.forceNewSnapshot()
+				testChangeMan.pollChangeWithBackoff()
+				<-dummySnapMan.downloadCalledChan
+				log.Debug("closing")
+			})
+
+			It("change agent should retry with authorization failure", func() {
+				log.Debug("change agent should retry with authorization failure")
+				testMock.forceAuthFailOnce()
+				testMock.forceNoSnapshot()
+				called := false
+				eventService.ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+					if _, ok := event.(*common.ChangeList); ok {
+						called = true
+					}
+				})
+				testChangeMan.pollChangeWithBackoff()
+				<-dummyTokenMan.invalidateChan
+				Expect(<-dummyDbMan.lastSeqUpdated).Should(Equal(testMock.lastSequenceID()))
+				Expect(called).Should(BeTrue())
+			}, 3)
+
+		})
+
+		Context("offline change manager", func() {
+			It("offline change manager should have no effect", func() {
+				o := &offlineChangeManager{}
+				o.pollChangeWithBackoff()
+				<-o.close()
+			})
+		})
 	})
 })
diff --git a/changes.go b/changes.go
index b78a1d1..6afa827 100644
--- a/changes.go
+++ b/changes.go
@@ -16,34 +16,44 @@
 
 import (
 	"encoding/json"
+	"fmt"
 	"github.com/apigee-labs/transicator/common"
 	"io/ioutil"
 	"net/http"
 	"net/url"
 	"path"
 	"sort"
+	"strconv"
 	"sync/atomic"
 	"time"
 )
 
-var lastSequence string
-var block string = "45"
-
 type pollChangeManager struct {
 	// 0 for not closed, 1 for closed
 	isClosed *int32
 	// 0 for pollChangeWithBackoff() not launched, 1 for launched
-	isLaunched *int32
-	quitChan   chan bool
+	isLaunched   *int32
+	quitChan     chan bool
+	block        int
+	lastSequence string
+	dbMan        DbManager
+	snapMan      snapshotManager
+	tokenMan     tokenManager
+	client       *http.Client
 }
 
-func createChangeManager() *pollChangeManager {
+func createChangeManager(dbMan DbManager, snapMan snapshotManager, tokenMan tokenManager, client *http.Client) *pollChangeManager {
 	isClosedInt := int32(0)
 	isLaunchedInt := int32(0)
 	return &pollChangeManager{
 		isClosed:   &isClosedInt,
 		quitChan:   make(chan bool),
 		isLaunched: &isLaunchedInt,
+		block:      45,
+		dbMan:      dbMan,
+		snapMan:    snapMan,
+		tokenMan:   tokenMan,
+		client:     client,
 	}
 }
 
@@ -56,7 +66,7 @@
 	finishChan := make(chan bool, 1)
 	//has been closed
 	if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
-		log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
+		log.Warn("pollChangeManager: close() called on a closed pollChangeManager!")
 		go func() {
 			log.Debug("change manager closed")
 			finishChan <- false
@@ -65,11 +75,10 @@
 	}
 	// not launched
 	if atomic.LoadInt32(c.isLaunched) == int32(0) {
-		log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
+		log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched!")
 		go func() {
-			c.quitChan <- true
-			apidTokenManager.close()
-			<-apidSnapshotManager.close()
+			c.tokenMan.close()
+			<-c.snapMan.close()
 			log.Debug("change manager closed")
 			finishChan <- false
 		}()
@@ -79,8 +88,8 @@
 	log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
 	go func() {
 		c.quitChan <- true
-		apidTokenManager.close()
-		<-apidSnapshotManager.close()
+		c.tokenMan.close()
+		<-c.snapMan.close()
 		log.Debug("change manager closed")
 		finishChan <- true
 	}()
@@ -120,43 +129,147 @@
 	 * Check to see if we have lastSequence already saved in the DB,
 	 * in which case, it has to be used to prevent re-reading same data
 	 */
-	lastSequence = getLastSequence()
-
+	c.lastSequence = c.dbMan.getLastSequence()
 	for {
 		select {
 		case <-c.quitChan:
 			log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager")
-			return quitSignalError{}
+			return quitSignalError
 		default:
-			err := c.getChanges(changesUri)
+			scopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID)
 			if err != nil {
-				if _, ok := err.(quitSignalError); ok {
-					log.Debug("pollChangeAgent: consuming the quit signal")
-					<-c.quitChan
-				}
+				return err
+			}
+			r, err := c.getChanges(scopes, changesUri)
+			if err != nil {
+				return err
+			}
+			cl, err := c.parseChangeResp(r)
+			if err != nil {
+				return err
+			}
+			if err = c.emitChangeList(scopes, cl); err != nil {
 				return err
 			}
 		}
 	}
 }
 
-//TODO refactor this method more, split it up
-/* Make a single request to the changeserver to get a changelist */
-func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
-	// if closed
-	if atomic.LoadInt32(c.isClosed) == int32(1) {
-		return quitSignalError{}
+func (c *pollChangeManager) parseChangeResp(r *http.Response) (*common.ChangeList, error) {
+	var err error
+	defer r.Body.Close()
+
+	if r.StatusCode != http.StatusOK {
+		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
+		switch r.StatusCode {
+		case http.StatusUnauthorized:
+			c.tokenMan.invalidateToken()
+			return nil, authFailError
+
+		case http.StatusNotModified:
+			return nil, nil
+		case http.StatusBadRequest:
+			var apiErr changeServerError
+			var b []byte
+			b, err = ioutil.ReadAll(r.Body)
+			if err != nil {
+				log.Errorf("Unable to read response body: %v", err)
+				return nil, err
+			}
+			err = json.Unmarshal(b, &apiErr)
+			if err != nil {
+				log.Errorf("JSON Response Data not parsable: %s", string(b))
+				return nil, err
+			}
+			if apiErr.Code == "SNAPSHOT_TOO_OLD" {
+				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
+				err = apiErr
+			}
+			return nil, err
+		default:
+			log.Errorf("Unknown response code from change server: %v", r.Status)
+			return nil, fmt.Errorf("unknown response code from change server: %v", r.Status)
+		}
 	}
+
+	resp := &common.ChangeList{}
+	err = json.NewDecoder(r.Body).Decode(resp)
+	if err != nil {
+		log.Errorf("JSON Response Data not parsable: %v", err)
+		return nil, err
+	}
+	return resp, nil
+}
+
+func (c *pollChangeManager) emitChangeList(scopes []string, cl *common.ChangeList) error {
+	var err error
+	/*
+	 * If the lastSequence is already newer or the same than what we got via
+	 * cl.LastSequence, Ignore it.
+	 */
+	if c.lastSequence != "" &&
+		getChangeStatus(c.lastSequence, cl.LastSequence) != 1 {
+		return nil
+	}
+
+	if changesRequireDDLSync(c.dbMan.getKnowTables(), cl) {
+		return changeServerError{
+			Code: "DDL changes detected; must get new snapshot",
+		}
+	}
+
+	/* If valid data present, Emit to plugins */
+	if len(cl.Changes) > 0 {
+		if err = c.dbMan.processChangeList(cl); err != nil {
+			log.Errorf("Error in processChangeList: %v", err)
+			return err
+		}
+		/*
+		* Check to see if there was any change in scope. If found, handle it
+		* by getting a new snapshot
+		 */
+		newScopes, err := c.dbMan.findScopesForId(apidInfo.ClusterID)
+		if err != nil {
+			return err
+		}
+		cs := scopeChanged(newScopes, scopes)
+		if cs != nil {
+			return cs
+		}
+		select {
+		case <-time.After(httpTimeout):
+			log.Panic("Timeout. Plugins failed to respond to changes.")
+		case <-eventService.Emit(ApigeeSyncEventSelector, cl):
+		}
+	} else if c.lastSequence == "" { // emit the first changelist anyway
+		select {
+		case <-time.After(httpTimeout):
+			log.Panic("Timeout. Plugins failed to respond to changes.")
+		case <-eventService.Emit(ApigeeSyncEventSelector, cl):
+		}
+	} else {
+		log.Debugf("No Changes detected")
+	}
+
+	err = c.dbMan.updateLastSequence(cl.LastSequence)
+	if err != nil {
+		log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
+	}
+	c.lastSequence = cl.LastSequence
+	return nil
+}
+
+/* Make a single request to the changeserver to get a changelist */
+func (c *pollChangeManager) getChanges(scopes []string, changesUri *url.URL) (*http.Response, error) {
 	log.Debug("polling...")
 
 	/* Find the scopes associated with the config id */
-	scopes := findScopesForId(apidInfo.ClusterID)
 	v := url.Values{}
 
-	blockValue := block
+	blockValue := strconv.Itoa(c.block)
 	/* Sequence added to the query if available */
-	if lastSequence != "" {
-		v.Add("since", lastSequence)
+	if c.lastSequence != "" {
+		v.Add("since", c.lastSequence)
 	} else {
 		blockValue = "0"
 	}
@@ -178,115 +291,16 @@
 
 	/* If error, break the loop, and retry after interval */
 	req, err := http.NewRequest("GET", uri, nil)
-	addHeaders(req)
-	r, err := httpclient.Do(req)
+	addHeaders(req, c.tokenMan.getBearerToken())
+	r, err := c.client.Do(req)
 	if err != nil {
 		log.Errorf("change agent comm error: %s", err)
-		// if closed
-		if atomic.LoadInt32(c.isClosed) == int32(1) {
-			return quitSignalError{}
-		}
-		return err
+		return nil, err
 	}
-	defer r.Body.Close()
-
-	// has been closed
-	if atomic.LoadInt32(c.isClosed) == int32(1) {
-		log.Debugf("getChanges: changeManager has been closed")
-		return quitSignalError{}
-	}
-
-	if r.StatusCode != http.StatusOK {
-		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
-		switch r.StatusCode {
-		case http.StatusUnauthorized:
-			err = apidTokenManager.invalidateToken()
-			if err != nil {
-				return err
-			}
-			return authFailError{}
-
-		case http.StatusNotModified:
-			return nil
-
-		case http.StatusBadRequest:
-			var apiErr changeServerError
-			var b []byte
-			b, err = ioutil.ReadAll(r.Body)
-			if err != nil {
-				log.Errorf("Unable to read response body: %v", err)
-				return err
-			}
-			err = json.Unmarshal(b, &apiErr)
-			if err != nil {
-				log.Errorf("JSON Response Data not parsable: %s", string(b))
-				return err
-			}
-			if apiErr.Code == "SNAPSHOT_TOO_OLD" {
-				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
-				err = apiErr
-			}
-			return err
-		}
-		return nil
-	}
-
-	var resp common.ChangeList
-	err = json.NewDecoder(r.Body).Decode(&resp)
-	if err != nil {
-		log.Errorf("JSON Response Data not parsable: %v", err)
-		return err
-	}
-
-	/*
-	 * If the lastSequence is already newer or the same than what we got via
-	 * resp.LastSequence, Ignore it.
-	 */
-	if lastSequence != "" &&
-		getChangeStatus(lastSequence, resp.LastSequence) != 1 {
-		return nil
-	}
-
-	if changesRequireDDLSync(resp) {
-		return changeServerError{
-			Code: "DDL changes detected; must get new snapshot",
-		}
-	}
-
-	/* If valid data present, Emit to plugins */
-	if len(resp.Changes) > 0 {
-		processChangeList(&resp)
-		select {
-		case <-time.After(httpTimeout):
-			log.Panic("Timeout. Plugins failed to respond to changes.")
-		case <-events.Emit(ApigeeSyncEventSelector, &resp):
-		}
-	} else if lastSequence == "" {
-		select {
-		case <-time.After(httpTimeout):
-			log.Panic("Timeout. Plugins failed to respond to changes.")
-		case <-events.Emit(ApigeeSyncEventSelector, &resp):
-		}
-	} else {
-		log.Debugf("No Changes detected for Scopes: %s", scopes)
-	}
-
-	updateSequence(resp.LastSequence)
-
-	/*
-	 * Check to see if there was any change in scope. If found, handle it
-	 * by getting a new snapshot
-	 */
-	newScopes := findScopesForId(apidInfo.ClusterID)
-	cs := scopeChanged(newScopes, scopes)
-	if cs != nil {
-		return cs
-	}
-
-	return nil
+	return r, nil
 }
 
-func changesRequireDDLSync(changes common.ChangeList) bool {
+func changesRequireDDLSync(knownTables map[string]bool, changes *common.ChangeList) bool {
 	return changesHaveNewTables(knownTables, changes.Changes)
 }
 
@@ -296,10 +310,12 @@
 		log.Debugf("handleChangeServerError: changeManager has been closed")
 		return
 	}
-	if c, ok := err.(changeServerError); ok {
-		log.Debugf("%s. Fetch a new snapshot to sync...", c.Code)
-		apidSnapshotManager.downloadDataSnapshot()
-	} else {
+
+	switch e := err.(type) {
+	case changeServerError:
+		log.Debugf("%s. Fetch a new snapshot to sync...", e.Code)
+		c.snapMan.downloadDataSnapshot()
+	default:
 		log.Debugf("Error connecting to changeserver: %v", err)
 	}
 }
@@ -310,7 +326,7 @@
 func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
 
 	//nil maps should not be passed in.  Making the distinction between nil map and empty map
-	if a == nil {
+	if len(a) == 0 {
 		log.Warn("Nil map passed to function changesHaveNewTables, may be bug")
 		return true
 	}
@@ -332,24 +348,15 @@
 func getChangeStatus(lastSeq string, currSeq string) int {
 	seqPrev, err := common.ParseSequence(lastSeq)
 	if err != nil {
-		log.Panic("Unable to parse previous sequence string")
+		log.Panicf("Unable to parse previous sequence string: %v", err)
 	}
 	seqCurr, err := common.ParseSequence(currSeq)
 	if err != nil {
-		log.Panic("Unable to parse current sequence string")
+		log.Panicf("Unable to parse current sequence string: %v", err)
 	}
 	return seqCurr.Compare(seqPrev)
 }
 
-func updateSequence(seq string) {
-	lastSequence = seq
-	err := updateLastSequence(seq)
-	if err != nil {
-		log.Panicf("Unable to update Sequence in DB. Err {%v}", err)
-	}
-
-}
-
 /*
  * Returns nil if the two arrays have matching contents
  */
diff --git a/cmd/mockServer/main.go b/cmd/mockServer/main.go
index 486da41..8cfa1ef 100644
--- a/cmd/mockServer/main.go
+++ b/cmd/mockServer/main.go
@@ -19,7 +19,6 @@
 
 	"os"
 
-
 	"github.com/apid/apid-core"
 	"github.com/apid/apid-core/factory"
 	"github.com/apid/apidApigeeSync"
@@ -53,16 +52,16 @@
 	router := apid.API().Router()
 
 	params := apidApigeeSync.MockParms{
-		ReliableAPI:            *reliable,
-		ClusterID:              "cluster",
-		TokenKey:               "key",
-		TokenSecret:            "secret",
-		Scope:                  "scope",
-		Organization:           "org",
-		Environment:            "test",
-		NumDevelopers:          *numDevs,
-		NumDeployments:         *numDeps,
-		BundleURI:              *bundleURI,
+		ReliableAPI:    *reliable,
+		ClusterID:      "cluster",
+		TokenKey:       "key",
+		TokenSecret:    "secret",
+		Scope:          "scope",
+		Organization:   "org",
+		Environment:    "test",
+		NumDevelopers:  *numDevs,
+		NumDeployments: *numDeps,
+		BundleURI:      *bundleURI,
 	}
 
 	log.Printf("Params: %#v\n", params)
diff --git a/data.go b/data.go
index c77fc10..be8dd34 100644
--- a/data.go
+++ b/data.go
@@ -28,27 +28,36 @@
 )
 
 var (
-	unsafeDB apid.DB
-	dbMux    sync.RWMutex
+	dbMux sync.RWMutex
 )
 
-type dataApidCluster struct {
-	ID, Name, OrgAppName, CreatedBy, UpdatedBy, Description string
-	Updated, Created                                        string
-}
-
-type dataDataScope struct {
-	ID, ClusterID, Scope, Org, Env, CreatedBy, UpdatedBy string
-	Updated, Created                                     string
-}
-
 /*
 This plugin uses 2 databases:
 1. The default DB is used for APID table.
 2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE
 (Currently, the snapshot never changes, but this is future-proof)
 */
-func initDB(db apid.DB) error {
+
+func creatDbManager() *dbManager {
+	return &dbManager{
+		DbMux:       &sync.RWMutex{},
+		knownTables: make(map[string]bool),
+	}
+}
+
+type dbManager struct {
+	Db          apid.DB
+	DbMux       *sync.RWMutex
+	dbVersion   string
+	knownTables map[string]bool
+}
+
+// idempotent call to initialize default DB
+func (dbMan *dbManager) initDB() error {
+	db, err := dataService.DB()
+	if err != nil {
+		return err
+	}
 	tx, err := db.Begin()
 	if err != nil {
 		log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
@@ -75,24 +84,22 @@
 	return nil
 }
 
-func getDB() apid.DB {
+func (dbMan *dbManager) getDB() apid.DB {
 	dbMux.RLock()
-	db := unsafeDB
-	dbMux.RUnlock()
-	return db
+	defer dbMux.RUnlock()
+	return dbMan.Db
 }
 
-func setDB(db apid.DB) {
+func (dbMan *dbManager) setDB(db apid.DB) {
 	dbMux.Lock()
-	unsafeDB = db
-	dbMux.Unlock()
+	defer dbMux.Unlock()
+	dbMan.Db = db
 }
 
 //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
-func insert(tableName string, rows []common.Row, txn apid.Tx) bool {
-
+func (dbMan *dbManager) insert(tableName string, rows []common.Row, txn apid.Tx) error {
 	if len(rows) == 0 {
-		return false
+		return fmt.Errorf("no rows")
 	}
 
 	var orderedColumns []string
@@ -101,12 +108,12 @@
 	}
 	sort.Strings(orderedColumns)
 
-	sql := buildInsertSql(tableName, orderedColumns, rows)
+	sql := dbMan.buildInsertSql(tableName, orderedColumns, rows)
 
 	prep, err := txn.Prepare(sql)
 	if err != nil {
-		log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err)
-		return false
+		log.Errorf("INSERT Fail to prepare statement %s error=%v", sql, err)
+		return err
 	}
 	defer prep.Close()
 
@@ -123,15 +130,15 @@
 	_, err = prep.Exec(values...)
 
 	if err != nil {
-		log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err)
-		return false
+		log.Errorf("INSERT Fail %s values=%v error=%v", sql, values, err)
+		return err
 	}
-	log.Debugf("INSERT Success [%s] values=%v", sql, values)
+	log.Debugf("INSERT Success %s values=%v", sql, values)
 
-	return true
+	return nil
 }
 
-func getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
+func (dbMan *dbManager) getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
 	var values = make([]interface{}, len(pkeys))
 	for i, pkey := range pkeys {
 		if row[pkey] == nil {
@@ -143,52 +150,46 @@
 	return values
 }
 
-func _delete(tableName string, rows []common.Row, txn apid.Tx) bool {
-	pkeys, err := getPkeysForTable(tableName)
+func (dbMan *dbManager) delete(tableName string, rows []common.Row, txn apid.Tx) error {
+	pkeys, err := dbMan.getPkeysForTable(tableName)
 	sort.Strings(pkeys)
 	if len(pkeys) == 0 || err != nil {
-		log.Errorf("DELETE No primary keys found for table. %s", tableName)
-		return false
+		return fmt.Errorf("DELETE No primary keys found for table. %s", tableName)
 	}
 
 	if len(rows) == 0 {
-		log.Errorf("No rows found for table.", tableName)
-		return false
+		return fmt.Errorf("no rows found for table %s", tableName)
 	}
 
-	sql := buildDeleteSql(tableName, rows[0], pkeys)
+	sql := dbMan.buildDeleteSql(tableName, rows[0], pkeys)
 	prep, err := txn.Prepare(sql)
 	if err != nil {
-		log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
-		return false
+		return fmt.Errorf("DELETE Fail to prep statement %s error=%v", sql, err)
 	}
 	defer prep.Close()
 	for _, row := range rows {
-		values := getValueListFromKeys(row, pkeys)
+		values := dbMan.getValueListFromKeys(row, pkeys)
 		// delete prepared statement from existing template statement
 		res, err := txn.Stmt(prep).Exec(values...)
 		if err != nil {
-			log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("DELETE Fail %s values=%v error=%v", sql, values, err)
 		}
 		affected, err := res.RowsAffected()
 		if err == nil && affected != 0 {
-			log.Debugf("DELETE Success [%s] values=%v", sql, values)
+			log.Debugf("DELETE Success %s values=%v", sql, values)
 		} else if err == nil && affected == 0 {
-			log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
-			return false
+			return fmt.Errorf("entry not found %s values=%v, nothing to delete", sql, values)
 		} else {
-			log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("DELETE Failed %s values=%v error=%v", sql, values, err)
 		}
 
 	}
-	return true
+	return nil
 
 }
 
 // Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
-func buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
+func (dbMan *dbManager) buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
 
 	var wherePlaceholders []string
 	i := 1
@@ -210,14 +211,13 @@
 
 }
 
-func update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) bool {
-	pkeys, err := getPkeysForTable(tableName)
+func (dbMan *dbManager) update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) error {
+	pkeys, err := dbMan.getPkeysForTable(tableName)
 	if len(pkeys) == 0 || err != nil {
-		log.Errorf("UPDATE No primary keys found for table.", tableName)
-		return false
+		return fmt.Errorf("UPDATE No primary keys found for table: %v, %v", tableName, err)
 	}
 	if len(oldRows) == 0 || len(newRows) == 0 {
-		return false
+		return fmt.Errorf("UPDATE No old or new rows, table: %v, %v, %v", tableName, oldRows, newRows)
 	}
 
 	var orderedColumns []string
@@ -229,11 +229,10 @@
 	sort.Strings(orderedColumns)
 
 	//build update statement, use arbitrary row as template
-	sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
+	sql := dbMan.buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
 	prep, err := txn.Prepare(sql)
 	if err != nil {
-		log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
-		return false
+		return fmt.Errorf("UPDATE Fail to prep statement %s error=%v", sql, err)
 	}
 	defer prep.Close()
 
@@ -265,25 +264,23 @@
 		res, err := txn.Stmt(prep).Exec(values...)
 
 		if err != nil {
-			log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err)
 		}
 		numRowsAffected, err := res.RowsAffected()
 		if err != nil {
-			log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
-			return false
+			return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err)
 		}
 		//delete this once we figure out why tests are failing/not updating
 		log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
-		log.Debugf("UPDATE Success [%s] values=%v", sql, values)
+		log.Debugf("UPDATE Success %s values=%v", sql, values)
 
 	}
 
-	return true
+	return nil
 
 }
 
-func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
+func (dbMan *dbManager) buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
 	if row == nil {
 		return ""
 	}
@@ -311,7 +308,7 @@
 }
 
 //precondition: rows.length > 1000, max number of entities for sqlite
-func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
+func (dbMan *dbManager) buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
 	if len(rows) == 0 {
 		return ""
 	}
@@ -343,13 +340,13 @@
 	return sql
 }
 
-func getPkeysForTable(tableName string) ([]string, error) {
-	db := getDB()
+func (dbMan *dbManager) getPkeysForTable(tableName string) ([]string, error) {
+	db := dbMan.getDB()
 	normalizedTableName := normalizeTableName(tableName)
 	sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
 	rows, err := db.Query(sql, normalizedTableName)
 	if err != nil {
-		log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err)
+		log.Errorf("Failed %s values=%s Error: %v", sql, normalizedTableName, err)
 		return nil, err
 	}
 	var columnNames []string
@@ -358,13 +355,15 @@
 		var value string
 		err := rows.Scan(&value)
 		if err != nil {
-			log.Fatal(err)
+			log.Errorf("failed to scan column names: %v", err)
+			return nil, err
 		}
 		columnNames = append(columnNames, value)
 	}
 	err = rows.Err()
 	if err != nil {
-		log.Fatal(err)
+		log.Errorf("failed to scan column names: %v", err)
+		return nil, err
 	}
 	return columnNames, nil
 }
@@ -377,13 +376,11 @@
  * For the given apidConfigId, this function will retrieve all the distinch scopes
  * associated with it. Distinct, because scope is already a collection of the tenants.
  */
-func findScopesForId(configId string) (scopes []string) {
+func (dbMan *dbManager) findScopesForId(configId string) (scopes []string, err error) {
 
 	log.Debugf("findScopesForId: %s", configId)
-
 	var scope sql.NullString
-	db := getDB()
-
+	db := dbMan.getDB()
 	query := `
 		SELECT scope FROM edgex_data_scope WHERE apid_cluster_id = $1
 		UNION
@@ -391,7 +388,6 @@
 		UNION
 		SELECT env_scope FROM edgex_data_scope WHERE apid_cluster_id = $3
 	`
-
 	rows, err := db.Query(query, configId, configId, configId)
 	if err != nil {
 		log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
@@ -416,9 +412,9 @@
 /*
  * Retrieve SnapshotInfo for the given apidConfigId from apid_config table
  */
-func getLastSequence() (lastSequence string) {
+func (dbMan *dbManager) getLastSequence() (lastSequence string) {
 
-	err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
+	err := dbMan.getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
 	if err != nil && err != sql.ErrNoRows {
 		log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
 		return
@@ -432,11 +428,11 @@
  * Persist the last change Id each time a change has been successfully
  * processed by the plugin(s)
  */
-func updateLastSequence(lastSequence string) error {
+func (dbMan *dbManager) updateLastSequence(lastSequence string) error {
 
 	log.Debugf("updateLastSequence: %s", lastSequence)
 
-	tx, err := getDB().Begin()
+	tx, err := dbMan.getDB().Begin()
 	if err != nil {
 		log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
 		return err
@@ -454,10 +450,9 @@
 	return err
 }
 
-func getApidInstanceInfo() (info apidInstanceInfo, err error) {
+func (dbMan *dbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
 	info.InstanceName = config.GetString(configName)
 	info.ClusterID = config.GetString(configApidClusterId)
-
 	var savedClusterId string
 
 	// always use default database for this
@@ -481,7 +476,7 @@
 		} else {
 			// first start - no row, generate a UUID and store it
 			err = nil
-			newInstanceID = true
+			info.IsNewInstance = true
 			info.InstanceID = util.GenerateUUID()
 
 			log.Debugf("Inserting new apid instance id %s", info.InstanceID)
@@ -489,13 +484,14 @@
 				info.InstanceID, info.ClusterID, "")
 		}
 	} else if savedClusterId != info.ClusterID {
-		log.Debug("Detected apid cluster id change in config.  Apid will start clean")
+		log.Warnf("Detected apid cluster id change in config. %v v.s. %v Apid will start clean.",
+			savedClusterId, info.ClusterID)
 		err = nil
-		newInstanceID = true
+		info.IsNewInstance = true
 		info.InstanceID = util.GenerateUUID()
 
-		_, err = tx.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
-			info.InstanceID, info.ClusterID, "")
+		_, err = tx.Exec("DELETE FROM APID;")
+
 		info.LastSnapshot = ""
 	}
 	if err = tx.Commit(); err != nil {
@@ -504,8 +500,8 @@
 	return
 }
 
-func updateApidInstanceInfo() error {
-
+func (dbMan *dbManager) updateApidInstanceInfo(instanceId, clusterId, lastSnap string) error {
+	log.Debugf("updateApidInstanceInfo: %v, %v, %v", instanceId, clusterId, lastSnap)
 	// always use default database for this
 	db, err := dataService.DB()
 	if err != nil {
@@ -521,7 +517,7 @@
 		REPLACE
 		INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
 		VALUES (?,?,?)`,
-		apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
+		instanceId, clusterId, lastSnap)
 	if err != nil {
 		log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
 		return err
@@ -536,3 +532,131 @@
 
 	return err
 }
+
+func (dbMan *dbManager) extractTables() (map[string]bool, error) {
+	tables := make(map[string]bool)
+	db := dbMan.getDB()
+	rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var table sql.NullString
+		if err := rows.Scan(&table); err != nil {
+			return nil, err
+		}
+		log.Debugf("Table %v found in existing db", table)
+		if table.Valid {
+			tables[table.String] = true
+		}
+	}
+	log.Debugf("Extracting table names from existing DB %v", tables)
+	return tables, nil
+}
+
+func (dbMan *dbManager) getKnowTables() map[string]bool {
+	return dbMan.knownTables
+}
+
+func (dbMan *dbManager) processChangeList(changes *common.ChangeList) error {
+
+	tx, err := dbMan.getDB().Begin()
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
+
+	for _, change := range changes.Changes {
+		if change.Table == LISTENER_TABLE_APID_CLUSTER {
+			return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
+		}
+		switch change.Operation {
+		case common.Insert:
+			err = dbMan.insert(change.Table, []common.Row{change.NewRow}, tx)
+		case common.Update:
+			if change.Table == LISTENER_TABLE_DATA_SCOPE {
+				return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
+			}
+			err = dbMan.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
+		case common.Delete:
+			err = dbMan.delete(change.Table, []common.Row{change.OldRow}, tx)
+		}
+		if err != nil {
+			return err
+		}
+
+	}
+
+	if err = tx.Commit(); err != nil {
+		return fmt.Errorf("Commit error in processChangeList: %v", err)
+	}
+	return nil
+}
+
+func (dbMan *dbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
+
+	var prevDb string
+	if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
+		log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
+			apidInfo.LastSnapshot, snapshot.SnapshotInfo)
+		prevDb = apidInfo.LastSnapshot
+	} else {
+		log.Debugf("Process snapshot for version {%s}",
+			snapshot.SnapshotInfo)
+	}
+	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+	if err != nil {
+		return fmt.Errorf("unable to access database: %v", err)
+	}
+
+	var numApidClusters int
+	tx, err := db.Begin()
+	if err != nil {
+		return fmt.Errorf("unable to open DB txn: {%v}", err.Error())
+	}
+	defer tx.Rollback()
+	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
+	if err != nil {
+		return fmt.Errorf("unable to read database: {%s}", err.Error())
+	}
+
+	if numApidClusters != 1 {
+		return fmt.Errorf("illegal state for apid_cluster, must be a single row")
+	}
+
+	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
+	if err != nil && err.Error() != "duplicate column name: last_sequence" {
+		return fmt.Errorf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
+	}
+
+	if err = tx.Commit(); err != nil {
+		return fmt.Errorf("error when commit in processSqliteSnapshot: %v", err)
+	}
+
+	//update apid instance info
+	apidInfo.LastSnapshot = snapshot.SnapshotInfo
+	err = dbMan.updateApidInstanceInfo(apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
+	if err != nil {
+		log.Errorf("Unable to update instance info: %v", err)
+		return fmt.Errorf("unable to update instance info: %v", err)
+	}
+
+	dbMan.setDB(db)
+	if isDataSnapshot {
+		dbMan.knownTables, err = dbMan.extractTables()
+		if err != nil {
+			return fmt.Errorf("unable to extract tables: %v", err)
+		}
+	}
+	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+
+	// Releases the DB, when the Connection reference count reaches 0.
+	if prevDb != "" {
+		dataService.ReleaseDB(prevDb)
+	}
+	return nil
+}
diff --git a/data_test.go b/data_test.go
index 691e887..abad465 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,32 +15,54 @@
 package apidApigeeSync
 
 import (
+	"database/sql"
 	"github.com/apid/apid-core"
-	"github.com/apid/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"io/ioutil"
 	"sort"
 	"strconv"
 )
 
 var _ = Describe("data access tests", func() {
-	testCount := 1
-
-	var _ = BeforeEach(func() {
+	testCount := 0
+	var testDbMan *dbManager
+	var dbVersion string
+	BeforeEach(func() {
+		var testDir string
+		testDir, err := ioutil.TempDir(tmpDir, "data_test")
+		config.Set(configLocalStoragePath, testDir)
+		Expect(err).NotTo(HaveOccurred())
+		testDbMan = creatDbManager()
 		testCount++
-		db, err := dataService.DBVersion("data_test_" + strconv.Itoa(testCount))
+		dbVersion = "data_test_" + strconv.Itoa(testCount)
+		db, err := dataService.DBVersion(dbVersion)
 		Expect(err).Should(Succeed())
-		initDB(db)
-		createBootstrapTables(db)
-		setDB(db)
+		testDbMan.setDB(db)
 	})
 
-	var _ = AfterEach(func() {
-		data.Delete(data.VersionedDBID("common", "data_test_"+strconv.Itoa(testCount)))
+	AfterEach(func() {
+		config.Set(configLocalStoragePath, tmpDir)
 	})
 
-	Context("Update processing", func() {
+	It("check scope changes", func() {
+		newScopes := []string{"foo"}
+		scopes := []string{"bar"}
+		Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+		newScopes = []string{"foo", "bar"}
+		scopes = []string{"bar"}
+		Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+		newScopes = []string{"foo"}
+		scopes = []string{"bar", "foo"}
+		Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
+		newScopes = []string{"foo", "bar"}
+		scopes = []string{"bar", "foo"}
+		Expect(scopeChanged(newScopes, scopes)).To(BeNil())
+
+	})
+
+	Context("build Sql", func() {
 		It("unit test buildUpdateSql with single primary key", func() {
 			testRow := common.Row{
 				"id": {
@@ -66,7 +88,7 @@
 			}
 			sort.Strings(orderedColumns)
 
-			result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
+			result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id"})
 			Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id=$4, tenant_id=$5" +
 				" WHERE id=$6").To(Equal(result))
 		})
@@ -99,403 +121,11 @@
 			}
 			sort.Strings(orderedColumns)
 
-			result := buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
+			result := testDbMan.buildUpdateSql("TEST_TABLE", orderedColumns, testRow, []string{"id1", "id2"})
 			Expect("UPDATE TEST_TABLE SET _change_selector=$1, api_resources=$2, environments=$3, id1=$4, id2=$5, tenant_id=$6" +
 				" WHERE id1=$7 AND id2=$8").To(Equal(result))
 		})
 
-		It("test update with composite primary key", func() {
-			event := &common.ChangeList{}
-
-			//this needs to match what is actually in the DB
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-		})
-
-		It("update should succeed if newrow modifies the primary key", func() {
-			event := &common.ChangeList{}
-
-			//this needs to match what is actually in the DB
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "new_id",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-		})
-
-		It("update should succeed if newrow contains fewer fields than oldrow", func() {
-			event := &common.ChangeList{}
-
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-		})
-
-		It("update should succeed if oldrow contains fewer fields than newrow", func() {
-			event := &common.ChangeList{}
-
-			oldRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "A product for testing Greg",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			newRow := common.Row{
-				"id": {
-					Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
-				},
-				"api_resources": {
-					Value: "{/**}",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "43aef41d",
-				},
-				"description": {
-					Value: "new description",
-				},
-				"created_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"updated_at": {
-					Value: "2017-03-01 22:50:41.75+00:00",
-				},
-				"_change_selector": {
-					Value: "43aef41d",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    oldRow,
-					Operation: 1,
-				},
-			}
-			//insert and assert success
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//create update event
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    oldRow,
-					NewRow:    newRow,
-					Operation: 2,
-				},
-			}
-
-			//do the update
-			Expect(true).To(Equal(processChangeList(event)))
-			err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-		})
-	})
-
-	Context("Insert processing", func() {
 		It("Properly constructs insert sql for one row", func() {
 			newRow := common.Row{
 				"id": {
@@ -531,7 +161,7 @@
 			sort.Strings(orderedColumns)
 
 			expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8)"
-			Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
+			Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow})))
 		})
 
 		It("Properly constructs insert sql for multiple rows", func() {
@@ -595,275 +225,11 @@
 			sort.Strings(orderedColumns)
 
 			expectedSql := "INSERT INTO api_product(_change_selector,api_resources,created_at,description,environments,id,tenant_id,updated_at) VALUES ($1,$2,$3,$4,$5,$6,$7,$8),($9,$10,$11,$12,$13,$14,$15,$16)"
-			Expect(expectedSql).To(Equal(buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
+			Expect(expectedSql).To(Equal(testDbMan.buildInsertSql("api_product", orderedColumns, []common.Row{newRow1, newRow2})))
 		})
 
-		It("Properly executes insert for a single rows", func() {
-			event := &common.ChangeList{}
-
-			newRow1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-			}
-
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-		})
-
-		It("Properly executed insert for multiple rows", func() {
-			event := &common.ChangeList{}
-
-			newRow1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-			newRow2 := common.Row{
-				"id": {
-					Value: "b",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow2,
-					Operation: 1,
-				},
-			}
-
-			Expect(true).To(Equal(processChangeList(event)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(2))
-		})
-
-		It("Fails to execute if row does not match existing table schema", func() {
-			event := &common.ChangeList{}
-
-			newRow1 := common.Row{
-				"not_and_id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-			}
-
-			ok := processChangeList(event)
-			Expect(false).To(Equal(ok))
-
-			var nRows int
-			//assert that no extraneous rows were added
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(0))
-		})
-
-		It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
-			event := &common.ChangeList{}
-			newRow1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			newRow2 := common.Row{
-				"not_and_id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow1,
-					Operation: 1,
-				},
-				{
-					Table:     "kms.api_product",
-					NewRow:    newRow2,
-					Operation: 1,
-				},
-			}
-
-			ok := processChangeList(event)
-			Expect(false).To(Equal(ok))
-		})
-	})
-
-	Context("Delete processing", func() {
 		It("Properly constructs sql prepare for Delete", func() {
+			createBootstrapTables(testDbMan.getDB())
 			row := common.Row{
 				"id": {
 					Value: "new_id",
@@ -891,324 +257,1233 @@
 				},
 			}
 
-			pkeys, err := getPkeysForTable("kms_api_product")
+			pkeys, err := testDbMan.getPkeysForTable("kms_api_product")
 			Expect(err).Should(Succeed())
-			sql := buildDeleteSql("kms_api_product", row, pkeys)
+			sql := testDbMan.buildDeleteSql("kms_api_product", row, pkeys)
 			Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
 		})
+	})
 
-		It("Verify execute single insert & single delete works", func() {
-			event1 := &common.ChangeList{}
-			event2 := &common.ChangeList{}
-
-			Row1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
-
-			event1.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row1,
-					Operation: 1,
-				},
-			}
-			event2.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row1,
-					Operation: 3,
-				},
-			}
-
-			Expect(true).To(Equal(processChangeList(event1)))
-			var nRows int
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
-
-			Expect(true).To(Equal(processChangeList(event2)))
-
-			// validate delete
-			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(0))
-
-			// delete again should fail - coz entry will not exist
-			Expect(false).To(Equal(processChangeList(event2)))
+	Context("Process Changelist", func() {
+		BeforeEach(func() {
+			createBootstrapTables(testDbMan.getDB())
 		})
 
-		It("verify multiple insert and single delete works", func() {
-			event1 := &common.ChangeList{}
-			event2 := &common.ChangeList{}
+		Context("Update processing", func() {
+			It("test update with composite primary key", func() {
+				event := &common.ChangeList{}
 
-			Row1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				//this needs to match what is actually in the DB
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			Row2 := common.Row{
-				"id": {
-					Value: "b",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				newRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			event1.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row1,
-					Operation: 1,
-				},
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row2,
-					Operation: 1,
-				},
-			}
-			event2.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row1,
-					Operation: 3,
-				},
-			}
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			Expect(true).To(Equal(processChangeList(event1)))
-			var nRows int
-			//verify first row
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
 
-			//verify second row
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(2))
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			Expect(true).To(Equal(processChangeList(event2)))
+			})
 
-			//verify second row still exists
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+			It("update should succeed if newrow modifies the primary key", func() {
+				event := &common.ChangeList{}
 
-			// validate delete
-			err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				//this needs to match what is actually in the DB
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			// delete again should fail - coz entry will not exist
-			Expect(false).To(Equal(processChangeList(event2)))
-		}, 3)
+				newRow := common.Row{
+					"id": {
+						Value: "new_id",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-		It("verify single insert and multiple delete fails", func() {
-			event1 := &common.ChangeList{}
-			event2 := &common.ChangeList{}
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			Row1 := common.Row{
-				"id": {
-					Value: "a",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
 
-			Row2 := common.Row{
-				"id": {
-					Value: "b",
-				},
-				"api_resources": {
-					Value: "r",
-				},
-				"environments": {
-					Value: "{test}",
-				},
-				"tenant_id": {
-					Value: "t",
-				},
-				"description": {
-					Value: "d",
-				},
-				"created_at": {
-					Value: "c",
-				},
-				"updated_at": {
-					Value: "u",
-				},
-				"_change_selector": {
-					Value: "cs",
-				},
-			}
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
 
-			event1.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					NewRow:    Row1,
-					Operation: 1,
-				},
-			}
-			event2.Changes = []common.Change{
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row1,
-					Operation: 3,
-				},
-				{
-					Table:     "kms.api_product",
-					OldRow:    Row2,
-					Operation: 3,
-				},
-			}
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+			})
 
-			Expect(true).To(Equal(processChangeList(event1)))
-			var nRows int
-			//verify insert
-			err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
-				"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
-				"and _change_selector='cs'").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+			It("update should succeed if newrow contains fewer fields than oldrow", func() {
+				event := &common.ChangeList{}
 
-			//assert that no extraneous rows were added
-			err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(nRows).To(Equal(1))
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-			Expect(false).To(Equal(processChangeList(event2)))
+				newRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
 
-		}, 3)
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+			})
+
+			It("update should succeed if oldrow contains fewer fields than newrow", func() {
+				event := &common.ChangeList{}
+
+				oldRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "A product for testing Greg",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
+
+				newRow := common.Row{
+					"id": {
+						Value: "87a4bfaa-b3c4-47cd-b6c5-378cdb68610c",
+					},
+					"api_resources": {
+						Value: "{/**}",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "43aef41d",
+					},
+					"description": {
+						Value: "new description",
+					},
+					"created_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"updated_at": {
+						Value: "2017-03-01 22:50:41.75+00:00",
+					},
+					"_change_selector": {
+						Value: "43aef41d",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    oldRow,
+						Operation: 1,
+					},
+				}
+				//insert and assert success
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//create update event
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    oldRow,
+						NewRow:    newRow,
+						Operation: 2,
+					},
+				}
+
+				//do the update
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				err = testDbMan.getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+			})
+		})
+
+		Context("Insert processing", func() {
+			It("Properly executes insert for a single rows", func() {
+				event := &common.ChangeList{}
+
+				newRow1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+			})
+
+			It("Properly executed insert for multiple rows", func() {
+				event := &common.ChangeList{}
+
+				newRow1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+				newRow2 := common.Row{
+					"id": {
+						Value: "b",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow2,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(2))
+			})
+
+			It("Fails to execute if row does not match existing table schema", func() {
+				event := &common.ChangeList{}
+
+				newRow1 := common.Row{
+					"not_and_id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed())
+
+				var nRows int
+				//assert that no extraneous rows were added
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(0))
+			})
+
+			It("Fails to execute at least one row does not match the table schema, even if other rows are valid", func() {
+				event := &common.ChangeList{}
+				newRow1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				newRow2 := common.Row{
+					"not_and_id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow1,
+						Operation: 1,
+					},
+					{
+						Table:     "kms.api_product",
+						NewRow:    newRow2,
+						Operation: 1,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event)).ShouldNot(Succeed())
+			})
+		})
+
+		Context("Delete processing", func() {
+
+			It("Verify execute single insert & single delete works", func() {
+				event1 := &common.ChangeList{}
+				event2 := &common.ChangeList{}
+
+				Row1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event1.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row1,
+						Operation: 1,
+					},
+				}
+				event2.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row1,
+						Operation: 3,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+				var nRows int
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				Expect(testDbMan.processChangeList(event2)).Should(Succeed())
+
+				// validate delete
+				err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(0))
+
+				// delete again should fail - coz entry will not exist
+				Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+			})
+
+			It("verify multiple insert and single delete works", func() {
+				event1 := &common.ChangeList{}
+				event2 := &common.ChangeList{}
+
+				Row1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				Row2 := common.Row{
+					"id": {
+						Value: "b",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event1.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row1,
+						Operation: 1,
+					},
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row2,
+						Operation: 1,
+					},
+				}
+				event2.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row1,
+						Operation: 3,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+				var nRows int
+				//verify first row
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//verify second row
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(2))
+
+				Expect(testDbMan.processChangeList(event2)).Should(Succeed())
+
+				//verify second row still exists
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				// validate delete
+				err = testDbMan.getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				// delete again should fail - coz entry will not exist
+				Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+			}, 3)
+
+			It("verify single insert and multiple delete fails", func() {
+				event1 := &common.ChangeList{}
+				event2 := &common.ChangeList{}
+
+				Row1 := common.Row{
+					"id": {
+						Value: "a",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				Row2 := common.Row{
+					"id": {
+						Value: "b",
+					},
+					"api_resources": {
+						Value: "r",
+					},
+					"environments": {
+						Value: "{test}",
+					},
+					"tenant_id": {
+						Value: "t",
+					},
+					"description": {
+						Value: "d",
+					},
+					"created_at": {
+						Value: "c",
+					},
+					"updated_at": {
+						Value: "u",
+					},
+					"_change_selector": {
+						Value: "cs",
+					},
+				}
+
+				event1.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						NewRow:    Row1,
+						Operation: 1,
+					},
+				}
+				event2.Changes = []common.Change{
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row1,
+						Operation: 3,
+					},
+					{
+						Table:     "kms.api_product",
+						OldRow:    Row2,
+						Operation: 3,
+					},
+				}
+
+				Expect(testDbMan.processChangeList(event1)).Should(Succeed())
+				var nRows int
+				//verify insert
+				err := testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+					"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
+					"and _change_selector='cs'").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				//assert that no extraneous rows were added
+				err = testDbMan.getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(nRows).To(Equal(1))
+
+				Expect(testDbMan.processChangeList(event2)).ShouldNot(Succeed())
+
+			}, 3)
+		})
+
+		Context("ApigeeSync change event", func() {
+
+			Context(LISTENER_TABLE_APID_CLUSTER, func() {
+
+				It("should not change LISTENER_TABLE_APID_CLUSTER", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_APID_CLUSTER,
+							},
+						},
+					}
+					Expect(testDbMan.processChangeList(&event)).NotTo(Succeed())
+
+					event = common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Update,
+								Table:     LISTENER_TABLE_APID_CLUSTER,
+							},
+						},
+					}
+
+					Expect(testDbMan.processChangeList(&event)).NotTo(Succeed())
+				})
+
+			})
+
+			Context("data scopes", func() {
+
+				It("insert event should add", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								NewRow: common.Row{
+									"id":               &common.ColumnVal{Value: "i"},
+									"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+									"scope":            &common.ColumnVal{Value: "s1"},
+									"org":              &common.ColumnVal{Value: "o"},
+									"env":              &common.ColumnVal{Value: "e"},
+									"created":          &common.ColumnVal{Value: "c"},
+									"created_by":       &common.ColumnVal{Value: "c"},
+									"updated":          &common.ColumnVal{Value: "u"},
+									"updated_by":       &common.ColumnVal{Value: "u"},
+									"_change_selector": &common.ColumnVal{Value: "cs"},
+								},
+							},
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								NewRow: common.Row{
+									"id":               &common.ColumnVal{Value: "j"},
+									"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+									"scope":            &common.ColumnVal{Value: "s2"},
+									"org":              &common.ColumnVal{Value: "o"},
+									"env":              &common.ColumnVal{Value: "e"},
+									"created":          &common.ColumnVal{Value: "c"},
+									"created_by":       &common.ColumnVal{Value: "c"},
+									"updated":          &common.ColumnVal{Value: "u"},
+									"updated_by":       &common.ColumnVal{Value: "u"},
+									"_change_selector": &common.ColumnVal{Value: "cs"},
+								},
+							},
+						},
+					}
+
+					testDbMan.processChangeList(&event)
+
+					count := 0
+					id := sql.NullString{}
+					rows, err := testDbMan.getDB().Query(`
+				SELECT scope FROM EDGEX_DATA_SCOPE`)
+					Expect(err).NotTo(HaveOccurred())
+					defer rows.Close()
+					for rows.Next() {
+						count++
+						Expect(rows.Scan(&id)).Should(Succeed())
+						Expect(id.String).Should(Equal("s" + strconv.Itoa(count)))
+					}
+
+					Expect(count).To(Equal(2))
+				})
+
+				It("delete event should delete", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Insert,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								NewRow: common.Row{
+									"id":               &common.ColumnVal{Value: "i"},
+									"apid_cluster_id":  &common.ColumnVal{Value: "a"},
+									"scope":            &common.ColumnVal{Value: "s"},
+									"org":              &common.ColumnVal{Value: "o"},
+									"env":              &common.ColumnVal{Value: "e"},
+									"created":          &common.ColumnVal{Value: "c"},
+									"created_by":       &common.ColumnVal{Value: "c"},
+									"updated":          &common.ColumnVal{Value: "u"},
+									"updated_by":       &common.ColumnVal{Value: "u"},
+									"_change_selector": &common.ColumnVal{Value: "cs"},
+								},
+							},
+						},
+					}
+
+					testDbMan.processChangeList(&event)
+
+					event = common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Delete,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+								OldRow:    event.Changes[0].NewRow,
+							},
+						},
+					}
+
+					testDbMan.processChangeList(&event)
+
+					var nRows int
+					err := testDbMan.getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
+					Expect(err).NotTo(HaveOccurred())
+					Expect(nRows).To(BeZero())
+				})
+
+				It("update event should panic for data scopes table", func() {
+					event := common.ChangeList{
+						LastSequence: "test",
+						Changes: []common.Change{
+							{
+								Operation: common.Update,
+								Table:     LISTENER_TABLE_DATA_SCOPE,
+							},
+						},
+					}
+
+					Expect(testDbMan.processChangeList(&event)).ToNot(Succeed())
+				})
+
+			})
+		})
+
 	})
+
+	Context("Process Snapshot", func() {
+		initTestDb := func(sqlFile string, dbMan *dbManager) common.Snapshot {
+			stmts, err := ioutil.ReadFile(sqlFile)
+			Expect(err).Should(Succeed())
+			Expect(testDbMan.getDB().Exec(string(stmts))).ShouldNot(BeNil())
+			Expect(testDbMan.initDB()).Should(Succeed())
+			return common.Snapshot{
+				SnapshotInfo: dbVersion,
+			}
+		}
+
+		AfterEach(func() {
+
+		})
+
+		It("should fail if more than one apid_cluster rows", func() {
+			event := initTestDb("./sql/init_listener_test_duplicate_apids.sql", testDbMan)
+			Expect(testDbMan.processSnapshot(&event, true)).ToNot(Succeed())
+		})
+
+		It("should process a valid Snapshot", func() {
+			config.Set(configApidClusterId, "a")
+			apidInfo.ClusterID = "a"
+			event := initTestDb("./sql/init_listener_test_valid_snapshot.sql", testDbMan)
+			Expect(testDbMan.processSnapshot(&event, true)).Should(Succeed())
+
+			info, err := testDbMan.getApidInstanceInfo()
+			Expect(err).Should(Succeed())
+			Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
+			Expect(info.IsNewInstance).To(BeFalse())
+			Expect(dataService.DBVersion(event.SnapshotInfo)).Should(Equal(testDbMan.getDB()))
+
+			// apid Cluster
+			id := &sql.NullString{}
+			Expect(testDbMan.getDB().QueryRow(`SELECT id FROM EDGEX_APID_CLUSTER`).
+				Scan(id)).Should(Succeed())
+			Expect(id.Valid).Should(BeTrue())
+			Expect(id.String).To(Equal("i"))
+
+			// Data Scope
+			env := &sql.NullString{}
+			count := 0
+			rows, err := testDbMan.getDB().Query(`SELECT env FROM EDGEX_DATA_SCOPE`)
+			Expect(err).Should(Succeed())
+			defer rows.Close()
+			for rows.Next() {
+				count++
+				rows.Scan(&env)
+				Expect(env.Valid).Should(BeTrue())
+				Expect(env.String).To(Equal("e" + strconv.Itoa(count)))
+			}
+			Expect(count).To(Equal(3))
+
+			//find scopes for Id
+			scopes, err := testDbMan.findScopesForId("a")
+			Expect(err).Should(Succeed())
+			Expect(len(scopes)).To(Equal(6))
+			expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
+			sort.Strings(scopes)
+			sort.Strings(expectedScopes)
+			Expect(scopes).Should(Equal(expectedScopes))
+		})
+
+		It("should detect clusterid change", func() {
+			Expect(testDbMan.initDB()).Should(Succeed())
+			testDbMan.updateApidInstanceInfo("a", "b", "c")
+			config.Set(configApidClusterId, "d")
+
+			info, err := testDbMan.getApidInstanceInfo()
+			Expect(err).Should(Succeed())
+			Expect(info.LastSnapshot).To(BeZero())
+			Expect(info.IsNewInstance).To(BeTrue())
+		})
+	})
+
 })
 
 func createBootstrapTables(db apid.DB) {
 	tx, err := db.Begin()
 	Expect(err).To(Succeed())
 	//all tests in this file operate on the api_product table.  Create the necessary tables for this here
-	tx.Exec("CREATE TABLE _transicator_tables " +
-		"(tableName varchar not null, columnName varchar not null, " +
-		"typid integer, primaryKey bool);")
-	tx.Exec("DELETE from _transicator_tables")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
-	tx.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
-	tx.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
-		"api_resources text,approval_type text,scopes text,proxies text, environments text," +
-		"created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
-		"primary key (id,tenant_id,created_at,updated_at));")
-	tx.Exec("DELETE from kms_api_product")
-	err = tx.Commit()
+	_, err = tx.Exec(`CREATE TABLE _transicator_tables
+	(tableName varchar not null, columnName varchar not null, typid integer, primaryKey bool);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','id',2950,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','tenant_id',1043,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','name',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','display_name',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','description',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','api_resources',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','approval_type',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','scopes',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','proxies',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','environments',1015,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_time_unit',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','quota_interval',23,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_at',1114,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','created_by',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_at',1114,1);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','updated_by',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('kms_api_product','_change_selector',1043,0);
+	CREATE TABLE "kms_api_product" (id text,tenant_id text,name text,display_name text,description text,api_resources text,approval_type text,scopes text,proxies text,environments text,quota text,quota_time_unit text,quota_interval integer,created_at blob,created_by text,updated_at blob,updated_by text,_change_selector text, primary key (id,tenant_id,created_at,updated_at));
+	`)
 	Expect(err).To(Succeed())
+	_, err = tx.Exec(`
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','id',1043,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','apid_cluster_id',1043,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','scope',25,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org',25,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env',25,1);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','org_scope',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','env_scope',1043,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created',1114,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','created_by',25,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated',1114,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','updated_by',25,0);
+	INSERT INTO "_transicator_tables" VALUES('edgex_data_scope','_change_selector',25,1);
+	CREATE TABLE "edgex_data_scope" (id text,apid_cluster_id text,scope text,org text,env text,org_scope text,
+	env_scope text,created blob,created_by text,updated blob,updated_by text,_change_selector text,
+	primary key (id,apid_cluster_id,apid_cluster_id,org,env,_change_selector));
+	`)
+	Expect(err).To(Succeed())
+
+	Expect(tx.Commit()).To(Succeed())
 }
diff --git a/dockertests/apid_config.yaml b/dockertests/apid_config.yaml
new file mode 100644
index 0000000..e4cd751
--- /dev/null
+++ b/dockertests/apid_config.yaml
@@ -0,0 +1,9 @@
+apigeesync_instance_name: SQLLITAPID
+apigeesync_snapshot_server_base: http://localhost:9001/
+apigeesync_change_server_base: http://localhost:9000/
+apigeesync_snapshot_proto: sqlite
+log_level: Debug
+apigeesync_consumer_key: 33f39JNLosF1mDOXJoCfbauchVzPrGrl
+apigeesync_consumer_secret: LAolGShAx6H3vfNF
+apigeesync_cluster_id: 4c6bb536-0d64-43ca-abae-17c08f1a7e58
+local_storage_path: /Users/haoming/go/src/github.com/apid/apidApigeeSync/tmp/sqlite
diff --git a/dockertests/dockerSetup.sh b/dockertests/dockerSetup.sh
index 8df2b5d..6ded35d 100755
--- a/dockertests/dockerSetup.sh
+++ b/dockertests/dockerSetup.sh
@@ -25,7 +25,6 @@
 TEST_PG_BASE=postgres://postgres:changeme@$DOCKER_IP:5432
 TEST_PG_URL=postgres://postgres:changeme@$DOCKER_IP:5432/edgex
 echo ${TEST_PG_URL}
-
 export APIGEE_SYNC_DOCKER_PG_URL=${TEST_PG_URL}
 export APIGEE_SYNC_DOCKER_IP=${DOCKER_IP}
 
@@ -47,8 +46,13 @@
 ssname=apidSync_test_ss
 csname=apidSync_test_cs
 
+# setup docker network
+docker network rm apidApigeeSync-docker-test || true
+docker network create apidApigeeSync-docker-test
+DOCKER_PG_URL=postgres://postgres:changeme@$pgname:5432/edgex
+echo $DOCKER_PG_URL
 # run PG
-docker run --name ${pgname} -p 5432:5432 -d -e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres
+docker run --name ${pgname} -p 5432:5432 --network=apidApigeeSync-docker-test -d -e POSTGRES_PASSWORD=changeme apigeelabs/transicator-postgres
 
 # Wait for PG to be up -- it takes a few seconds
 while `true`
@@ -67,8 +71,8 @@
 psql -f ${WORK_DIR}/dockertests/user-setup.sql ${TEST_PG_URL}
 
 # run SS and CS
-docker run --name ${ssname} -d -p 9001:9001 apigeelabs/transicator-snapshot -p 9001 -u ${TEST_PG_URL}
-docker run --name ${csname} -d -p 9000:9000 apigeelabs/transicator-changeserver -p 9000 -u ${TEST_PG_URL} -s testslot
+docker run --name ${ssname} -d -p 9001:9001 --network=apidApigeeSync-docker-test apigeelabs/transicator-snapshot -p 9001 -u ${DOCKER_PG_URL}
+docker run --name ${csname} -d -p 9000:9000 --network=apidApigeeSync-docker-test apigeelabs/transicator-changeserver -p 9000 -u ${DOCKER_PG_URL} -s testslot
 
 # Wait for SS to be up
 while `true`
diff --git a/dockertests/docker_test.go b/dockertests/docker_test.go
index e8fc25d..631e17a 100644
--- a/dockertests/docker_test.go
+++ b/dockertests/docker_test.go
@@ -18,6 +18,7 @@
 	"encoding/json"
 	"github.com/apid/apid-core"
 	"github.com/apid/apid-core/factory"
+	"github.com/apid/apidApigeeSync"
 	_ "github.com/apid/apidApigeeSync"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
@@ -77,7 +78,7 @@
 	config.Set(configProxyServerBaseURI, testServer.URL)
 
 	// init plugin
-	apid.RegisterPlugin(initPlugin)
+	apid.RegisterPlugin(initPlugin, apidApigeeSync.PluginData)
 	apid.InitializePlugins("dockerTest")
 
 	<-initDone
@@ -263,6 +264,8 @@
 		updated:        t,
 		updatedBy:      testInitUser,
 		changeSelector: clusterId,
+		orgScope:       "org1",
+		envScope:       "env1",
 	}
 
 	bf := bundleConfigData{
@@ -335,7 +338,7 @@
 type newTableHandler struct {
 	targetTablename string
 	done            Done
-	verifyFunc      func (string, apid.DB)
+	verifyFunc      func(string, apid.DB)
 }
 
 func (n *newTableHandler) Handle(event apid.Event) {
diff --git a/dockertests/management_pg.go b/dockertests/management_pg.go
index bbe28e9..77bdf67 100644
--- a/dockertests/management_pg.go
+++ b/dockertests/management_pg.go
@@ -90,9 +90,11 @@
 			created_by,
 			updated,
 			updated_by,
-			_change_selector
+			_change_selector,
+			org_scope,
+			env_scope
 			)
-			VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)`)
+			VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)`)
 	if err != nil {
 		return err
 	}
@@ -108,6 +110,8 @@
 		ds.updated,
 		ds.updatedBy,
 		ds.changeSelector,
+		ds.orgScope,
+		ds.envScope,
 	)
 
 	return err
diff --git a/dockertests/master-schema.sql b/dockertests/master-schema.sql
index 756ff62..00f60d0 100644
--- a/dockertests/master-schema.sql
+++ b/dockertests/master-schema.sql
@@ -45,6 +45,8 @@
     updated timestamp without time zone,
     updated_by text,
     _change_selector text,
+    org_scope character varying(36) NOT NULL,
+    env_scope character varying(36) NOT NULL,
     CONSTRAINT data_scope_pkey PRIMARY KEY (id),
     CONSTRAINT data_scope_apid_cluster_id_fk FOREIGN KEY (apid_cluster_id)
           REFERENCES apid_cluster (id)
diff --git a/dockertests/pg_table_data.go b/dockertests/pg_table_data.go
index 0a1aecd..854e587 100644
--- a/dockertests/pg_table_data.go
+++ b/dockertests/pg_table_data.go
@@ -42,6 +42,8 @@
 	updated        time.Time
 	updatedBy      string
 	changeSelector string
+	orgScope       string
+	envScope       string
 }
 
 /* FOREIGN KEY (data_scope_id)
diff --git a/init.go b/init.go
index b7f470e..f381ca4 100644
--- a/init.go
+++ b/init.go
@@ -15,7 +15,6 @@
 package apidApigeeSync
 
 import (
-	"encoding/json"
 	"fmt"
 	"net/http"
 	"os"
@@ -39,7 +38,8 @@
 	// special value - set by ApigeeSync, not taken from configuration
 	configApidInstanceID = "apigeesync_apid_instance_id"
 	// This will not be needed once we have plugin handling tokens.
-	configBearerToken = "apigeesync_bearer_token"
+	configBearerToken      = "apigeesync_bearer_token"
+	configLocalStoragePath = "local_storage_path"
 )
 
 const (
@@ -48,17 +48,12 @@
 
 var (
 	/* All set during plugin initialization */
-	log                 apid.LogService
-	config              apid.ConfigService
-	dataService         apid.DataService
-	events              apid.EventsService
-	apidInfo            apidInstanceInfo
-	newInstanceID       bool
-	apidTokenManager    tokenManager
-	apidChangeManager   changeManager
-	apidSnapshotManager snapShotManager
-	httpclient          *http.Client
-	isOfflineMode       bool
+	log          apid.LogService
+	config       apid.ConfigService
+	dataService  apid.DataService
+	eventService apid.EventsService
+	apiService   apid.APIService
+	apidInfo     apidInstanceInfo
 
 	/* Set during post plugin initialization
 	 * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -68,6 +63,7 @@
 
 type apidInstanceInfo struct {
 	InstanceID, InstanceName, ClusterID, LastSnapshot string
+	IsNewInstance                                     bool
 }
 
 type pluginDetail struct {
@@ -76,7 +72,7 @@
 }
 
 func init() {
-	apid.RegisterPlugin(initPlugin, pluginData)
+	apid.RegisterPlugin(initPlugin, PluginData)
 }
 
 func initConfigDefaults() {
@@ -93,59 +89,7 @@
 	log.Debugf("Using %s as display name", config.GetString(configName))
 }
 
-func initVariables() error {
-
-	var tr *http.Transport
-
-	tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
-	tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
-
-	httpclient = &http.Client{
-		Transport: tr,
-		Timeout:   httpTimeout,
-		CheckRedirect: func(req *http.Request, _ []*http.Request) error {
-			req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
-			return nil
-		},
-	}
-
-	// set up default database
-	db, err := dataService.DB()
-	if err != nil {
-		return fmt.Errorf("Unable to access DB: %v", err)
-	}
-	err = initDB(db)
-	if err != nil {
-		return fmt.Errorf("Unable to access DB: %v", err)
-	}
-	setDB(db)
-
-	apidInfo, err = getApidInstanceInfo()
-	if err != nil {
-		return fmt.Errorf("Unable to get apid instance info: %v", err)
-	}
-
-	if config.IsSet(configApidInstanceID) {
-		log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID)
-	}
-	config.Set(configApidInstanceID, apidInfo.InstanceID)
-
-	return nil
-}
-
-func createManagers() {
-	if isOfflineMode {
-		apidSnapshotManager = &offlineSnapshotManager{}
-		apidChangeManager = &offlineChangeManager{}
-	} else {
-		apidSnapshotManager = createSnapShotManager()
-		apidChangeManager = createChangeManager()
-	}
-
-	apidTokenManager = createSimpleTokenManager()
-}
-
-func checkForRequiredValues() error {
+func checkForRequiredValues(isOfflineMode bool) error {
 	required := []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret}
 	if !isOfflineMode {
 		required = append(required, configSnapServerBaseURI, configChangeServerBaseURI)
@@ -153,12 +97,12 @@
 	// check for required values
 	for _, key := range required {
 		if !config.IsSet(key) {
-			return fmt.Errorf("Missing required config value: %s", key)
+			return fmt.Errorf("missing required config value: %s", key)
 		}
 	}
 	proto := config.GetString(configSnapshotProtocol)
 	if proto != "sqlite" {
-		return fmt.Errorf("Illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol)
+		return fmt.Errorf("illegal value for %s. Only currently supported snashot protocol is sqlite", configSnapshotProtocol)
 	}
 
 	return nil
@@ -168,90 +112,98 @@
 	log = logger
 }
 
-/* initialization */
-func _initPlugin(services apid.Services) error {
-	log.Debug("start init")
+func initManagers(isOfflineMode bool) (*listenerManager, *ApiManager, error) {
+	// check for forward proxy
+	var tr *http.Transport
+	tr = util.Transport(config.GetString(util.ConfigfwdProxyPortURL))
+	tr.MaxIdleConnsPerHost = maxIdleConnsPerHost
 
-	config = services.Config()
-	initConfigDefaults()
-
-	if config.GetBool(configDiagnosticMode) {
-		log.Warn("Diagnostic mode: will not download changelist and snapshots!")
-		isOfflineMode = true
-	}
-
-	err := checkForRequiredValues()
+	apidDbManager := creatDbManager()
+	db, err := dataService.DB()
 	if err != nil {
-		return err
+		return nil, nil, fmt.Errorf("unable to access DB: %v", err)
 	}
-
-	err = initVariables()
+	apidDbManager.setDB(db)
+	err = apidDbManager.initDB()
 	if err != nil {
-		return err
+		return nil, nil, fmt.Errorf("unable to access DB: %v", err)
 	}
 
-	return nil
+	apidInfo, err = apidDbManager.getApidInstanceInfo()
+	if err != nil {
+		return nil, nil, fmt.Errorf("unable to get apid instance info: %v", err)
+	}
+
+	if config.IsSet(configApidInstanceID) {
+		log.Warnf("ApigeeSync plugin overriding %s.", configApidInstanceID)
+	}
+	config.Set(configApidInstanceID, apidInfo.InstanceID)
+
+	apidTokenManager := createApidTokenManager(apidInfo.IsNewInstance)
+	var snapMan snapshotManager
+	var apidChangeManager changeManager
+
+	if isOfflineMode {
+		snapMan = &offlineSnapshotManager{
+			dbMan: apidDbManager,
+		}
+		apidChangeManager = &offlineChangeManager{}
+	} else {
+		httpClient := &http.Client{
+			Transport: tr,
+			Timeout:   httpTimeout,
+			CheckRedirect: func(req *http.Request, _ []*http.Request) error {
+				req.Header.Set("Authorization", "Bearer "+apidTokenManager.getBearerToken())
+				return nil
+			},
+		}
+		snapMan = createSnapShotManager(apidDbManager, apidTokenManager, httpClient)
+		apidChangeManager = createChangeManager(apidDbManager, snapMan, apidTokenManager, httpClient)
+	}
+
+	listenerMan := &listenerManager{
+		changeMan:     apidChangeManager,
+		snapMan:       snapMan,
+		tokenMan:      apidTokenManager,
+		isOfflineMode: isOfflineMode,
+	}
+
+	apiMan := &ApiManager{
+		endpoint: tokenEndpoint,
+		tokenMan: apidTokenManager,
+	}
+	return listenerMan, apiMan, nil
 }
 
 func initPlugin(services apid.Services) (apid.PluginData, error) {
 	SetLogger(services.Log().ForModule("apigeeSync"))
 	dataService = services.Data()
-	events = services.Events()
+	eventService = services.Events()
+	apiService = services.API()
+	log.Debug("start init")
+	config = services.Config()
+	initConfigDefaults()
 
-	err := _initPlugin(services)
+	isOfflineMode := false
+	if config.GetBool(configDiagnosticMode) {
+		log.Warn("Diagnostic mode: will not download changelist and snapshots!")
+		isOfflineMode = true
+	}
+
+	err := checkForRequiredValues(isOfflineMode)
 	if err != nil {
-		return pluginData, err
+		return PluginData, err
 	}
+	if err != nil {
+		return PluginData, err
+	}
+	listenerMan, apiMan, err := initManagers(isOfflineMode)
+	if err != nil {
+		return PluginData, err
+	}
+	listenerMan.init()
+	apiMan.InitAPI(apiService)
 
-	createManagers()
-
-	/* This callback function will get called once all the plugins are
-	 * initialized (not just this plugin). This is needed because,
-	 * downloadSnapshots/changes etc have to begin to be processed only
-	 * after all the plugins are initialized
-	 */
-	events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
-
-	InitAPI(services)
 	log.Debug("end init")
-
-	return pluginData, nil
-}
-
-// Plugins have all initialized, gather their info and start the ApigeeSync downloads
-func postInitPlugins(event apid.Event) {
-	var plinfoDetails []pluginDetail
-	if pie, ok := event.(apid.PluginsInitializedEvent); ok {
-		/*
-		 * Store the plugin details in the heap. Needed during
-		 * Bearer token generation request.
-		 */
-		for _, plugin := range pie.Plugins {
-			name := plugin.Name
-			version := plugin.Version
-			if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok {
-				inf := pluginDetail{
-					Name:          name,
-					SchemaVersion: schemaVersion}
-				plinfoDetails = append(plinfoDetails, inf)
-				log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion)
-			}
-		}
-		if plinfoDetails == nil {
-			log.Panicf("No Plugins registered!")
-		}
-
-		pgInfo, err := json.Marshal(plinfoDetails)
-		if err != nil {
-			log.Panicf("Unable to marshal plugin data: %v", err)
-		}
-		apidPluginDetails = string(pgInfo[:])
-
-		log.Debug("start post plugin init")
-
-		apidTokenManager.start()
-		go bootstrap()
-
-		log.Debug("Done post plugin init")
-	}
+	return PluginData, nil
 }
diff --git a/init_test.go b/init_test.go
index 03e5450..bdd9fc7 100644
--- a/init_test.go
+++ b/init_test.go
@@ -18,36 +18,78 @@
 	"github.com/apid/apid-core"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"net/http"
+	"strconv"
 )
 
 var _ = Describe("init", func() {
-	var _ = BeforeEach(func() {
-		_initPlugin(apid.AllServices())
+	testCount := 0
+	BeforeEach(func() {
+		testCount++
 	})
 
 	Context("Apid Instance display name", func() {
+		AfterEach(func() {
+			apiService = apid.API()
+		})
 
-		It("should be hostname by default", func() {
-			log.Info("Starting init tests...")
+		It("init should register listener", func() {
+			me := &mockEvent{
+				listenerMap: make(map[apid.EventSelector]apid.EventHandlerFunc),
+			}
+			ma := &mockApi{
+				handleMap: make(map[string]http.HandlerFunc),
+			}
+			ms := &mockService{
+				config: apid.Config(),
+				log:    apid.Log(),
+				api:    ma,
+				data:   apid.Data(),
+				events: me,
+			}
+			testname := "test_" + strconv.Itoa(testCount)
+			ms.config.Set(configName, testname)
+			pd, err := initPlugin(ms)
+			Expect(err).Should(Succeed())
+			Expect(apidInfo.InstanceName).To(Equal(testname))
+			Expect(me.listenerMap[apid.SystemEventsSelector]).ToNot(BeNil())
+			Expect(ma.handleMap[tokenEndpoint]).ToNot(BeNil())
+			Expect(pd).Should(Equal(PluginData))
+			Expect(apidInfo.IsNewInstance).Should(BeTrue())
+		})
 
-			initConfigDefaults()
-			Expect(apidInfo.InstanceName).To(Equal("testhost"))
-		}, 3)
+		It("create managers for normal mode", func() {
+			listenerMan, apiMan, err := initManagers(false)
+			Expect(err).Should(Succeed())
+			Expect(listenerMan).ToNot(BeNil())
+			Expect(listenerMan.tokenMan).ToNot(BeNil())
+			snapMan, ok := listenerMan.snapMan.(*apidSnapshotManager)
+			Expect(ok).Should(BeTrue())
+			Expect(snapMan.tokenMan).ToNot(BeNil())
+			Expect(snapMan.dbMan).ToNot(BeNil())
+			changeMan, ok := listenerMan.changeMan.(*pollChangeManager)
+			Expect(ok).Should(BeTrue())
+			Expect(changeMan.tokenMan).ToNot(BeNil())
+			Expect(changeMan.dbMan).ToNot(BeNil())
+			Expect(changeMan.snapMan).ToNot(BeNil())
+			Expect(apiMan).ToNot(BeNil())
+			Expect(apiMan.tokenMan).ToNot(BeNil())
+		})
 
-		It("accept display name from config", func() {
-			config.Set(configName, "aa01")
-			initConfigDefaults()
-			var apidInfoLatest apidInstanceInfo
-			apidInfoLatest, _ = getApidInstanceInfo()
-			Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
-			Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
-		}, 3)
+		It("create managers for diagnostic mode", func() {
+			config.Set(configDiagnosticMode, true)
+			listenerMan, apiMan, err := initManagers(true)
+			Expect(err).Should(Succeed())
+			Expect(listenerMan).ToNot(BeNil())
+			Expect(listenerMan.tokenMan).ToNot(BeNil())
+			snapMan, ok := listenerMan.snapMan.(*offlineSnapshotManager)
+			Expect(ok).Should(BeTrue())
+			Expect(snapMan.dbMan).ToNot(BeNil())
+			_, ok = listenerMan.changeMan.(*offlineChangeManager)
+			Expect(ok).Should(BeTrue())
+			Expect(apiMan).ToNot(BeNil())
+			Expect(apiMan.tokenMan).ToNot(BeNil())
+		})
 
 	})
-
-	It("should put apigeesync_apid_instance_id value in config", func() {
-		instanceID := config.GetString(configApidInstanceID)
-		Expect(instanceID).NotTo(BeEmpty())
-		Expect(instanceID).To(Equal(apidInfo.InstanceID))
-	})
 })
diff --git a/listener.go b/listener.go
index 94befd6..7f46b34 100644
--- a/listener.go
+++ b/listener.go
@@ -15,9 +15,8 @@
 package apidApigeeSync
 
 import (
-	"errors"
+	"encoding/json"
 	"github.com/apid/apid-core"
-	"github.com/apigee-labs/transicator/common"
 )
 
 const (
@@ -25,107 +24,86 @@
 	LISTENER_TABLE_DATA_SCOPE   = "edgex.data_scope"
 )
 
-func processSnapshot(snapshot *common.Snapshot) {
+type listenerManager struct {
+	changeMan     changeManager
+	snapMan       snapshotManager
+	tokenMan      tokenManager
+	isOfflineMode bool
+}
 
-	var prevDb string
-	if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
-		log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
-			apidInfo.LastSnapshot, snapshot.SnapshotInfo)
-		prevDb = apidInfo.LastSnapshot
-	} else {
-		log.Debugf("Process snapshot for version {%s}",
-			snapshot.SnapshotInfo)
-	}
-	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Unable to access database: %v", err)
-	}
+func (l *listenerManager) init() {
+	/* This callback function will get called once all the plugins are
+	 * initialized (not just this plugin). This is needed because,
+	 * downloadSnapshots/changes etc have to begin to be processed only
+	 * after all the plugins are initialized
+	 */
+	eventService.ListenOnceFunc(apid.SystemEventsSelector, l.postInitPlugins)
+}
 
-	processSqliteSnapshot(db)
+// Plugins have all initialized, gather their info and start the ApigeeSync downloads
+func (l *listenerManager) postInitPlugins(event apid.Event) {
+	var plinfoDetails []pluginDetail
+	if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+		/*
+		 * Store the plugin details in the heap. Needed during
+		 * Bearer token generation request.
+		 */
+		for _, plugin := range pie.Plugins {
+			name := plugin.Name
+			version := plugin.Version
+			if schemaVersion, ok := plugin.ExtraData["schemaVersion"].(string); ok {
+				inf := pluginDetail{
+					Name:          name,
+					SchemaVersion: schemaVersion}
+				plinfoDetails = append(plinfoDetails, inf)
+				log.Debugf("plugin %s is version %s, schemaVersion: %s", name, version, schemaVersion)
+			}
+		}
+		if plinfoDetails == nil {
+			log.Panic("No Plugins registered!")
+		}
 
-	//update apid instance info
-	apidInfo.LastSnapshot = snapshot.SnapshotInfo
-	err = updateApidInstanceInfo()
-	if err != nil {
-		log.Panicf("Unable to update instance info: %v", err)
-	}
+		pgInfo, err := json.Marshal(plinfoDetails)
+		if err != nil {
+			log.Panicf("Unable to marshal plugin data: %v", err)
+		}
+		apidPluginDetails = string(pgInfo[:])
 
-	setDB(db)
-	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+		log.Debug("start post plugin init")
 
-	// Releases the DB, when the Connection reference count reaches 0.
-	if prevDb != "" {
-		dataService.ReleaseDB(prevDb)
+		l.tokenMan.start()
+		go l.bootstrap(apidInfo.LastSnapshot)
+
+		log.Debug("Done post plugin init")
 	}
 }
 
-func processSqliteSnapshot(db apid.DB) {
-
-	var numApidClusters int
-	tx, err := db.Begin()
-	if err != nil {
-		log.Panicf("Unable to open DB txn: {%v}", err.Error())
-	}
-	defer tx.Rollback()
-	err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
-	if err != nil {
-		log.Panicf("Unable to read database: {%s}", err.Error())
+/*
+ *  Start from existing snapshot if possible
+ *  If an existing snapshot does not exist, use the apid scope to fetch
+ *  all data scopes, then get a snapshot for those data scopes
+ *
+ *  Then, poll for changes
+ */
+func (l *listenerManager) bootstrap(lastSnap string) {
+	if l.isOfflineMode && lastSnap == "" {
+		log.Panic("Diagnostic mode requires existent snapshot info in default DB.")
 	}
 
-	if numApidClusters != 1 {
-		log.Panic("Illegal state for apid_cluster. Must be a single row.")
-	}
-
-	_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
-	if err != nil {
-		if err.Error() == "duplicate column name: last_sequence" {
+	if lastSnap != "" {
+		if err := l.snapMan.startOnDataSnapshot(lastSnap); err == nil {
+			log.Infof("Started on local snapshot: %s", lastSnap)
+			l.changeMan.pollChangeWithBackoff()
 			return
 		} else {
-			log.Panicf("Unable to create last_sequence column on DB.  Error {%v}", err.Error())
-		}
-	}
-	if err = tx.Commit(); err != nil {
-		log.Errorf("Error when commit in processSqliteSnapshot: %v", err)
-	}
-}
-
-func processChangeList(changes *common.ChangeList) bool {
-
-	ok := false
-
-	tx, err := getDB().Begin()
-	if err != nil {
-		log.Panicf("Error processing ChangeList: %v", err)
-	}
-	defer tx.Rollback()
-
-	log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
-
-	for _, change := range changes.Changes {
-		if change.Table == LISTENER_TABLE_APID_CLUSTER {
-			log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-		}
-		switch change.Operation {
-		case common.Insert:
-			ok = insert(change.Table, []common.Row{change.NewRow}, tx)
-		case common.Update:
-			if change.Table == LISTENER_TABLE_DATA_SCOPE {
-				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
-			}
-			ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
-		case common.Delete:
-			ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
-		}
-		if !ok {
-			err = errors.New("Sql Operation error. Operation rollbacked")
-			log.Error("Sql Operation error. Operation rollbacked")
-			return false
+			log.Errorf("Failed to bootstrap on local snapshot: %v", err)
+			log.Warn("Will get new snapshots.")
 		}
 	}
 
-	if err = tx.Commit(); err != nil {
-		log.Errorf("Commit error in processChangeList: %v", err)
-		return false
+	l.snapMan.downloadBootSnapshot()
+	if err := l.snapMan.downloadDataSnapshot(); err != nil {
+		log.Panicf("Error downloading data snapshot: %v", err)
 	}
-	return true
+	l.changeMan.pollChangeWithBackoff()
 }
diff --git a/listener_test.go b/listener_test.go
index 54974eb..5e8f072 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -15,360 +15,79 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-
-	"github.com/apid/apid-core"
-	"github.com/apigee-labs/transicator/common"
-	"os"
-	"reflect"
-	"sort"
+	"strconv"
 )
 
 var _ = Describe("listener", func() {
+	testCount := 0
+	var testListenerMan *listenerManager
+	var dummyChangeMan *dummyChangeManager
+	var dummySnapMan *dummySnapshotManager
+	var dummyTokenMan *dummyTokenManager
 
-	var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
-		initDb(sqlfile, "./mockdb.sqlite3")
-		file, err := os.Open("./mockdb.sqlite3")
-		Expect(err).ShouldNot(HaveOccurred())
-
-		s := common.Snapshot{}
-		err = processSnapshotServerFileResponse(dbId, file, &s)
-		Expect(err).ShouldNot(HaveOccurred())
-		return s
-	}
-
-	var _ = BeforeEach(func() {
-		_initPlugin(apid.AllServices())
-	})
-
-	var _ = AfterEach(func() {
-		if wipeDBAferTest {
-			db, err := dataService.DB()
-			Expect(err).Should(Succeed())
-			tx, err := db.Begin()
-			Expect(err).Should(Succeed())
-			_, err = tx.Exec("DELETE FROM APID")
-			Expect(err).Should(Succeed())
-			err = tx.Commit()
-			Expect(err).Should(Succeed())
+	BeforeEach(func() {
+		testCount++
+		dummySnapMan = &dummySnapshotManager{
+			downloadCalledChan: make(chan bool, 1),
+			startCalledChan:    make(chan bool, 1),
 		}
-		wipeDBAferTest = true
+		dummyTokenMan = &dummyTokenManager{
+			invalidateChan: make(chan bool, 1),
+		}
+		dummyChangeMan = &dummyChangeManager{
+			pollChangeWithBackoffChan: make(chan bool, 1),
+		}
+		testListenerMan = &listenerManager{
+			changeMan: dummyChangeMan,
+			snapMan:   dummySnapMan,
+			tokenMan:  dummyTokenMan,
+		}
 	})
 
-	Context("ApigeeSync snapshot event", func() {
+	AfterEach(func() {
 
-		It("should fail if more than one apid_cluster rows", func() {
-			event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
-			Expect(func() { processSnapshot(&event) }).To(Panic())
-		}, 3)
-
-		It("should fail if more than one apid_cluster rows", func() {
-			newScopes := []string{"foo"}
-			scopes := []string{"bar"}
-			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
-			newScopes = []string{"foo", "bar"}
-			scopes = []string{"bar"}
-			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
-			newScopes = []string{"foo"}
-			scopes = []string{"bar", "foo"}
-			Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
-			newScopes = []string{"foo", "bar"}
-			scopes = []string{"bar", "foo"}
-			Expect(scopeChanged(newScopes, scopes)).To(BeNil())
-
-		}, 3)
-
-		It("should process a valid Snapshot", func() {
-
-			event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
-
-			processSnapshot(&event)
-
-			info, err := getApidInstanceInfo()
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
-
-			db := getDB()
-
-			expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
-			Expect(err).NotTo(HaveOccurred())
-
-			Expect(db == expectedDB).Should(BeTrue())
-
-			// apid Cluster
-			var dcs []dataApidCluster
-
-			rows, err := db.Query(`
-			SELECT id, name, description, umbrella_org_app_name,
-				created, created_by, updated, updated_by
-			FROM EDGEX_APID_CLUSTER`)
-			Expect(err).NotTo(HaveOccurred())
-			defer rows.Close()
-
-			c := dataApidCluster{}
-			for rows.Next() {
-				rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName,
-					&c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy)
-				dcs = append(dcs, c)
-			}
-
-			Expect(len(dcs)).To(Equal(1))
-			dc := dcs[0]
-
-			Expect(dc.ID).To(Equal("i"))
-			Expect(dc.Name).To(Equal("n"))
-			Expect(dc.Description).To(Equal("d"))
-			Expect(dc.OrgAppName).To(Equal("o"))
-			Expect(dc.Created).To(Equal("c"))
-			Expect(dc.CreatedBy).To(Equal("c"))
-			Expect(dc.Updated).To(Equal("u"))
-			Expect(dc.UpdatedBy).To(Equal("u"))
-
-			// Data Scope
-			var dds []dataDataScope
-
-			rows, err = db.Query(`
-			SELECT id, apid_cluster_id, scope, org,
-				env, created, created_by, updated,
-				updated_by
-			FROM EDGEX_DATA_SCOPE`)
-			Expect(err).NotTo(HaveOccurred())
-			defer rows.Close()
-
-			d := dataDataScope{}
-			for rows.Next() {
-				rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
-					&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
-					&d.UpdatedBy)
-				dds = append(dds, d)
-			}
-
-			Expect(len(dds)).To(Equal(3))
-			ds := dds[0]
-
-			Expect(ds.ID).To(Equal("i"))
-			Expect(ds.Org).To(Equal("o"))
-			Expect(ds.Env).To(Equal("e1"))
-			Expect(ds.Scope).To(Equal("s1"))
-			Expect(ds.Created).To(Equal("c"))
-			Expect(ds.CreatedBy).To(Equal("c"))
-			Expect(ds.Updated).To(Equal("u"))
-			Expect(ds.UpdatedBy).To(Equal("u"))
-
-			ds = dds[1]
-			Expect(ds.Env).To(Equal("e2"))
-			Expect(ds.Scope).To(Equal("s1"))
-			ds = dds[2]
-			Expect(ds.Env).To(Equal("e3"))
-			Expect(ds.Scope).To(Equal("s2"))
-
-			scopes := findScopesForId("a")
-			Expect(len(scopes)).To(Equal(6))
-			expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
-			sort.Strings(scopes)
-			sort.Strings(expectedScopes)
-			Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue())
-		}, 3)
 	})
 
-	Context("ApigeeSync change event", func() {
-
-		Context(LISTENER_TABLE_APID_CLUSTER, func() {
-
-			It("insert event should panic", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
-				processSnapshot(&ssEvent)
-
-				//save the last snapshot, so we can restore it at the end of this context
-
-				csEvent := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_APID_CLUSTER,
-						},
+	It("postInitPlugins, start cleanly", func() {
+		testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount))
+		eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins)
+		eventService.Emit(testEvent, apid.PluginsInitializedEvent{
+			Description: "test",
+			Plugins: []apid.PluginData{
+				{
+					Name:    "name",
+					Version: "0.0.1",
+					ExtraData: map[string]interface{}{
+						"schemaVersion": "0.0.1",
 					},
-				}
-
-				Expect(func() { processChangeList(&csEvent) }).To(Panic())
-			}, 3)
-
-			It("update event should panic", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Update,
-							Table:     LISTENER_TABLE_APID_CLUSTER,
-						},
-					},
-				}
-
-				Expect(func() { processChangeList(&event) }).To(Panic())
-				//restore the last snapshot
-			}, 3)
-
+				},
+			},
 		})
-
-		Context(LISTENER_TABLE_DATA_SCOPE, func() {
-
-			It("insert event should add", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "i"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s1"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "j"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s2"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-					},
-				}
-
-				processChangeList(&event)
-
-				var dds []dataDataScope
-
-				rows, err := getDB().Query(`
-				SELECT id, apid_cluster_id, scope, org,
-					env, created, created_by, updated,
-					updated_by
-				FROM EDGEX_DATA_SCOPE`)
-				Expect(err).NotTo(HaveOccurred())
-				defer rows.Close()
-
-				d := dataDataScope{}
-				for rows.Next() {
-					rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
-						&d.Env, &d.Created, &d.CreatedBy, &d.Updated,
-						&d.UpdatedBy)
-					dds = append(dds, d)
-				}
-
-				//three already existing
-				Expect(len(dds)).To(Equal(2))
-				ds := dds[0]
-
-				Expect(ds.ID).To(Equal("i"))
-				Expect(ds.Org).To(Equal("o"))
-				Expect(ds.Env).To(Equal("e"))
-				Expect(ds.Scope).To(Equal("s1"))
-				Expect(ds.Created).To(Equal("c"))
-				Expect(ds.CreatedBy).To(Equal("c"))
-				Expect(ds.Updated).To(Equal("u"))
-				Expect(ds.UpdatedBy).To(Equal("u"))
-
-				ds = dds[1]
-				Expect(ds.Scope).To(Equal("s2"))
-
-				scopes := findScopesForId("a")
-				Expect(len(scopes)).To(Equal(2))
-				Expect(scopes[0]).To(Equal("s1"))
-				Expect(scopes[1]).To(Equal("s2"))
-
-			}, 3)
-
-			It("delete event should delete", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
-				processSnapshot(&ssEvent)
-				insert := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Insert,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							NewRow: common.Row{
-								"id":               &common.ColumnVal{Value: "i"},
-								"apid_cluster_id":  &common.ColumnVal{Value: "a"},
-								"scope":            &common.ColumnVal{Value: "s"},
-								"org":              &common.ColumnVal{Value: "o"},
-								"env":              &common.ColumnVal{Value: "e"},
-								"created":          &common.ColumnVal{Value: "c"},
-								"created_by":       &common.ColumnVal{Value: "c"},
-								"updated":          &common.ColumnVal{Value: "u"},
-								"updated_by":       &common.ColumnVal{Value: "u"},
-								"_change_selector": &common.ColumnVal{Value: "cs"},
-							},
-						},
-					},
-				}
-
-				processChangeList(&insert)
-
-				delete := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Delete,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-							OldRow:    insert.Changes[0].NewRow,
-						},
-					},
-				}
-
-				processChangeList(&delete)
-
-				var nRows int
-				err := getDB().QueryRow("SELECT count(id) FROM EDGEX_DATA_SCOPE").Scan(&nRows)
-				Expect(err).NotTo(HaveOccurred())
-
-				Expect(0).To(Equal(nRows))
-			}, 3)
-
-			It("update event should panic for data scopes table", func() {
-				ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_update_panic")
-				processSnapshot(&ssEvent)
-
-				event := common.ChangeList{
-					LastSequence: "test",
-					Changes: []common.Change{
-						{
-							Operation: common.Update,
-							Table:     LISTENER_TABLE_DATA_SCOPE,
-						},
-					},
-				}
-
-				Expect(func() { processChangeList(&event) }).To(Panic())
-				//restore the last snapshot
-			}, 3)
-
-			//TODO add tests for update/insert/delete cluster
-		})
+		Expect(<-dummySnapMan.downloadCalledChan).Should(BeFalse())
+		Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue())
 	})
+
+	It("postInitPlugins, start from local db", func() {
+		apidInfo.LastSnapshot = "test_snapshot"
+		testEvent := apid.EventSelector("test event" + strconv.Itoa(testCount))
+		eventService.ListenOnceFunc(testEvent, testListenerMan.postInitPlugins)
+		eventService.Emit(testEvent, apid.PluginsInitializedEvent{
+			Description: "test",
+			Plugins: []apid.PluginData{
+				{
+					Name:    "name",
+					Version: "0.0.1",
+					ExtraData: map[string]interface{}{
+						"schemaVersion": "0.0.1",
+					},
+				},
+			},
+		})
+		Expect(<-dummySnapMan.startCalledChan).Should(BeTrue())
+		Expect(<-dummyChangeMan.pollChangeWithBackoffChan).Should(BeTrue())
+	})
+
 })
diff --git a/managerInterfaces.go b/managerInterfaces.go
index a93e950..978aede 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -15,31 +15,38 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	"github.com/apigee-labs/transicator/common"
-	"net/url"
 )
 
 type tokenManager interface {
 	getBearerToken() string
-	invalidateToken() error
-	getToken() *OauthToken
+	invalidateToken()
 	close()
-	getRetrieveNewTokenClosure(*url.URL) func(chan bool) error
 	start()
 	getTokenReadyChannel() <-chan bool
 }
 
-type snapShotManager interface {
+type snapshotManager interface {
 	close() <-chan bool
 	downloadBootSnapshot()
-	storeBootSnapshot(snapshot *common.Snapshot)
-	downloadDataSnapshot()
-	storeDataSnapshot(snapshot *common.Snapshot)
-	downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error
-	startOnLocalSnapshot(snapshot string) *common.Snapshot
+	downloadDataSnapshot() error
+	startOnDataSnapshot(snapshot string) error
 }
 
 type changeManager interface {
 	close() <-chan bool
 	pollChangeWithBackoff()
 }
+
+type DbManager interface {
+	initDB() error
+	setDB(db apid.DB)
+	getLastSequence() (lastSequence string)
+	findScopesForId(configId string) (scopes []string, err error)
+	updateLastSequence(lastSequence string) error
+	getApidInstanceInfo() (info apidInstanceInfo, err error)
+	processChangeList(changes *common.ChangeList) error
+	processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error
+	getKnowTables() map[string]bool
+}
diff --git a/mock_server.go b/mock_server_test.go
similarity index 94%
rename from mock_server.go
rename to mock_server_test.go
index f39adb7..e61b83e 100644
--- a/mock_server.go
+++ b/mock_server_test.go
@@ -90,14 +90,14 @@
 	changeChannel   chan []byte
 	sequenceID      *int64
 	maxDevID        *int64
-	deployIDMutex   sync.RWMutex
+	deployIDMutex   *sync.RWMutex
 	minDeploymentID *int64
 	maxDeploymentID *int64
 	newSnap         *int32
 	authFail        *int32
 }
 
-func (m *MockServer) forceAuthFail() {
+func (m *MockServer) forceAuthFailOnce() {
 	atomic.StoreInt32(m.authFail, 1)
 }
 
@@ -118,11 +118,13 @@
 }
 
 func (m *MockServer) lastSequenceID() string {
-	return strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
+	num := strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
+	return num + "." + num + "." + num
 }
 
 func (m *MockServer) nextSequenceID() string {
-	return strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
+	num := strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
+	return num + "." + num + "." + num
 }
 
 func (m *MockServer) nextDeveloperID() string {
@@ -180,7 +182,7 @@
 	m.newSnap = new(int32)
 	m.authFail = new(int32)
 	*m.authFail = 0
-
+	m.deployIDMutex = &sync.RWMutex{}
 	initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
 	initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
 
@@ -266,8 +268,11 @@
 	scopes := q["scope"]
 
 	Expect(scopes).To(ContainElement(m.params.ClusterID))
-
-	w.Header().Set("Transicator-Snapshot-TXID", util.GenerateUUID())
+	if m.params.Scope != "" {
+		Expect(scopes).To(ContainElement(m.params.Scope))
+	}
+	m.snapshotID = util.GenerateUUID()
+	w.Header().Set(headerSnapshotNumber, m.snapshotID)
 
 	if len(scopes) == 1 {
 		//send bootstrap db
@@ -285,7 +290,7 @@
 func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
 	defer GinkgoRecover()
 
-	val := atomic.SwapInt32(m.newSnap, 0)
+	val := atomic.LoadInt32(m.newSnap)
 	if val > 0 {
 		log.Debug("MockServer: force new snapshot")
 		w.WriteHeader(http.StatusBadRequest)
@@ -311,7 +316,9 @@
 	//Expect(q.Get("snapshot")).To(Equal(m.snapshotID))
 
 	Expect(scopes).To(ContainElement(m.params.ClusterID))
-	//Expect(scopes).To(ContainElement(m.params.Scope))
+	if m.params.Scope != "" {
+		Expect(scopes).To(ContainElement(m.params.Scope))
+	}
 
 	// todo: the following is just legacy for the existing test in apigeeSync_suite_test
 	developer := m.createDeveloperWithProductAndApp()
@@ -343,6 +350,7 @@
 
 		// force failing auth check
 		if atomic.LoadInt32(m.authFail) == 1 {
+			atomic.StoreInt32(m.authFail, 0)
 			w.WriteHeader(http.StatusUnauthorized)
 			w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. ")))
 			return
@@ -356,7 +364,7 @@
 
 		// check auth header
 		auth := req.Header.Get("Authorization")
-		expectedAuth := fmt.Sprintf("Bearer %s", m.oauthToken)
+		expectedAuth := m.getBearerToken()
 		if auth != expectedAuth {
 			w.WriteHeader(http.StatusUnauthorized)
 			w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth)))
@@ -366,6 +374,10 @@
 	}
 }
 
+func (m *MockServer) getBearerToken() string {
+	return fmt.Sprintf("Bearer %s", m.oauthToken)
+}
+
 // make a handler unreliable
 func (m *MockServer) unreliable(target http.HandlerFunc) http.HandlerFunc {
 	if m.params.ReliableAPI {
diff --git a/pluginData.go b/pluginData.go
index c565c02..10b2dd0 100644
--- a/pluginData.go
+++ b/pluginData.go
@@ -16,7 +16,7 @@
 
 import "github.com/apid/apid-core"
 
-var pluginData = apid.PluginData{
+var PluginData = apid.PluginData{
 	Name:    "apidApigeeSync",
 	Version: "0.0.4",
 	ExtraData: map[string]interface{}{
diff --git a/snapshot.go b/snapshot.go
index df4d63f..023314d 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -15,12 +15,12 @@
 package apidApigeeSync
 
 import (
-	"github.com/apid/apid-core"
 	"github.com/apid/apid-core/data"
 	"github.com/apigee-labs/transicator/common"
 	"net/http"
 	"os"
 
+	"fmt"
 	"io"
 	"io/ioutil"
 	"net/url"
@@ -29,7 +29,14 @@
 	"time"
 )
 
-type simpleSnapShotManager struct {
+const bootstrapSnapshotName = "bootstrap"
+const lengthSqliteFileName = 7 // len("/sqlite")
+const (
+	headerSnapshotNumber = "Transicator-Snapshot-TXID"
+)
+
+type apidSnapshotManager struct {
+	*offlineSnapshotManager
 	// to send quit signal to the downloading thread
 	quitChan chan bool
 	// to mark the graceful close of snapshotManager
@@ -38,32 +45,40 @@
 	isClosed *int32
 	// make sure close() returns immediately if there's no downloading/processing snapshot
 	isDownloading *int32
+	tokenMan      tokenManager
+	dbMan         DbManager
+	client        *http.Client
 }
 
-func createSnapShotManager() *simpleSnapShotManager {
+func createSnapShotManager(dbMan DbManager, tokenMan tokenManager, client *http.Client) *apidSnapshotManager {
 	isClosedInt := int32(0)
 	isDownloadingInt := int32(0)
-	return &simpleSnapShotManager{
+	return &apidSnapshotManager{
+		offlineSnapshotManager: &offlineSnapshotManager{
+			dbMan: dbMan,
+		},
 		quitChan:      make(chan bool, 1),
 		finishChan:    make(chan bool, 1),
 		isClosed:      &isClosedInt,
 		isDownloading: &isDownloadingInt,
+		dbMan:         dbMan,
+		tokenMan:      tokenMan,
+		client:        client,
 	}
 }
 
 /*
- * thread-safe close of snapShotManager
+ * thread-safe close of snapshotManager
  * It marks status as closed immediately, and quits backoff downloading
  * use <- close() for blocking close
  * should only be called by pollChangeManager, because pollChangeManager is dependent on it
  */
-func (s *simpleSnapShotManager) close() <-chan bool {
+func (s *apidSnapshotManager) close() <-chan bool {
 	//has been closed before
 	if atomic.SwapInt32(s.isClosed, 1) == int32(1) {
-		log.Error("snapShotManager: close() called on a closed snapShotManager!")
+		log.Warn("snapshotManager: close() called on a closed snapshotManager!")
 		go func() {
 			s.finishChan <- false
-			log.Debug("change manager closed")
 		}()
 		return s.finishChan
 	}
@@ -77,163 +92,52 @@
 }
 
 // retrieve boot information: apid_config and apid_config_scope
-func (s *simpleSnapShotManager) downloadBootSnapshot() {
+func (s *apidSnapshotManager) downloadBootSnapshot() {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
 	}
 	defer atomic.StoreInt32(s.isDownloading, int32(0))
 
-	// has been closed
-	if atomic.LoadInt32(s.isClosed) == int32(1) {
-		log.Warn("snapShotManager: downloadBootSnapshot called on closed snapShotManager")
-		return
-	}
-
 	log.Debug("download Snapshot for boot data")
 
 	scopes := []string{apidInfo.ClusterID}
 	snapshot := &common.Snapshot{}
 
-	err := s.downloadSnapshot(true, scopes, snapshot)
-	if err != nil {
-		// this may happen during shutdown
-		if _, ok := err.(quitSignalError); ok {
-			log.Warn("downloadBootSnapshot failed due to shutdown: " + err.Error())
-		}
-		return
-	}
-
-	// has been closed
-	if atomic.LoadInt32(s.isClosed) == int32(1) {
-		log.Error("snapShotManager: processSnapshot called on closed snapShotManager")
-		return
-	}
+	s.downloadSnapshot(true, scopes, snapshot)
 
 	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
 	s.storeBootSnapshot(snapshot)
 }
 
-func (s *simpleSnapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
-	processSnapshot(snapshot)
+func (s *apidSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
+	if err := s.dbMan.processSnapshot(snapshot, false); err != nil {
+		log.Panic(err)
+	}
 }
 
 // use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func (s *simpleSnapShotManager) downloadDataSnapshot() {
+func (s *apidSnapshotManager) downloadDataSnapshot() error {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
 	}
 	defer atomic.StoreInt32(s.isDownloading, int32(0))
 
-	// has been closed
-	if atomic.LoadInt32(s.isClosed) == int32(1) {
-		log.Warn("snapShotManager: downloadDataSnapshot called on closed snapShotManager")
-		return
-	}
-
 	log.Debug("download Snapshot for data scopes")
 
-	scopes := findScopesForId(apidInfo.ClusterID)
+	scopes, err := s.dbMan.findScopesForId(apidInfo.ClusterID)
+	if err != nil {
+		return err
+	}
 	scopes = append(scopes, apidInfo.ClusterID)
 	snapshot := &common.Snapshot{}
-	err := s.downloadSnapshot(false, scopes, snapshot)
-	if err != nil {
-		// this may happen during shutdown
-		if _, ok := err.(quitSignalError); ok {
-			log.Warn("downloadDataSnapshot failed due to shutdown: " + err.Error())
-		}
-		return
-	}
-	s.storeDataSnapshot(snapshot)
-}
-
-func (s *simpleSnapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
-	knownTables = extractTablesFromSnapshot(snapshot)
-
-	_, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-
-	processSnapshot(snapshot)
-	log.Info("Emitting Snapshot to plugins")
-
-	select {
-	case <-time.After(pluginTimeout):
-		log.Panic("Timeout. Plugins failed to respond to snapshot.")
-	case <-events.Emit(ApigeeSyncEventSelector, snapshot):
-		// the new snapshot has been processed
-		// if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency
-	}
-
-}
-
-func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	log.Debug("Extracting table names from snapshot")
-	//if this panic ever fires, it's a bug
-	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-	return extractTablesFromDB(db)
-}
-
-func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	log.Debug("Extracting table names from existing DB")
-	rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
-	defer rows.Close()
-
-	if err != nil {
-		log.Panicf("Error reading current set of tables: %v", err)
-	}
-
-	for rows.Next() {
-		var table string
-		if err := rows.Scan(&table); err != nil {
-			log.Panicf("Error reading current set of tables: %v", err)
-		}
-		log.Debugf("Table %s found in existing db", table)
-
-		tables[table] = true
-	}
-	return tables
-}
-
-// Skip Downloading snapshot if there is already a snapshot available from previous run
-func (s *simpleSnapShotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot {
-	log.Infof("Starting on local snapshot: %s", snapshotName)
-
-	// ensure DB version will be accessible on behalf of dependant plugins
-	db, err := dataService.DBVersion(snapshotName)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-
-	knownTables = extractTablesFromDB(db)
-	snapshot := &common.Snapshot{
-		SnapshotInfo: snapshotName,
-	}
-	processSnapshot(snapshot)
-
-	// allow plugins (including this one) to start immediately on existing database
-	// Note: this MUST have no tables as that is used as an indicator
-	return snapshot
+	s.downloadSnapshot(false, scopes, snapshot)
+	return s.startOnDataSnapshot(snapshot.SnapshotInfo)
 }
 
 // a blocking method
 // will keep retrying with backoff until success
 
-func (s *simpleSnapShotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
-	// if closed
-	if atomic.LoadInt32(s.isClosed) == int32(1) {
-		log.Warn("Trying to download snapshot with a closed snapShotManager")
-		return quitSignalError{}
-	}
+func (s *apidSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) {
 
 	log.Debug("downloadSnapshot")
 
@@ -254,12 +158,11 @@
 
 	//pollWithBackoff only accepts function that accept a single quit channel
 	//to accommodate functions which need more parameters, wrap them in closures
-	attemptDownload := getAttemptDownloadClosure(isBoot, snapshot, uri)
+	attemptDownload := s.getAttemptDownloadClosure(isBoot, snapshot, uri)
 	pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
-	return nil
 }
 
-func getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
+func (s *apidSnapshotManager) getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
 	return func(_ chan bool) error {
 
 		var tid string
@@ -268,19 +171,16 @@
 			// should never happen, but if it does, it's unrecoverable anyway
 			log.Panicf("Snapshotserver comm error: %v", err)
 		}
-		addHeaders(req)
-
-		var processSnapshotResponse func(string, io.Reader, *common.Snapshot) error
+		addHeaders(req, s.tokenMan.getBearerToken())
 
 		if config.GetString(configSnapshotProtocol) != "sqlite" {
 			log.Panic("Only currently supported snashot protocol is sqlite")
 
 		}
 		req.Header.Set("Accept", "application/transicator+sqlite")
-		processSnapshotResponse = processSnapshotServerFileResponse
 
 		// Issue the request to the snapshot server
-		r, err := httpclient.Do(req)
+		r, err := s.client.Do(req)
 		if err != nil {
 			log.Errorf("Snapshotserver comm error: %v", err)
 			return err
@@ -288,22 +188,28 @@
 
 		defer r.Body.Close()
 
-		if r.StatusCode != 200 {
+		switch r.StatusCode {
+		case http.StatusOK:
+			break
+		case http.StatusUnauthorized:
+			s.tokenMan.invalidateToken()
+			fallthrough
+		default:
 			body, _ := ioutil.ReadAll(r.Body)
 			log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
-			return expected200Error{}
+			return expected200Error
 		}
 
 		// Bootstrap scope is a special case, that can occur only once. The tid is
 		// hardcoded to "bootstrap" to ensure there can be no clash of tid between
 		// bootstrap and subsequent data scopes.
 		if isBoot {
-			tid = "bootstrap"
+			tid = bootstrapSnapshotName
 		} else {
-			tid = r.Header.Get("Transicator-Snapshot-TXID")
+			tid = r.Header.Get(headerSnapshotNumber)
 		}
 		// Decode the Snapshot server response
-		err = processSnapshotResponse(tid, r.Body, snapshot)
+		err = processSnapshotServerFileResponse(tid, r.Body, snapshot)
 		if err != nil {
 			log.Errorf("Snapshot server response Data not parsable: %v", err)
 			return err
@@ -315,11 +221,22 @@
 
 func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error {
 	dbPath := data.DBPath("common/" + dbId)
+	dbDir := dbPath[0 : len(dbPath)-lengthSqliteFileName]
 	log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath)
 
+	// if other bootstrap snapshot exists, delete the old file
+	if dbId == bootstrapSnapshotName {
+		if _, err := os.Stat(dbDir); !os.IsNotExist(err) {
+			if err = os.RemoveAll(dbDir); err != nil {
+				log.Errorf("Failed to delete old bootstrap snapshot; %v", err)
+				return err
+			}
+		}
+	}
+
 	//this path includes the sqlite3 file name.  why does mkdir all stop at parent??
 	log.Infof("Creating directory with mkdirall %s", dbPath)
-	err := os.MkdirAll(dbPath[0:len(dbPath)-7], 0700)
+	err := os.MkdirAll(dbDir, 0700)
 	if err != nil {
 		log.Errorf("Error creating db path %s", err)
 	}
@@ -343,10 +260,11 @@
 }
 
 func handleSnapshotServerError(err error) {
-	log.Debugf("Error connecting to snapshot server: %v", err)
+	log.Errorf("Error connecting to snapshot server: %v", err)
 }
 
 type offlineSnapshotManager struct {
+	dbMan DbManager
 }
 
 func (o *offlineSnapshotManager) close() <-chan bool {
@@ -355,33 +273,28 @@
 	return c
 }
 
-func (o *offlineSnapshotManager) downloadBootSnapshot() {}
-
-func (o *offlineSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {}
-
-func (o *offlineSnapshotManager) downloadDataSnapshot() {}
-
-func (o *offlineSnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {}
-
-func (o *offlineSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
-	return nil
+func (o *offlineSnapshotManager) downloadBootSnapshot() {
+	log.Panic("downloadBootSnapshot called for offlineSnapshotManager")
 }
-func (o *offlineSnapshotManager) startOnLocalSnapshot(snapshotName string) *common.Snapshot {
-	log.Infof("Starting on local snapshot: %s", snapshotName)
 
-	// ensure DB version will be accessible on behalf of dependant plugins
-	db, err := dataService.DBVersion(snapshotName)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
+func (o *offlineSnapshotManager) downloadDataSnapshot() error {
+	return fmt.Errorf("downloadDataSnapshot called for offlineSnapshotManager")
+}
 
-	knownTables = extractTablesFromDB(db)
+func (o *offlineSnapshotManager) startOnDataSnapshot(snapshotName string) error {
+	log.Infof("Processing snapshot: %s", snapshotName)
 	snapshot := &common.Snapshot{
 		SnapshotInfo: snapshotName,
 	}
-	processSnapshot(snapshot)
-
-	// allow plugins (including this one) to start immediately on existing database
-	// Note: this MUST have no tables as that is used as an indicator
-	return snapshot
+	if err := o.dbMan.processSnapshot(snapshot, true); err != nil {
+		return err
+	}
+	log.Info("Emitting Snapshot to plugins")
+	select {
+	case <-time.After(pluginTimeout):
+		return fmt.Errorf("timeout, plugins failed to respond to snapshot")
+	case <-eventService.Emit(ApigeeSyncEventSelector, snapshot):
+		// the new snapshot has been processed
+	}
+	return nil
 }
diff --git a/snapshot_test.go b/snapshot_test.go
new file mode 100644
index 0000000..8d99563
--- /dev/null
+++ b/snapshot_test.go
@@ -0,0 +1,146 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apidApigeeSync
+
+import (
+	"github.com/apid/apid-core"
+	"github.com/apid/apid-core/api"
+	"github.com/apigee-labs/transicator/common"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"net/http"
+	"net/http/httptest"
+	"strconv"
+	"time"
+)
+
+var _ = Describe("Snapshot Manager", func() {
+	testCount := 0
+	var dummyDbMan *dummyDbManager
+	BeforeEach(func() {
+		testCount++
+		dummyDbMan = &dummyDbManager{}
+	})
+
+	Context("offlineSnapshotManager", func() {
+		var testSnapMan *offlineSnapshotManager
+		BeforeEach(func() {
+			testSnapMan = &offlineSnapshotManager{
+				dbMan: dummyDbMan,
+			}
+		})
+		AfterEach(func() {
+			<-testSnapMan.close()
+		})
+
+		It("should have error if download called", func() {
+			Expect(testSnapMan.downloadDataSnapshot()).ToNot(Succeed())
+			Expect(func() { testSnapMan.downloadBootSnapshot() }).To(Panic())
+		})
+
+		It("startOnDataSnapshot should emit events", func() {
+			called := false
+			eventService.ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+				if _, ok := event.(*common.Snapshot); ok {
+					called = true
+				}
+			})
+			snapshotId := "test_snapshot_" + strconv.Itoa(testCount)
+			Expect(testSnapMan.startOnDataSnapshot(snapshotId)).Should(Succeed())
+			Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(snapshotId))
+			Expect(called).Should(BeTrue())
+		})
+	})
+
+	Context("apidSnapshotManager", func() {
+		var testSnapMan *apidSnapshotManager
+		var dummyTokenMan *dummyTokenManager
+		var testServer *httptest.Server
+		var testRouter apid.Router
+		var testMock *MockServer
+		BeforeEach(func() {
+			dummyTokenMan = &dummyTokenManager{
+				invalidateChan: make(chan bool, 1),
+			}
+			client := &http.Client{}
+			testSnapMan = createSnapShotManager(dummyDbMan, dummyTokenMan, client)
+
+			// create a new API service to have a new router for testing
+			testRouter = api.CreateService().Router()
+			testServer = httptest.NewServer(testRouter)
+			// set up mock server
+			mockParms := MockParms{
+				ReliableAPI:  true,
+				ClusterID:    config.GetString(configApidClusterId),
+				TokenKey:     config.GetString(configConsumerKey),
+				TokenSecret:  config.GetString(configConsumerSecret),
+				Scope:        "",
+				Organization: "att",
+				Environment:  "prod",
+			}
+			apidInfo.ClusterID = expectedClusterId
+			apidInfo.InstanceID = expectedInstanceId
+			testMock = Mock(mockParms, testRouter)
+			config.Set(configProxyServerBaseURI, testServer.URL)
+			config.Set(configSnapServerBaseURI, testServer.URL)
+			config.Set(configChangeServerBaseURI, testServer.URL)
+			config.Set(configPollInterval, 1*time.Millisecond)
+
+			initialBackoffInterval = time.Millisecond
+			testMock.oauthToken = "test_token_" + strconv.Itoa(testCount)
+			dummyTokenMan.token = testMock.oauthToken
+		})
+
+		AfterEach(func() {
+			<-testSnapMan.close()
+		})
+
+		It("downloadBootSnapshot happy path", func() {
+			testMock.normalAuthCheck()
+			testSnapMan.downloadBootSnapshot()
+			Expect(dummyDbMan.isDataSnapshot).Should(BeFalse())
+			Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(bootstrapSnapshotName))
+		})
+
+		It("downloadBootSnapshot should retry for auth failure", func() {
+			testMock.forceAuthFailOnce()
+			testSnapMan.downloadBootSnapshot()
+			Expect(dummyDbMan.isDataSnapshot).Should(BeFalse())
+			Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(bootstrapSnapshotName))
+			Expect(<-dummyTokenMan.invalidateChan).Should(BeTrue())
+		})
+
+		It("downloadDataSnapshot happy path", func() {
+			testMock.params.Scope = "test_scope_" + strconv.Itoa(testCount)
+			dummyDbMan.scopes = []string{testMock.params.Scope}
+			testMock.normalAuthCheck()
+			testSnapMan.downloadDataSnapshot()
+			Expect(dummyDbMan.isDataSnapshot).Should(BeTrue())
+			Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(testMock.snapshotID))
+		})
+
+		It("downloadDataSnapshot should retry for auth failure", func() {
+			testMock.params.Scope = "test_scope_" + strconv.Itoa(testCount)
+			dummyDbMan.scopes = []string{testMock.params.Scope}
+			testMock.forceAuthFailOnce()
+			testSnapMan.downloadDataSnapshot()
+			Expect(dummyDbMan.isDataSnapshot).Should(BeTrue())
+			Expect(dummyDbMan.snapshot.SnapshotInfo).Should(Equal(testMock.snapshotID))
+			Expect(<-dummyTokenMan.invalidateChan).Should(BeTrue())
+		})
+
+	})
+
+})
diff --git a/test_mock_test.go b/test_mock_test.go
index de2f673..9ba4e19 100644
--- a/test_mock_test.go
+++ b/test_mock_test.go
@@ -14,10 +14,115 @@
 package apidApigeeSync
 
 import (
+	"github.com/apid/apid-core"
 	"github.com/apigee-labs/transicator/common"
-	"net/url"
+	"math/rand"
+	"net/http"
+	"strconv"
 )
 
+type mockService struct {
+	config apid.ConfigService
+	log    apid.LogService
+	api    apid.APIService
+	data   apid.DataService
+	events apid.EventsService
+}
+
+func (s *mockService) API() apid.APIService {
+	return s.api
+}
+
+func (s *mockService) Config() apid.ConfigService {
+	return s.config
+}
+
+func (s *mockService) Data() apid.DataService {
+	return s.data
+}
+
+func (s *mockService) Events() apid.EventsService {
+	return s.events
+}
+
+func (s *mockService) Log() apid.LogService {
+	return s.log
+}
+
+type mockApi struct {
+	handleMap map[string]http.HandlerFunc
+}
+
+func (m *mockApi) Listen() error {
+	return nil
+}
+func (m *mockApi) Handle(path string, handler http.Handler) apid.Route {
+	return nil
+}
+func (m *mockApi) HandleFunc(path string, handlerFunc http.HandlerFunc) apid.Route {
+	m.handleMap[path] = handlerFunc
+	return apid.API().HandleFunc(path+strconv.Itoa(rand.Int()), handlerFunc)
+}
+func (m *mockApi) Vars(r *http.Request) map[string]string {
+	return nil
+}
+
+func (m *mockApi) Router() apid.Router {
+	return nil
+}
+
+type mockData struct {
+}
+
+func (m *mockData) DB() (apid.DB, error) {
+	return nil, nil
+}
+
+func (m *mockData) DBForID(id string) (apid.DB, error) {
+	return nil, nil
+}
+
+func (m *mockData) DBVersion(version string) (apid.DB, error) {
+	return nil, nil
+}
+func (m *mockData) DBVersionForID(id, version string) (apid.DB, error) {
+	return nil, nil
+}
+
+func (m *mockData) ReleaseDB(version string)          {}
+func (m *mockData) ReleaseCommonDB()                  {}
+func (m *mockData) ReleaseDBForID(id, version string) {}
+
+type mockEvent struct {
+	listenerMap map[apid.EventSelector]apid.EventHandlerFunc
+}
+
+func (e *mockEvent) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+	return nil
+}
+
+func (e *mockEvent) EmitWithCallback(selector apid.EventSelector, event apid.Event, handler apid.EventHandlerFunc) {
+
+}
+
+func (e *mockEvent) Listen(selector apid.EventSelector, handler apid.EventHandler) {
+
+}
+
+func (e *mockEvent) ListenFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) {
+
+}
+
+func (e *mockEvent) ListenOnceFunc(selector apid.EventSelector, handler apid.EventHandlerFunc) {
+	e.listenerMap[selector] = handler
+}
+
+func (e *mockEvent) StopListening(selector apid.EventSelector, handler apid.EventHandler) {
+
+}
+
+func (e *mockEvent) Close() {}
+
 type dummyChangeManager struct {
 	pollChangeWithBackoffChan chan bool
 }
@@ -34,43 +139,34 @@
 
 type dummyTokenManager struct {
 	invalidateChan chan bool
+	token          string
+	tokenReadyChan chan bool
 }
 
 func (t *dummyTokenManager) getTokenReadyChannel() <-chan bool {
-	return nil
+	return t.tokenReadyChan
 }
 
 func (t *dummyTokenManager) getBearerToken() string {
-	return ""
+	return t.token
 }
 
-func (t *dummyTokenManager) invalidateToken() error {
+func (t *dummyTokenManager) invalidateToken() {
 	log.Debug("invalidateToken called")
-	testMock.passAuthCheck()
 	t.invalidateChan <- true
-	return nil
-}
-
-func (t *dummyTokenManager) getToken() *OauthToken {
-	return nil
 }
 
 func (t *dummyTokenManager) close() {
 	return
 }
 
-func (t *dummyTokenManager) getRetrieveNewTokenClosure(*url.URL) func(chan bool) error {
-	return func(chan bool) error {
-		return nil
-	}
-}
-
 func (t *dummyTokenManager) start() {
 
 }
 
 type dummySnapshotManager struct {
 	downloadCalledChan chan bool
+	startCalledChan    chan bool
 }
 
 func (s *dummySnapshotManager) close() <-chan bool {
@@ -80,28 +176,60 @@
 }
 
 func (s *dummySnapshotManager) downloadBootSnapshot() {
-
+	s.downloadCalledChan <- false
 }
 
-func (s *dummySnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (s *dummySnapshotManager) downloadDataSnapshot() {
-	log.Debug("dummySnapshotManager.downloadDataSnapshot() called")
+func (s *dummySnapshotManager) downloadDataSnapshot() error {
 	s.downloadCalledChan <- true
-}
-
-func (s *dummySnapshotManager) storeDataSnapshot(snapshot *common.Snapshot) {
-
-}
-
-func (s *dummySnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) error {
 	return nil
 }
 
-func (s *dummySnapshotManager) startOnLocalSnapshot(snapshot string) *common.Snapshot {
-	return &common.Snapshot{
-		SnapshotInfo: snapshot,
-	}
+func (s *dummySnapshotManager) startOnDataSnapshot(snapshot string) error {
+	s.startCalledChan <- true
+	return nil
+}
+
+type dummyDbManager struct {
+	lastSequence   string
+	knownTables    map[string]bool
+	scopes         []string
+	snapshot       *common.Snapshot
+	isDataSnapshot bool
+	lastSeqUpdated chan string
+}
+
+func (d *dummyDbManager) initDB() error {
+	return nil
+}
+func (d *dummyDbManager) setDB(db apid.DB) {
+
+}
+func (d *dummyDbManager) getLastSequence() (lastSequence string) {
+	return d.lastSequence
+}
+func (d *dummyDbManager) findScopesForId(configId string) (scopes []string, err error) {
+	return d.scopes, nil
+}
+func (d *dummyDbManager) updateLastSequence(lastSequence string) error {
+	d.lastSeqUpdated <- lastSequence
+	return nil
+}
+func (d *dummyDbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
+	return apidInstanceInfo{
+		InstanceID:   "",
+		InstanceName: "",
+		ClusterID:    "",
+		LastSnapshot: "",
+	}, nil
+}
+func (d *dummyDbManager) processChangeList(changes *common.ChangeList) error {
+	return nil
+}
+func (d *dummyDbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
+	d.snapshot = snapshot
+	d.isDataSnapshot = isDataSnapshot
+	return nil
+}
+func (d *dummyDbManager) getKnowTables() map[string]bool {
+	return d.knownTables
 }
diff --git a/token.go b/token.go
index 95d4b06..c355a61 100644
--- a/token.go
+++ b/token.go
@@ -17,7 +17,6 @@
 import (
 	"bytes"
 	"encoding/json"
-	"errors"
 	"github.com/apid/apid-core/util"
 	"io/ioutil"
 	"net/http"
@@ -35,15 +34,13 @@
 Usage:
    man := createTokenManager()
    bearer := man.getBearerToken()
-   // will automatically update config(configBearerToken) for other modules
-   // optionally, when done...
-   man.close()
+   will automatically update config(configBearerToken) for other modules
 */
 
-func createSimpleTokenManager() *simpleTokenManager {
+func createApidTokenManager(isNewInstance bool) *apidTokenManager {
 	isClosedInt := int32(0)
 
-	t := &simpleTokenManager{
+	t := &apidTokenManager{
 		quitPollingForToken: make(chan bool, 1),
 		closed:              make(chan bool),
 		getTokenChan:        make(chan bool),
@@ -52,11 +49,12 @@
 		invalidateDone:      make(chan bool),
 		tokenUpdatedChan:    make(chan bool, 1),
 		isClosed:            &isClosedInt,
+		isNewInstance:       isNewInstance,
 	}
 	return t
 }
 
-type simpleTokenManager struct {
+type apidTokenManager struct {
 	token               *OauthToken
 	isClosed            *int32
 	quitPollingForToken chan bool
@@ -67,19 +65,20 @@
 	returnTokenChan     chan *OauthToken
 	invalidateDone      chan bool
 	tokenUpdatedChan    chan bool
+	isNewInstance       bool
 }
 
-func (t *simpleTokenManager) start() {
+func (t *apidTokenManager) start() {
 	t.retrieveNewToken()
 	t.refreshTimer = time.After(t.token.refreshIn())
 	go t.maintainToken()
 }
 
-func (t *simpleTokenManager) getBearerToken() string {
+func (t *apidTokenManager) getBearerToken() string {
 	return t.getToken().AccessToken
 }
 
-func (t *simpleTokenManager) maintainToken() {
+func (t *apidTokenManager) maintainToken() {
 	for {
 		select {
 		case <-t.closed:
@@ -100,19 +99,13 @@
 }
 
 // will block until valid
-func (t *simpleTokenManager) invalidateToken() error {
-	//has been closed
-	if atomic.LoadInt32(t.isClosed) == int32(1) {
-		log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
-		return errors.New("invalidateToken() called on closed tokenManager")
-	}
+func (t *apidTokenManager) invalidateToken() {
 	log.Debug("invalidating token")
 	t.invalidateTokenChan <- true
 	<-t.invalidateDone
-	return nil
 }
 
-func (t *simpleTokenManager) getToken() *OauthToken {
+func (t *apidTokenManager) getToken() *OauthToken {
 	//has been closed
 	if atomic.LoadInt32(t.isClosed) == int32(1) {
 		log.Debug("TokenManager: getToken() called on closed tokenManager")
@@ -126,7 +119,7 @@
  * blocking close() of tokenMan
  */
 
-func (t *simpleTokenManager) close() {
+func (t *apidTokenManager) close() {
 	//has been closed
 	if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
 		log.Panic("TokenManager: close() has been called before!")
@@ -141,7 +134,7 @@
 }
 
 // don't call externally. will block until success.
-func (t *simpleTokenManager) retrieveNewToken() {
+func (t *apidTokenManager) retrieveNewToken() {
 
 	log.Debug("Getting OAuth token...")
 	uriString := config.GetString(configProxyServerBaseURI)
@@ -151,10 +144,10 @@
 	}
 	uri.Path = path.Join(uri.Path, "/accesstoken")
 
-	pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) })
+	pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : %v", err) })
 }
 
-func (t *simpleTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+func (t *apidTokenManager) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
 	return func(_ chan bool) error {
 		form := url.Values{}
 		form.Set("grant_type", "client_credentials")
@@ -168,9 +161,9 @@
 		req.Header.Set("status", "ONLINE")
 		req.Header.Set("plugin_details", apidPluginDetails)
 
-		if newInstanceID {
+		if t.isNewInstance {
 			req.Header.Set("created_at_apid", time.Now().Format(time.RFC3339))
-			newInstanceID = false
+			t.isNewInstance = false
 		} else {
 			req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
 		}
@@ -194,7 +187,7 @@
 
 		if resp.StatusCode != 200 {
 			log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
-			return expected200Error{}
+			return expected200Error
 		}
 
 		var token OauthToken
@@ -212,22 +205,11 @@
 		}
 
 		log.Debugf("Got new token: %#v", token)
-
-		/*
-			if newInstanceID {
-				newInstanceID = false
-				err = updateApidInstanceInfo()
-				if err != nil {
-					log.Errorf("unable to unmarshal update apid instance info : %v", string(body), err)
-					return err
-
-				}
-			}
-		*/
 		t.token = &token
 		config.Set(configBearerToken, token.AccessToken)
 
 		//don't block on the buffered channel.  that means there is already a signal to serve new token
+		//TODO: This assumes apid-gateway is 1-1 mapping. Make use of generic long-polling provided by apid-core
 		select {
 		case t.tokenUpdatedChan <- true:
 		default:
@@ -238,7 +220,7 @@
 	}
 }
 
-func (t *simpleTokenManager) getTokenReadyChannel() <-chan bool {
+func (t *apidTokenManager) getTokenReadyChannel() <-chan bool {
 	return t.tokenUpdatedChan
 }
 
diff --git a/token_test.go b/token_test.go
index 1dfaabb..fa9379c 100644
--- a/token_test.go
+++ b/token_test.go
@@ -94,7 +94,7 @@
 				w.Write(body)
 			}))
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createApidTokenManager(false)
 			testedTokenManager.start()
 			token := testedTokenManager.getToken()
 
@@ -123,7 +123,7 @@
 			}))
 			config.Set(configProxyServerBaseURI, ts.URL)
 
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createApidTokenManager(false)
 			testedTokenManager.start()
 			token := testedTokenManager.getToken()
 			Expect(token.AccessToken).ToNot(BeEmpty())
@@ -163,7 +163,7 @@
 			}))
 
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createApidTokenManager(false)
 			testedTokenManager.start()
 			testedTokenManager.getToken()
 
@@ -179,8 +179,6 @@
 			finished := make(chan bool, 1)
 			count := 0
 
-			newInstanceID = true
-
 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 
 				count++
@@ -202,7 +200,7 @@
 			}))
 
 			config.Set(configProxyServerBaseURI, ts.URL)
-			testedTokenManager := createSimpleTokenManager()
+			testedTokenManager := createApidTokenManager(true)
 			testedTokenManager.start()
 			testedTokenManager.getToken()
 			testedTokenManager.invalidateToken()
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..7d59214
--- /dev/null
+++ b/util.go
@@ -0,0 +1,172 @@
+// Copyright 2017 Google Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package apidApigeeSync
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+	"net/http"
+	"time"
+)
+
+const (
+	httpTimeout                       = time.Minute
+	pluginTimeout                     = time.Minute
+	maxIdleConnsPerHost               = 10
+	defaultInitial      time.Duration = 200 * time.Millisecond
+	defaultMax          time.Duration = 10 * time.Second
+	defaultFactor       float64       = 2
+)
+
+var (
+	initialBackoffInterval = defaultInitial
+)
+
+var (
+	expected200Error = fmt.Errorf("did not recieve OK response")
+	quitSignalError  = fmt.Errorf("signal to quit encountered")
+	authFailError    = fmt.Errorf("authorization failed")
+)
+
+type Backoff struct {
+	attempt         int
+	initial, max    time.Duration
+	jitter          bool
+	backoffStrategy func() time.Duration
+}
+
+type ExponentialBackoff struct {
+	Backoff
+	factor float64
+}
+
+func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
+	backoff := &ExponentialBackoff{}
+
+	if initial <= 0 {
+		initial = defaultInitial
+	}
+	if max <= 0 {
+		max = defaultMax
+	}
+
+	if factor <= 0 {
+		factor = defaultFactor
+	}
+
+	backoff.initial = initial
+	backoff.max = max
+	backoff.attempt = 0
+	backoff.factor = factor
+	backoff.jitter = jitter
+	backoff.backoffStrategy = backoff.exponentialBackoffStrategy
+
+	return backoff
+}
+
+func (b *Backoff) Duration() time.Duration {
+	d := b.backoffStrategy()
+	b.attempt++
+	return d
+}
+
+func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
+
+	initial := float64(b.Backoff.initial)
+	attempt := float64(b.Backoff.attempt)
+	duration := initial * math.Pow(b.factor, attempt)
+
+	if duration > math.MaxInt64 {
+		return b.max
+	}
+	dur := time.Duration(duration)
+
+	if b.jitter {
+		duration = rand.Float64()*(duration-initial) + initial
+	}
+
+	if dur > b.max {
+		return b.max
+	}
+
+	log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
+	return dur
+}
+
+func (b *Backoff) Reset() {
+	b.attempt = 0
+}
+
+func (b *Backoff) Attempt() int {
+	return b.attempt
+}
+
+/*
+ * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
+ * for retrying on errors
+ */
+func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
+
+	backoff := NewExponentialBackoff(initialBackoffInterval, config.GetDuration(configPollInterval), 2, true)
+
+	//initialize the retry channel to start first attempt immediately
+	retry := time.After(0 * time.Millisecond)
+
+	for {
+		select {
+		case <-quit:
+			log.Info("Quit signal recieved.  Returning")
+			return
+		case <-retry:
+			start := time.Now()
+
+			err := toExecute(quit)
+			if err == nil || err == quitSignalError {
+				return
+			}
+
+			end := time.Now()
+			//error encountered, since we would have returned above otherwise
+			handleError(err)
+
+			/* TODO keep this around? Imagine an immediately erroring service,
+			 *  causing many sequential requests which could pollute logs
+			 */
+			//only backoff if the request took less than one second
+			if end.After(start.Add(time.Second)) {
+				backoff.Reset()
+				retry = time.After(0 * time.Millisecond)
+			} else {
+				retry = time.After(backoff.Duration())
+			}
+		}
+	}
+}
+
+func addHeaders(req *http.Request, token string) {
+	req.Header.Set("Authorization", "Bearer "+token)
+	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
+	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
+	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
+}
+
+type changeServerError struct {
+	Code string `json:"code"`
+}
+
+func (a changeServerError) Error() string {
+	return a.Code
+}
diff --git a/backoff_test.go b/util_test.go
similarity index 100%
rename from backoff_test.go
rename to util_test.go