Merge pull request #29 from 30x/refactor

Refactor
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 876ec9c..9e6ed94 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -1,7 +1,6 @@
 package apidApigeeSync
 
 import (
-	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
@@ -12,17 +11,25 @@
 	"time"
 
 	"github.com/30x/apid-core"
+
 	"github.com/30x/apid-core/factory"
 )
 
 var (
-	tmpDir     string
-	testServer *httptest.Server
-	testRouter apid.Router
-	testMock   *MockServer
+	tmpDir          string
+	testServer      *httptest.Server
+	testRouter      apid.Router
+	testMock        *MockServer
+	wipeDBAfterTest bool
 )
 
-var _ = BeforeSuite(func(done Done) {
+const dummyConfigValue string = "placeholder"
+
+var _ = BeforeSuite(func() {
+	wipeDBAfterTest = true
+})
+
+var _ = BeforeEach(func(done Done) {
 	apid.Initialize(factory.DefaultServicesFactory())
 
 	config = apid.Config()
@@ -32,12 +39,9 @@
 	Expect(err).NotTo(HaveOccurred())
 	config.Set("local_storage_path", tmpDir)
 
-	testRouter = apid.API().Router()
-	testServer = httptest.NewServer(testRouter)
-
-	config.Set(configProxyServerBaseURI, testServer.URL)
-	config.Set(configSnapServerBaseURI, testServer.URL)
-	config.Set(configChangeServerBaseURI, testServer.URL)
+	config.Set(configProxyServerBaseURI, dummyConfigValue)
+	config.Set(configSnapServerBaseURI, dummyConfigValue)
+	config.Set(configChangeServerBaseURI, dummyConfigValue)
 	config.Set(configSnapshotProtocol, "json")
 	config.Set(configPollInterval, 10*time.Millisecond)
 
@@ -49,140 +53,27 @@
 	block = "0"
 	log = apid.Log()
 
-	// set up mock server
-	mockParms := MockParms{
-		ReliableAPI:  false,
-		ClusterID:    config.GetString(configApidClusterId),
-		TokenKey:     config.GetString(configConsumerKey),
-		TokenSecret:  config.GetString(configConsumerSecret),
-		Scope:        "ert452",
-		Organization: "att",
-		Environment:  "prod",
-	}
-	testMock = Mock(mockParms, testRouter)
+	_initPlugin(apid.AllServices())
+	close(done)
+}, 3)
 
-	// This is actually the first test :)
-	// Tests that entire bootstrap and set of sync operations work
-	var lastSnapshot *common.Snapshot
-
-	expectedSnapshotTables := make(map[string]bool)
-	expectedSnapshotTables["kms.company"] = true
-	expectedSnapshotTables["edgex.apid_cluster"] = true
-	expectedSnapshotTables["edgex.data_scope"] = true
-
-	apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-		defer GinkgoRecover()
-
-		if s, ok := event.(*common.Snapshot); ok {
-
-			//verify that during downloadDataSnapshot, knownTables was correctly populated
-			Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue())
-
-			/* After this, we will mock changes for tables not present in the initial snapshot
-			* until that is changed in the mock server, we have to spoof the known tables
-			 */
-
-			//add apid_cluster and data_scope since those would present if this were a real scenario
-			knownTables["kms.app_credential"] = true
-			knownTables["kms.app_credential_apiproduct_mapper"] = true
-			knownTables["kms.developer"] = true
-			knownTables["kms.company_developer"] = true
-			knownTables["kms.api_product"] = true
-			knownTables["kms.app"] = true
-
-			lastSnapshot = s
-
-			for _, t := range s.Tables {
-				switch t.Name {
-
-				case "edgex.apid_cluster":
-					Expect(t.Rows).To(HaveLen(1))
-					r := t.Rows[0]
-					var id string
-					r.Get("id", &id)
-					Expect(id).To(Equal("bootstrap"))
-
-				case "edgex.data_scope":
-					Expect(t.Rows).To(HaveLen(2))
-					r := t.Rows[1] // get the non-cluster row
-
-					var id, clusterID, env, org, scope string
-					r.Get("id", &id)
-					r.Get("apid_cluster_id", &clusterID)
-					r.Get("env", &env)
-					r.Get("org", &org)
-					r.Get("scope", &scope)
-
-					Expect(id).To(Equal("ert452"))
-					Expect(scope).To(Equal("ert452"))
-					Expect(clusterID).To(Equal("bootstrap"))
-					Expect(env).To(Equal("prod"))
-					Expect(org).To(Equal("att"))
-				}
-			}
-
-		} else if cl, ok := event.(*common.ChangeList); ok {
-
-			// ensure that snapshot switched DB versions
-			Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
-			expectedDB, err := data.DBVersion(lastSnapshot.SnapshotInfo)
-			Expect(err).NotTo(HaveOccurred())
-			Expect(getDB() == expectedDB).Should(BeTrue())
-
-			Expect(cl.Changes).To(HaveLen(6))
-
-			var tables []string
-			for _, c := range cl.Changes {
-				tables = append(tables, c.Table)
-				Expect(c.NewRow).ToNot(BeNil())
-
-				var tenantID string
-				c.NewRow.Get("tenant_id", &tenantID)
-				Expect(tenantID).To(Equal("ert452"))
-			}
-
-			Expect(tables).To(ContainElement("kms.app_credential"))
-			Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
-			Expect(tables).To(ContainElement("kms.developer"))
-			Expect(tables).To(ContainElement("kms.company_developer"))
-			Expect(tables).To(ContainElement("kms.api_product"))
-			Expect(tables).To(ContainElement("kms.app"))
-
-			events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) {
-				defer GinkgoRecover()
-
-				// allow other handler to execute to insert last_sequence
-				time.Sleep(50 * time.Millisecond)
-				var seq string
-				err = getDB().
-					QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
-					Scan(&seq)
-
-				Expect(err).NotTo(HaveOccurred())
-				Expect(seq).To(Equal(cl.LastSequence))
-
-				close(done)
-			})
-		}
-	})
-
-	apid.InitializePlugins()
-})
-
-var _ = BeforeEach(func() {
+var _ = AfterEach(func() {
 	apid.Events().Close()
 
 	lastSequence = ""
 
-	_, err := getDB().Exec("DELETE FROM APID_CLUSTER")
-	Expect(err).NotTo(HaveOccurred())
-	_, err = getDB().Exec("DELETE FROM DATA_SCOPE")
-	Expect(err).NotTo(HaveOccurred())
+	if wipeDBAfterTest {
+		_, err := getDB().Exec("DELETE FROM APID_CLUSTER")
+		Expect(err).NotTo(HaveOccurred())
+		_, err = getDB().Exec("DELETE FROM DATA_SCOPE")
+		Expect(err).NotTo(HaveOccurred())
 
-	db, err := data.DB()
-	Expect(err).NotTo(HaveOccurred())
-	_, err = db.Exec("DELETE FROM APID")
-	Expect(err).NotTo(HaveOccurred())
+		db, err := dataService.DB()
+		Expect(err).NotTo(HaveOccurred())
+		_, err = db.Exec("DELETE FROM APID")
+		Expect(err).NotTo(HaveOccurred())
+	}
+	wipeDBAfterTest = true
 })
 
 var _ = AfterSuite(func() {
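A note on the Ginkgo v1 idiom used throughout this suite: a spec or setup body that takes a Done channel runs asynchronously, and the trailing integer (the 3 above) is a per-spec timeout in seconds. A minimal sketch of the pattern, using a hypothetical spec:

	var _ = It("finishes before the timeout", func(done Done) {
		go func() {
			defer GinkgoRecover()
			// ... asynchronous work and Expect(...) assertions ...
			close(done) // signal completion; the spec fails if 3s elapse first
		}()
	}, 3)
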
diff --git a/apigee_sync.go b/apigee_sync.go
index be3fc8c..d6b3a19 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -1,514 +1,99 @@
 package apidApigeeSync
 
 import (
-	"encoding/json"
-	"net/http"
-	"net/url"
-	"path"
-	"time"
-
-	"sync/atomic"
-
-	"io/ioutil"
-
 	"github.com/30x/apid-core"
-	"github.com/apigee-labs/transicator/common"
+	"net/http"
+	"time"
 )
 
 const (
-	httpTimeout       = time.Minute
-	pluginTimeout     = time.Minute
-	maxBackoffTimeout = time.Minute
+	httpTimeout   = time.Minute
+	pluginTimeout = time.Minute
 )
 
-var (
-	block        string = "45"
-	lastSequence string
-	polling      uint32
-	knownTables  = make(map[string]bool)
-)
+var knownTables = make(map[string]bool)
 
 /*
- * Polls change agent for changes. In event of errors, uses a doubling
- * backoff from 200ms up to a max delay of the configPollInterval value.
+ *  Start from existing snapshot if possible
+ *  If an existing snapshot does not exist, use the apid scope to fetch
+ *  all data scopes, then get a snapshot for those data scopes
+ *
+ *  Then, poll for changes
  */
-func pollForChanges() {
+func bootstrap() {
 
-	if atomic.SwapUint32(&polling, 1) == 1 {
+	if apidInfo.LastSnapshot != "" {
+		snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
+
+		events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
+			changeManager.pollChangeWithBackoff()
+		})
+
+		log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
 		return
 	}
 
-	var backOffFunc func()
-	pollInterval := config.GetDuration(configPollInterval)
-	for {
-		start := time.Now()
-		err := pollChangeAgent()
-		end := time.Now()
-		if err != nil {
-			if _, ok := err.(changeServerError); ok {
-				downloadDataSnapshot()
-				continue
-			}
-			log.Debugf("Error connecting to changeserver: %v", err)
-		}
-		if end.After(start.Add(time.Second)) {
-			backOffFunc = nil
-			continue
-		}
-		if backOffFunc == nil {
-			backOffFunc = createBackOff(200*time.Millisecond, pollInterval)
-		}
-		backOffFunc()
-	}
+	downloadBootSnapshot(nil)
+	downloadDataSnapshot(quitPollingSnapshotServer)
 
-	atomic.SwapUint32(&polling, 0)
+	changeManager.pollChangeWithBackoff()
+
 }
 
 /*
- * Long polls the change agent with a 45 second block. Parses the response from
- * change agent and raises an event. Called by pollForChanges().
+ * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
+ * for retrying on errors
  */
-func pollChangeAgent() error {
+func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
 
-	changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
-	if err != nil {
-		log.Errorf("bad url value for config %s: %s", changesUri, err)
-		return err
-	}
-	changesUri.Path = path.Join(changesUri.Path, "changes")
+	backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true)
 
-	/*
-	 * Check to see if we have lastSequence already saved in the DB,
-	 * in which case, it has to be used to prevent re-reading same data
-	 */
-	lastSequence = getLastSequence()
+	// initialize the retry channel so the first attempt runs immediately
+	retry := time.After(0 * time.Millisecond)
+
 	for {
-		log.Debug("polling...")
+		select {
+		case <-quit:
+			log.Info("Quit signal recieved.  Returning")
+			return
+		case <-retry:
+			start := time.Now()
 
-		/* Find the scopes associated with the config id */
-		scopes := scopeCache.readAllScope()
-		v := url.Values{}
-
-		/* Sequence added to the query if available */
-		if lastSequence != "" {
-			v.Add("since", lastSequence)
-		}
-		v.Add("block", block)
-
-		/*
-		 * Include all the scopes associated with the config Id
-		 * The Config Id is included as well, as it acts as the
-		 * Bootstrap scope
-		 */
-		for _, scope := range scopes {
-			v.Add("scope", scope)
-		}
-		v.Add("scope", apidInfo.ClusterID)
-		v.Add("snapshot", apidInfo.LastSnapshot)
-		changesUri.RawQuery = v.Encode()
-		uri := changesUri.String()
-		log.Debugf("Fetching changes: %s", uri)
-
-		/* If error, break the loop, and retry after interval */
-		client := &http.Client{Timeout: httpTimeout} // must be greater than block value
-		req, err := http.NewRequest("GET", uri, nil)
-		addHeaders(req)
-
-		r, err := client.Do(req)
-		if err != nil {
-			log.Errorf("change agent comm error: %s", err)
-			return err
-		}
-
-		if r.StatusCode != http.StatusOK {
-			log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
-			switch r.StatusCode {
-			case http.StatusUnauthorized:
-				tokenManager.invalidateToken()
-
-			case http.StatusNotModified:
-				r.Body.Close()
-				continue
-
-			case http.StatusBadRequest:
-				var apiErr changeServerError
-				var b []byte
-				b, err = ioutil.ReadAll(r.Body)
-				if err != nil {
-					log.Errorf("Unable to read response body: %v", err)
-					break
-				}
-				err = json.Unmarshal(b, &apiErr)
-				if err != nil {
-					log.Errorf("JSON Response Data not parsable: %s", string(b))
-					break
-				}
-				if apiErr.Code == "SNAPSHOT_TOO_OLD" {
-					log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
-					err = apiErr
-				}
+			err := toExecute(quit)
+			if err == nil {
+				return
 			}
 
-			r.Body.Close()
-			return err
-		}
+			if _, ok := err.(quitSignalError); ok {
+				return
+			}
 
-		var resp common.ChangeList
-		err = json.NewDecoder(r.Body).Decode(&resp)
-		r.Body.Close()
-		if err != nil {
-			log.Errorf("JSON Response Data not parsable: %v", err)
-			return err
-		}
+			end := time.Now()
+			// an error occurred, since we would have returned above otherwise
+			handleError(err)
 
-		/*
-		 * If the lastSequence is already newer or the same than what we got via
-		 * resp.LastSequence, Ignore it.
-		 */
-		if lastSequence != "" &&
-			getChangeStatus(lastSequence, resp.LastSequence) != 1 {
-			log.Errorf("Ignore change, already have newer changes")
-			continue
-		}
-
-		if changesRequireDDLSync(resp) {
-			log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
-			return changeServerError{
-				Code: "DDL changes detected; must get new snapshot",
+			/* TODO keep this around? Imagine a service that errors immediately,
+			 * causing many rapid sequential requests that could pollute the logs.
+			 */
+			// if the attempt ran longer than a second, treat the connection as
+			// healthy and reset the backoff; otherwise back off before retrying
+			if end.After(start.Add(time.Second)) {
+				backoff.Reset()
+				retry = time.After(0 * time.Millisecond)
+			} else {
+				retry = time.After(backoff.Duration())
 			}
 		}
-
-		/* If valid data present, Emit to plugins */
-		if len(resp.Changes) > 0 {
-			done := make(chan bool)
-			events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) {
-				done <- true
-			})
-
-			select {
-			case <-time.After(httpTimeout):
-				log.Panic("Timeout. Plugins failed to respond to changes.")
-			case <-done:
-			}
-		} else {
-			log.Debugf("No Changes detected for Scopes: %s", scopes)
-		}
-
-		updateSequence(resp.LastSequence)
 	}
 }
 
-/*
- * seqCurr.Compare() will return 1, if its newer than seqPrev,
- * else will return 0, if same, or -1 if older.
- */
-func getChangeStatus(lastSeq string, currSeq string) int {
-	seqPrev, err := common.ParseSequence(lastSeq)
-	if err != nil {
-		log.Panic("Unable to parse previous sequence string")
-	}
-	seqCurr, err := common.ParseSequence(currSeq)
-	if err != nil {
-		log.Panic("Unable to parse current sequence string")
-	}
-	return seqCurr.Compare(seqPrev)
-}
-
-func updateSequence(seq string) {
-	lastSequence = seq
-	err := updateLastSequence(seq)
-	if err != nil {
-		log.Panic("Unable to update Sequence in DB")
-	}
-
-}
-
-func changesRequireDDLSync(changes common.ChangeList) bool {
-
-	return !mapIsSubset(knownTables, extractTablesFromChangelist(changes))
-}
-
-// simple doubling back-off
-func createBackOff(retryIn, maxBackOff time.Duration) func() {
-	return func() {
-		if retryIn > maxBackOff {
-			retryIn = maxBackOff
-		}
-		log.Debugf("backoff called. will retry in %s.", retryIn)
-		time.Sleep(retryIn)
-		retryIn = retryIn * time.Duration(2)
-	}
-}
-
-func Redirect(req *http.Request, via []*http.Request) error {
+func Redirect(req *http.Request, _ []*http.Request) error {
 	req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
 	req.Header.Add("org", apidInfo.ClusterID) // todo: this is strange.. is it needed?
 	return nil
 }
 
-// pollForChanges should usually be true, tests use the flag
-func bootstrap() {
-
-	if apidInfo.LastSnapshot != "" {
-		startOnLocalSnapshot(apidInfo.LastSnapshot)
-		return
-	}
-
-	downloadBootSnapshot()
-	downloadDataSnapshot()
-	go pollForChanges()
-}
-
-// retrieve boot information: apid_config and apid_config_scope
-func downloadBootSnapshot() {
-	log.Debug("download Snapshot for boot data")
-
-	scopes := []string{apidInfo.ClusterID}
-	snapshot := downloadSnapshot(scopes)
-	storeBootSnapshot(snapshot)
-}
-
-func storeBootSnapshot(snapshot common.Snapshot) {
-	// note that for boot snapshot case, we don't touch databases. We only update in-mem cache
-	// This aims to deal with duplicate snapshot version#, see XAPID-869 for details
-	scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
-	for _, table := range snapshot.Tables {
-		if table.Name == LISTENER_TABLE_DATA_SCOPE {
-			for _, row := range table.Rows {
-				ds := makeDataScopeFromRow(row)
-				// cache scopes for this cluster
-				if ds.ClusterID == apidInfo.ClusterID {
-					scopeCache.updateCache(&ds)
-				}
-			}
-		}
-	}
-	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
-}
-
-// use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func downloadDataSnapshot() {
-	log.Debug("download Snapshot for data scopes")
-
-	scopes := scopeCache.readAllScope()
-
-	scopes = append(scopes, apidInfo.ClusterID)
-	snapshot := downloadSnapshot(scopes)
-	storeDataSnapshot(snapshot)
-}
-
-func storeDataSnapshot(snapshot common.Snapshot) {
-	knownTables = extractTablesFromSnapshot(snapshot)
-
-	db, err := data.DBVersion(snapshot.SnapshotInfo)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-	persistKnownTablesToDB(knownTables, db)
-
-	done := make(chan bool)
-	log.Info("Emitting Snapshot to plugins")
-	events.EmitWithCallback(ApigeeSyncEventSelector, &snapshot, func(event apid.Event) {
-		done <- true
-	})
-
-	select {
-	case <-time.After(pluginTimeout):
-		log.Panic("Timeout. Plugins failed to respond to snapshot.")
-	case <-done:
-	}
-
-}
-
-func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	log.Debug("Extracting table names from snapshot")
-	if snapshot.Tables == nil {
-		//if this panic ever fires, it's a bug
-		log.Panicf("Attempt to extract known tables from snapshot without tables failed")
-	}
-
-	for _, table := range snapshot.Tables {
-		tables[table.Name] = true
-	}
-
-	return tables
-}
-
-func extractTablesFromChangelist(changes common.ChangeList) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	for _, change := range changes.Changes {
-		tables[change.Table] = true
-	}
-
-	return tables
-}
-
-func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
-
-	tables = make(map[string]bool)
-
-	log.Debug("Extracting table names from existing DB")
-	rows, err := db.Query("SELECT name FROM _known_tables;")
-	defer rows.Close()
-
-	if err != nil {
-		log.Panicf("Error reading current set of tables: %v", err)
-	}
-
-	for rows.Next() {
-		var table string
-		if err := rows.Scan(&table); err != nil {
-			log.Panicf("Error reading current set of tables: %v", err)
-		}
-		log.Debugf("Table %s found in existing db", table)
-
-		tables[table] = true
-	}
-	return tables
-}
-
-func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
-	log.Debugf("Inserting table names found in snapshot into db")
-
-	tx, err := db.Begin()
-	if err != nil {
-		log.Panicf("Error starting transaction: %v", err)
-	}
-	defer tx.Rollback()
-
-	_, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));")
-	if err != nil {
-		log.Panicf("Could not create _known_tables table: %s", err)
-	}
-
-	for name := range tables {
-		log.Debugf("Inserting %s into _known_tables", name)
-		_, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name)
-		if err != nil {
-			log.Panicf("Error encountered inserting into known tables ", err)
-		}
-
-	}
-
-	err = tx.Commit()
-	if err != nil {
-		log.Panicf("Error committing transaction: %v", err)
-
-	}
-}
-
-// Skip Downloading snapshot if there is already a snapshot available from previous run
-func startOnLocalSnapshot(snapshot string) {
-	log.Infof("Starting on local snapshot: %s", snapshot)
-
-	// ensure DB version will be accessible on behalf of dependant plugins
-	db, err := data.DBVersion(snapshot)
-	if err != nil {
-		log.Panicf("Database inaccessible: %v", err)
-	}
-
-	knownTables = extractTablesFromDB(db)
-	scopeCache.clearAndInitCache(snapshot)
-
-	// allow plugins (including this one) to start immediately on existing database
-	// Note: this MUST have no tables as that is used as an indicator
-	snap := &common.Snapshot{
-		SnapshotInfo: snapshot,
-	}
-	events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) {
-		go pollForChanges()
-	})
-
-	log.Infof("Started on local snapshot: %s", snapshot)
-}
-
-// will keep retrying with backoff until success
-func downloadSnapshot(scopes []string) common.Snapshot {
-
-	log.Debug("downloadSnapshot")
-
-	snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
-	if err != nil {
-		log.Panicf("bad url value for config %s: %s", snapshotUri, err)
-	}
-
-	/* Frame and send the snapshot request */
-	snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")
-
-	v := url.Values{}
-	for _, scope := range scopes {
-		v.Add("scope", scope)
-	}
-	snapshotUri.RawQuery = v.Encode()
-	uri := snapshotUri.String()
-	log.Infof("Snapshot Download: %s", uri)
-
-	client := &http.Client{
-		CheckRedirect: Redirect,
-		Timeout:       httpTimeout,
-	}
-
-	retryIn := 5 * time.Millisecond
-	maxBackOff := maxBackoffTimeout
-	backOffFunc := createBackOff(retryIn, maxBackOff)
-	first := true
-
-	for {
-		if first {
-			first = false
-		} else {
-			backOffFunc()
-		}
-
-		req, err := http.NewRequest("GET", uri, nil)
-		if err != nil {
-			// should never happen, but if it does, it's unrecoverable anyway
-			log.Panicf("Snapshotserver comm error: %v", err)
-		}
-		addHeaders(req)
-
-		// Set the transport protocol type based on conf file input
-		if config.GetString(configSnapshotProtocol) == "json" {
-			req.Header.Set("Accept", "application/json")
-		} else {
-			req.Header.Set("Accept", "application/proto")
-		}
-
-		// Issue the request to the snapshot server
-		r, err := client.Do(req)
-		if err != nil {
-			log.Errorf("Snapshotserver comm error: %v", err)
-			continue
-		}
-
-		if r.StatusCode != 200 {
-			body, _ := ioutil.ReadAll(r.Body)
-			log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
-			r.Body.Close()
-			continue
-		}
-
-		// Decode the Snapshot server response
-		var resp common.Snapshot
-		err = json.NewDecoder(r.Body).Decode(&resp)
-		if err != nil {
-			log.Errorf("JSON Response Data not parsable: %v", err)
-			r.Body.Close()
-			continue
-		}
-
-		r.Body.Close()
-		return resp
-	}
-}
-
 func addHeaders(req *http.Request) {
-	req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
+	req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken())
 	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -518,25 +103,20 @@
 	Code string `json:"code"`
 }
 
+type quitSignalError struct {
+}
+
+type expected200Error struct {
+}
+
+func (a expected200Error) Error() string {
+	return "Did not receive OK response"
+}
+
+func (a quitSignalError) Error() string {
+	return "Signal to quit encountered"
+}
+
 func (a changeServerError) Error() string {
 	return a.Code
-}
-
-/*
- * Determine is map b is a subset of map a
- */
-func mapIsSubset(a map[string]bool, b map[string]bool) bool {
-
-	//nil maps should not be passed in.  Making the distinction between nil map and empty map
-	if a == nil || b == nil {
-		return false
-	}
-
-	for k := range b {
-		if !a[k] {
-			return false
-		}
-	}
-
-	return true
-}
+}
\ No newline at end of file
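pollWithBackoff is now the single retry loop shared by change polling and snapshot downloads: toExecute is retried until it returns nil or a quitSignalError, handleError sees every transient failure, and a receive on quit stops the loop. A minimal usage sketch, assuming it runs inside this package (doOnePoll is a hypothetical single-attempt function):

	quit := make(chan bool)
	go pollWithBackoff(quit,
		func(q chan bool) error {
			return doOnePoll(q) // hypothetical: one attempt, nil on success
		},
		func(err error) {
			log.Debugf("attempt failed, retrying with backoff: %v", err)
		})
	// ... later, to stop the loop:
	quit <- true
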
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index e9af4df..9cdec11 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -5,90 +5,280 @@
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	"net/http/httptest"
+	//"time"
 )
 
-var _ = Describe("listener", func() {
+var _ = Describe("Sync", func() {
 
-	It("should bootstrap from local DB if present", func(done Done) {
+	Context("Sync", func() {
 
-		expectedTables := make(map[string]bool)
-		expectedTables["kms.company"] = true
-		expectedTables["edgex.apid_cluster"] = true
-		expectedTables["edgex.data_scope"] = true
+		var initializeContext = func() {
+			testRouter = apid.API().Router()
+			testServer = httptest.NewServer(testRouter)
 
-		Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
-		apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-			defer GinkgoRecover()
-
-			if s, ok := event.(*common.Snapshot); ok {
-
-				//verify that the knownTables array has been properly populated from existing DB
-				Expect(mapIsSubset(knownTables, expectedTables)).To(BeTrue())
-
-				Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
-				Expect(s.Tables).To(BeNil())
-
-				close(done)
+			// set up mock server
+			mockParms := MockParms{
+				ReliableAPI:  false,
+				ClusterID:    config.GetString(configApidClusterId),
+				TokenKey:     config.GetString(configConsumerKey),
+				TokenSecret:  config.GetString(configConsumerSecret),
+				Scope:        "ert452",
+				Organization: "att",
+				Environment:  "prod",
 			}
+			testMock = Mock(mockParms, testRouter)
+
+			config.Set(configProxyServerBaseURI, testServer.URL)
+			config.Set(configSnapServerBaseURI, testServer.URL)
+			config.Set(configChangeServerBaseURI, testServer.URL)
+		}
+
+		var restoreContext = func() {
+
+			testServer.Close()
+
+			config.Set(configProxyServerBaseURI, dummyConfigValue)
+			config.Set(configSnapServerBaseURI, dummyConfigValue)
+			config.Set(configChangeServerBaseURI, dummyConfigValue)
+
+		}
+
+		It("should succesfully bootstrap from clean slate", func(done Done) {
+			log.Info("Starting sync tests...")
+			var closeDone <-chan bool
+			initializeContext()
+			// do not wipe the DB afterward; the next spec reuses it
+			wipeDBAfterTest = false
+			var lastSnapshot *common.Snapshot
+
+			expectedSnapshotTables := common.ChangeList{
+				Changes: []common.Change{common.Change{Table: "kms.company"},
+					common.Change{Table: "edgex.apid_cluster"},
+					common.Change{Table: "edgex.data_scope"}},
+			}
+
+			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+				if s, ok := event.(*common.Snapshot); ok {
+
+					Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
+
+					// add apid_cluster and data_scope since those would be present in a real scenario
+					knownTables["kms.app_credential"] = true
+					knownTables["kms.app_credential_apiproduct_mapper"] = true
+					knownTables["kms.developer"] = true
+					knownTables["kms.company_developer"] = true
+					knownTables["kms.api_product"] = true
+					knownTables["kms.app"] = true
+
+					lastSnapshot = s
+
+					for _, t := range s.Tables {
+						switch t.Name {
+
+						case "edgex.apid_cluster":
+							Expect(t.Rows).To(HaveLen(1))
+							r := t.Rows[0]
+							var id string
+							r.Get("id", &id)
+							Expect(id).To(Equal("bootstrap"))
+
+						case "edgex.data_scope":
+							Expect(t.Rows).To(HaveLen(2))
+							r := t.Rows[1] // get the non-cluster row
+
+							var id, clusterID, env, org, scope string
+							r.Get("id", &id)
+							r.Get("apid_cluster_id", &clusterID)
+							r.Get("env", &env)
+							r.Get("org", &org)
+							r.Get("scope", &scope)
+
+							Expect(id).To(Equal("ert452"))
+							Expect(scope).To(Equal("ert452"))
+							Expect(clusterID).To(Equal("bootstrap"))
+							Expect(env).To(Equal("prod"))
+							Expect(org).To(Equal("att"))
+						}
+					}
+
+				} else if cl, ok := event.(*common.ChangeList); ok {
+					closeDone = changeManager.close()
+					// ensure that snapshot switched DB versions
+					Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
+					expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
+					Expect(err).NotTo(HaveOccurred())
+					Expect(getDB() == expectedDB).Should(BeTrue())
+
+					Expect(cl.Changes).To(HaveLen(6))
+
+					var tables []string
+					for _, c := range cl.Changes {
+						tables = append(tables, c.Table)
+						Expect(c.NewRow).ToNot(BeNil())
+
+						var tenantID string
+						c.NewRow.Get("tenant_id", &tenantID)
+						Expect(tenantID).To(Equal("ert452"))
+					}
+
+					Expect(tables).To(ContainElement("kms.app_credential"))
+					Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
+					Expect(tables).To(ContainElement("kms.developer"))
+					Expect(tables).To(ContainElement("kms.company_developer"))
+					Expect(tables).To(ContainElement("kms.api_product"))
+					Expect(tables).To(ContainElement("kms.app"))
+
+					go func() {
+						// when close done, all handlers for the first changeList have been executed
+						<-closeDone
+						defer GinkgoRecover()
+						// allow other handler to execute to insert last_sequence
+						var seq string
+						//for seq = ""; seq == ""; {
+						//	time.Sleep(50 * time.Millisecond)
+						err := getDB().
+							QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
+							Scan(&seq)
+						Expect(err).NotTo(HaveOccurred())
+						//}
+						Expect(seq).To(Equal(cl.LastSequence))
+
+						restoreContext()
+						close(done)
+					}()
+
+				}
+			})
+			pie := apid.PluginsInitializedEvent{
+				Description: "plugins initialized",
+			}
+			pie.Plugins = append(pie.Plugins, pluginData)
+			postInitPlugins(pie)
+		}, 3)
+
+		It("should bootstrap from local DB if present", func(done Done) {
+
+			var closeDone <-chan bool
+
+			initializeContext()
+			expectedTables := common.ChangeList{
+				Changes: []common.Change{common.Change{Table: "kms.company"},
+					common.Change{Table: "edgex.apid_cluster"},
+					common.Change{Table: "edgex.data_scope"}},
+			}
+			Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
+
+			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+
+				if s, ok := event.(*common.Snapshot); ok {
+					// In this test, changeManager.pollChangeWithBackoff() has not been launched yet when changeManager is closed,
+					// because the changeManager.pollChangeWithBackoff() call in bootstrap() happens after this handler runs
+					closeDone = changeManager.close()
+					go func() {
+						// when close done, all handlers for the first snapshot have been executed
+						<-closeDone
+						// verify that the knownTables map has been properly populated from the existing DB
+						Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
+
+						Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
+						Expect(s.Tables).To(BeNil())
+
+						restoreContext()
+						close(done)
+					}()
+
+				}
+			})
+			pie := apid.PluginsInitializedEvent{
+				Description: "plugins initialized",
+			}
+			pie.Plugins = append(pie.Plugins, pluginData)
+			postInitPlugins(pie)
+
+		}, 3)
+
+		It("should correctly identify non-proper subsets with respect to maps", func() {
+
+			//test b proper subset of a
+			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+				[]common.Change{common.Change{Table: "b"}},
+			)).To(BeFalse())
+
+			//test a == b
+			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
+			)).To(BeFalse())
+
+			//test b superset of a
+			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+				[]common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
+			)).To(BeTrue())
+
+			//test b not subset of a
+			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+				[]common.Change{common.Change{Table: "c"}},
+			)).To(BeTrue())
+
+			//test a empty
+			Expect(changesHaveNewTables(map[string]bool{},
+				[]common.Change{common.Change{Table: "a"}},
+			)).To(BeTrue())
+
+			//test b empty
+			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+				[]common.Change{},
+			)).To(BeFalse())
+
+			//test b nil
+			Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue())
+
+			//test a nil
+			Expect(changesHaveNewTables(nil,
+				[]common.Change{common.Change{Table: "a"}},
+			)).To(BeTrue())
+		}, 3)
+
+		// todo: disabled for now -
+		// there is a precondition I haven't been able to track down that breaks this test on occasion
+		XIt("should process a new snapshot when change server requires it", func(done Done) {
+			oldSnap := apidInfo.LastSnapshot
+			apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+				defer GinkgoRecover()
+
+				if s, ok := event.(*common.Snapshot); ok {
+					Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
+					close(done)
+				}
+			})
+			testMock.forceNewSnapshot()
 		})
 
-		bootstrap()
+
+		It("Verify the Sequence Number Logic works as expected", func() {
+			Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
+			Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
+			Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
+			Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
+			Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
+			Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
+		}, 3)
+
+		/*
+		 * XAPID-869: there should not be any panic if duplicate snapshots are received during bootstrap
+		 */
+		It("Should be able to handle duplicate snapshot during bootstrap", func() {
+			initializeContext()
+
+			tokenManager = createTokenManager()
+			events.Listen(ApigeeSyncEventSelector, &handler{})
+
+			scopes := []string{apidInfo.ClusterID}
+			snapshot := &common.Snapshot{}
+			downloadSnapshot(scopes, snapshot, nil)
+			storeBootSnapshot(snapshot)
+			storeDataSnapshot(snapshot)
+			restoreContext()
+		}, 3)
 	})
-
-	It("should correctly identify non-proper subsets with respect to maps", func() {
-
-		//test b proper subset of a
-		Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"b": true})).To(BeTrue())
-
-		//test a == b
-		Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true})).To(BeTrue())
-
-		//test b superset of a
-		Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true, "c": true})).To(BeFalse())
-
-		//test b not subset of a
-		Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"c": true})).To(BeFalse())
-
-		//test b empty
-		Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{})).To(BeTrue())
-
-		//test a empty
-		Expect(mapIsSubset(map[string]bool{}, map[string]bool{"b": true})).To(BeFalse())
-	})
-
-	// todo: disabled for now -
-	// there is precondition I haven't been able to track down that breaks this test on occasion
-	XIt("should process a new snapshot when change server requires it", func(done Done) {
-		oldSnap := apidInfo.LastSnapshot
-		apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
-			defer GinkgoRecover()
-
-			if s, ok := event.(*common.Snapshot); ok {
-				Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
-				close(done)
-			}
-		})
-		testMock.forceNewSnapshot()
-	})
-
-	It("Verify the Sequence Number Logic works as expected", func() {
-		Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
-		Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
-		Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
-		Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
-		Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
-		Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
-	})
-
-	/*
-	 * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
-	 */
-	It("Should be able to handle duplicate snapshot during bootstrap", func() {
-		scopes := []string{apidInfo.ClusterID}
-		snapshot := downloadSnapshot(scopes)
-		storeBootSnapshot(snapshot)
-		storeDataSnapshot(snapshot)
-	})
-
 })
diff --git a/backoff.go b/backoff.go
new file mode 100644
index 0000000..e3a7403
--- /dev/null
+++ b/backoff.go
@@ -0,0 +1,84 @@
+package apidApigeeSync
+
+import (
+	"math"
+	"time"
+	"math/rand"
+)
+
+const defaultInitial time.Duration = 200 * time.Millisecond
+const defaultMax time.Duration = 10 * time.Second
+const defaultFactor float64 = 2
+
+type Backoff struct {
+	attempt         int
+	initial, max    time.Duration
+	jitter          bool
+	backoffStrategy func() time.Duration
+}
+
+type ExponentialBackoff struct {
+	Backoff
+	factor float64
+}
+
+func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
+	backoff := &ExponentialBackoff{}
+
+	if initial <= 0 {
+		initial = defaultInitial
+	}
+	if max <= 0 {
+		max = defaultMax
+	}
+
+	if factor <= 0 {
+		factor = defaultFactor
+	}
+
+	backoff.initial = initial
+	backoff.max = max
+	backoff.attempt = 0
+	backoff.factor = factor
+	backoff.jitter = jitter
+	backoff.backoffStrategy = backoff.exponentialBackoffStrategy
+
+	return backoff
+}
+
+func (b *Backoff) Duration() time.Duration {
+	d := b.backoffStrategy()
+	b.attempt++
+	return d
+}
+
+func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
+
+	initial := float64(b.Backoff.initial)
+	attempt := float64(b.Backoff.attempt)
+	duration := initial * math.Pow(b.factor, attempt)
+
+	if duration > math.MaxInt64 {
+		return b.max
+	}
+
+	// apply jitter before converting, so the randomized value is what we return
+	if b.jitter {
+		duration = rand.Float64()*(duration-initial) + initial
+	}
+	dur := time.Duration(duration)
+
+	if dur > b.max {
+		return b.max
+	}
+
+	log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
+	return dur
+}
+
+func (b *Backoff) Reset() {
+	b.attempt = 0
+}
+
+func (b *Backoff) Attempt() int {
+	return b.attempt
+}
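For orientation, the growth and cap behavior of ExponentialBackoff in a small sketch compiled in the same package (jitter disabled so the sequence is deterministic; assumes fmt is imported for the print):

	b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false)
	for i := 0; i < 6; i++ {
		fmt.Println(b.Duration()) // 200ms, 400ms, 800ms, 1.6s, then capped at 2s
	}
	b.Reset() // the next Duration() starts from 200ms again
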
diff --git a/backoff_test.go b/backoff_test.go
new file mode 100644
index 0000000..ae85909
--- /dev/null
+++ b/backoff_test.go
@@ -0,0 +1,56 @@
+package apidApigeeSync
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"time"
+)
+
+var _ = Describe("backoff", func() {
+
+	Context("Backoff timeout calculations", func() {
+
+		It("Should properly apply defaults", func() {
+			log.Info("Starting backoff tests...")
+			b := NewExponentialBackoff(0, 0, 0, true)
+			Expect(defaultInitial).To(Equal(b.initial))
+			Expect(defaultMax).To(Equal(b.max))
+			Expect(defaultFactor).To(Equal(b.factor))
+
+			b = NewExponentialBackoff(-1, -1, -1, true)
+			Expect(defaultInitial).To(Equal(b.initial))
+			Expect(defaultMax).To(Equal(b.max))
+			Expect(defaultFactor).To(Equal(b.factor))
+		})
+
+		It("should properly apply exponential backoff strategy", func() {
+			b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(1).To(Equal(b.Attempt()))
+			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(2).To(Equal(b.Attempt()))
+			Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(3).To(Equal(b.Attempt()))
+			Expect(1600 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(4).To(Equal(b.Attempt()))
+		})
+
+		It("should reset properly", func() {
+			b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(1).To(Equal(b.Attempt()))
+			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(2).To(Equal(b.Attempt()))
+			Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(3).To(Equal(b.Attempt()))
+			b.Reset()
+			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(1).To(Equal(b.Attempt()))
+			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(2).To(Equal(b.Attempt()))
+			Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(3).To(Equal(b.Attempt()))
+		})
+	})
+
+})
diff --git a/changes.go b/changes.go
new file mode 100644
index 0000000..aa8d822
--- /dev/null
+++ b/changes.go
@@ -0,0 +1,319 @@
+package apidApigeeSync
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+	"time"
+
+	"github.com/apigee-labs/transicator/common"
+	"sync/atomic"
+)
+
+var lastSequence string
+var block string = "45"
+
+type pollChangeManager struct {
+	// 0 for not closed, 1 for closed
+	isClosed *int32
+	// 0 for pollChangeWithBackoff() not launched, 1 for launched
+	isLaunched *int32
+	quitChan   chan bool
+}
+
+func createChangeManager() *pollChangeManager {
+	isClosedInt := int32(0)
+	isLaunchedInt := int32(0)
+	return &pollChangeManager{
+		isClosed:   &isClosedInt,
+		quitChan:   make(chan bool),
+		isLaunched: &isLaunchedInt,
+	}
+}
+
+/*
+ * thread-safe close of pollChangeManager
+ * It marks the status as closed immediately, stops the backoff polling agent, and closes the tokenManager.
+ * Use <-close() for a blocking close.
+ */
+func (c *pollChangeManager) close() <-chan bool {
+	finishChan := make(chan bool, 1)
+	//has been closed
+	if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
+		log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
+		go func() {
+			finishChan <- false
+			log.Debug("change manager closed")
+		}()
+		return finishChan
+	}
+	// not launched
+	if atomic.LoadInt32(c.isLaunched) == int32(0) {
+		log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! close tokenManager!")
+		go func() {
+			tokenManager.close()
+			finishChan <- false
+			log.Debug("change manager closed")
+		}()
+		return finishChan
+	}
+	// launched
+	log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
+	go func() {
+		c.quitChan <- true
+		tokenManager.close()
+		finishChan <- true
+		log.Debug("change manager closed")
+	}()
+	return finishChan
+}
+
+/*
+ * thread-safe pollChangeWithBackoff(), guaranteed: only one polling thread
+ */
+
+func (c *pollChangeManager) pollChangeWithBackoff() {
+	// closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		log.Error("pollChangeManager: pollChangeWithBackoff() called after closed")
+		return
+	}
+	// has been launched before
+	if atomic.SwapInt32(c.isLaunched, 1) == int32(1) {
+		log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before")
+		return
+	}
+
+	go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
+	log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff")
+
+}
+
+/*
+ * Long polls the change agent with a 45 second block. Parses the response from
+ * change agent and raises an event. Called by pollWithBackoff().
+ */
+func (c *pollChangeManager) pollChangeAgent(dummyQuit chan bool) error {
+
+	changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
+	if err != nil {
+		log.Errorf("bad url value for config %s: %s", changesUri, err)
+		return err
+	}
+	changesUri.Path = path.Join(changesUri.Path, "changes")
+
+	/*
+	 * Check to see if we have lastSequence already saved in the DB,
+	 * in which case, it has to be used to prevent re-reading same data
+	 */
+	lastSequence = getLastSequence()
+
+	for {
+		select {
+		case <-c.quitChan:
+			log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager")
+			return quitSignalError{}
+		default:
+			err := c.getChanges(changesUri)
+			if err != nil {
+				if _, ok := err.(quitSignalError); ok {
+					log.Debug("pollChangeAgent: consuming the quit signal")
+					<-c.quitChan
+				}
+				return err
+			}
+		}
+	}
+}
+
+//TODO refactor this method more, split it up
+/* Make a single request to the changeserver to get a changelist */
+func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
+	// if closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		return quitSignalError{}
+	}
+	log.Debug("polling...")
+
+	/* Find the scopes associated with the config id */
+	scopes := findScopesForId(apidInfo.ClusterID)
+	v := url.Values{}
+
+	/* Sequence added to the query if available */
+	if lastSequence != "" {
+		v.Add("since", lastSequence)
+	}
+	v.Add("block", block)
+
+	/*
+	 * Include all the scopes associated with the config Id
+	 * The Config Id is included as well, as it acts as the
+	 * Bootstrap scope
+	 */
+	for _, scope := range scopes {
+		v.Add("scope", scope)
+	}
+	v.Add("scope", apidInfo.ClusterID)
+	v.Add("snapshot", apidInfo.LastSnapshot)
+	changesUri.RawQuery = v.Encode()
+	uri := changesUri.String()
+	log.Debugf("Fetching changes: %s", uri)
+
+	/* On error, return so the caller can retry with backoff */
+	client := &http.Client{Timeout: httpTimeout} // must be greater than block value
+	req, err := http.NewRequest("GET", uri, nil)
+	addHeaders(req)
+
+	r, err := client.Do(req)
+	if err != nil {
+		log.Errorf("change agent comm error: %s", err)
+		// if closed
+		if atomic.LoadInt32(c.isClosed) == int32(1) {
+			return quitSignalError{}
+		}
+		return err
+	}
+	defer r.Body.Close()
+
+	// has been closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		log.Debugf("getChanges: changeManager has been closed")
+		return quitSignalError{}
+	}
+
+	if r.StatusCode != http.StatusOK {
+		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
+		switch r.StatusCode {
+		case http.StatusUnauthorized:
+			tokenManager.invalidateToken()
+			return nil
+
+		case http.StatusNotModified:
+			return nil
+
+		case http.StatusBadRequest:
+			var apiErr changeServerError
+			var b []byte
+			b, err = ioutil.ReadAll(r.Body)
+			if err != nil {
+				log.Errorf("Unable to read response body: %v", err)
+				return err
+			}
+			err = json.Unmarshal(b, &apiErr)
+			if err != nil {
+				log.Errorf("JSON Response Data not parsable: %s", string(b))
+				return err
+			}
+			if apiErr.Code == "SNAPSHOT_TOO_OLD" {
+				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
+				err = apiErr
+			}
+			return nil
+		}
+		return nil
+	}
+
+	var resp common.ChangeList
+	err = json.NewDecoder(r.Body).Decode(&resp)
+	if err != nil {
+		log.Errorf("JSON Response Data not parsable: %v", err)
+		return err
+	}
+
+	/*
+	 * If the lastSequence is already newer or the same than what we got via
+	 * resp.LastSequence, Ignore it.
+	 */
+	if lastSequence != "" &&
+		getChangeStatus(lastSequence, resp.LastSequence) != 1 {
+		return changeServerError{
+			Code: "Ignore change, already have newer changes",
+		}
+	}
+
+	if changesRequireDDLSync(resp) {
+		return changeServerError{
+			Code: "DDL changes detected; must get new snapshot",
+		}
+	}
+
+	/* If valid data present, Emit to plugins */
+	if len(resp.Changes) > 0 {
+		select {
+		case <-time.After(httpTimeout):
+			log.Panic("Timeout. Plugins failed to respond to changes.")
+		case <-events.Emit(ApigeeSyncEventSelector, &resp):
+		}
+	} else {
+		log.Debugf("No Changes detected for Scopes: %s", scopes)
+	}
+
+	updateSequence(resp.LastSequence)
+
+	return nil
+}
+
+func changesRequireDDLSync(changes common.ChangeList) bool {
+	return changesHaveNewTables(knownTables, changes.Changes)
+}
+
+func (c *pollChangeManager) handleChangeServerError(err error) {
+	// has been closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		log.Debugf("handleChangeServerError: changeManager has been closed")
+		return
+	}
+	if _, ok := err.(changeServerError); ok {
+		log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
+		downloadDataSnapshot(c.quitChan)
+	} else {
+		log.Debugf("Error connecting to changeserver: %v", err)
+	}
+}
+
+/*
+ * Determine if any tables in changes are not present in known tables
+ */
+func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
+
+	// nil inputs should not be passed in; distinguish a nil map/slice from an empty one
+	if a == nil || changes == nil {
+		return true
+	}
+
+	for _, change := range changes {
+		if !a[change.Table] {
+			log.Infof("Unable to find %s table in current known tables", change.Table)
+			return true
+		}
+	}
+
+	return false
+}
+
+/*
+ * seqCurr.Compare() will return 1, if its newer than seqPrev,
+ * else will return 0, if same, or -1 if older.
+ */
+func getChangeStatus(lastSeq string, currSeq string) int {
+	seqPrev, err := common.ParseSequence(lastSeq)
+	if err != nil {
+		log.Panic("Unable to parse previous sequence string")
+	}
+	seqCurr, err := common.ParseSequence(currSeq)
+	if err != nil {
+		log.Panic("Unable to parse current sequence string")
+	}
+	return seqCurr.Compare(seqPrev)
+}
+
+func updateSequence(seq string) {
+	lastSequence = seq
+	err := updateLastSequence(seq)
+	if err != nil {
+		log.Panic("Unable to update Sequence in DB")
+	}
+
+}
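The close() contract is worth spelling out: it returns a buffered channel immediately, and the received value is true only when a launched polling goroutine was actually stopped. A blocking-shutdown sketch in the same package:

	// Blocks until the poller and tokenManager are shut down. ok is false if
	// polling was never launched (the tokenManager is still closed in that
	// case) or if close() had already been called.
	ok := <-changeManager.close()
	if !ok {
		log.Debug("no active poller; nothing further to stop")
	}
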
diff --git a/data.go b/data.go
index 08ff3ba..48f1ff7 100644
--- a/data.go
+++ b/data.go
@@ -88,8 +88,9 @@
 
 	log.Debugf("inserting into APID_CLUSTER: %v", dac)
 
+	// REPLACE to accommodate a duplicate snapshot txid
 	stmt, err := txn.Prepare(`
-	INSERT INTO APID_CLUSTER
+	REPLACE INTO APID_CLUSTER
 		(id, description, name, umbrella_org_app_name,
 		created, created_by, updated, updated_by,
 		last_sequence)
@@ -117,8 +118,9 @@
 
 	log.Debugf("insert DATA_SCOPE: %v", ds)
 
+	// REPLACE to accommodate a duplicate snapshot txid
 	stmt, err := txn.Prepare(`
-	INSERT INTO DATA_SCOPE
+	REPLACE INTO DATA_SCOPE
 		(id, apid_cluster_id, scope, org,
 		env, created, created_by, updated,
 		updated_by)
@@ -237,7 +239,7 @@
 
 	// always use default database for this
 	var db apid.DB
-	db, err = data.DB()
+	db, err = dataService.DB()
 	if err != nil {
 		return
 	}
@@ -254,6 +256,7 @@
 			newInstanceID = true
 			info.InstanceID = generateUUID()
 
+			log.Debugf("Inserting new apid instance id %s", info.InstanceID)
 			db.Exec("INSERT INTO APID (instance_id, last_snapshot_info) VALUES (?,?)",
 				info.InstanceID, "")
 		}
@@ -264,7 +267,7 @@
 func updateApidInstanceInfo() error {
 
 	// always use default database for this
-	db, err := data.DB()
+	db, err := dataService.DB()
 	if err != nil {
 		return err
 	}
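The INSERT-to-REPLACE switch above is what makes re-persisting the same snapshot idempotent: in SQLite, REPLACE INTO is shorthand for INSERT OR REPLACE, so a row whose primary key already exists is overwritten instead of tripping a UNIQUE constraint. A standalone sketch of the difference, assuming a database/sql handle onto the same schema (column list abbreviated for illustration):

	// First write inserts the row.
	_, err := db.Exec(`REPLACE INTO APID_CLUSTER (id, name) VALUES (?, ?)`, "bootstrap", "c1")
	// Replaying the write with the same id succeeds and overwrites the row;
	// a plain INSERT here would fail with a UNIQUE constraint violation,
	// which is exactly what a duplicate snapshot txid used to trigger.
	_, err = db.Exec(`REPLACE INTO APID_CLUSTER (id, name) VALUES (?, ?)`, "bootstrap", "c1")
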
diff --git a/datascope_cache.go b/datascope_cache.go
deleted file mode 100644
index 3d19710..0000000
--- a/datascope_cache.go
+++ /dev/null
@@ -1,93 +0,0 @@
-package apidApigeeSync
-
-const (
-	readCache int = iota
-	updateCache
-	removeCache
-	clearAndInit
-)
-
-/*
- * structs for DatascopeCache
- */
-
-type cacheOperationRequest struct {
-	Operation int
-	Scope     *dataDataScope
-	version   string
-}
-
-// maintain an in-mem cache of datascope
-type DatascopeCache struct {
-	requestChan  chan *cacheOperationRequest
-	readDoneChan chan []string
-	scopeMap     map[string]*dataDataScope
-	version      string
-}
-
-var scopeCache *DatascopeCache
-
-func (cache *DatascopeCache) datascopeCacheManager() {
-	for request := range cache.requestChan {
-		switch request.Operation {
-		case readCache:
-			log.Debug("datascopeCacheManager: readCache")
-			scopes := make([]string, 0, len(cache.scopeMap))
-			for _, ds := range cache.scopeMap {
-				scopes = append(scopes, ds.Scope)
-			}
-			cache.readDoneChan <- scopes
-		case updateCache:
-			log.Debug("datascopeCacheManager: updateCache")
-			cache.scopeMap[request.Scope.ID] = request.Scope
-		case removeCache:
-			log.Debug("datascopeCacheManager: removeCache")
-			delete(cache.scopeMap, request.Scope.ID)
-		case clearAndInit:
-			log.Debug("datascopeCacheManager: clearAndInit")
-			if cache.version != request.version {
-				cache.scopeMap = make(map[string]*dataDataScope)
-				cache.version = request.version
-			}
-		}
-	}
-
-	//chan closed
-	cache.scopeMap = nil
-	close(cache.readDoneChan)
-}
-
-/*
- * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID)
- */
-
-func (cache *DatascopeCache) readAllScope() []string {
-	cache.requestChan <- &cacheOperationRequest{readCache, nil, ""}
-	scopes := <-cache.readDoneChan
-	// eliminate duplicates
-	tmpMap := make(map[string]bool)
-	for _, scope := range scopes {
-		tmpMap[scope] = true
-	}
-	scopes = make([]string, 0)
-	for scope := range tmpMap {
-		scopes = append(scopes, scope)
-	}
-	return scopes
-}
-
-func (cache *DatascopeCache) removeCache(scope *dataDataScope) {
-	cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""}
-}
-
-func (cache *DatascopeCache) updateCache(scope *dataDataScope) {
-	cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""}
-}
-
-func (cache *DatascopeCache) clearAndInitCache(version string) {
-	cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version}
-}
-
-func (cache *DatascopeCache) closeCache() {
-	close(cache.requestChan)
-}
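With DatascopeCache gone, scope reads go straight to the per-snapshot database via findScopesForId (defined in data.go, outside this diff), which also makes the readAllScope/findScopesForId equivalence noted above trivially true. A hypothetical sketch of its shape, for orientation only; the real implementation may differ:

	func findScopesForId(clusterID string) (scopes []string) {
		rows, err := getDB().Query(
			"SELECT DISTINCT scope FROM DATA_SCOPE WHERE apid_cluster_id = ?", clusterID)
		if err != nil {
			return
		}
		defer rows.Close()
		for rows.Next() {
			var s string
			if rows.Scan(&s) == nil {
				scopes = append(scopes, s)
			}
		}
		return
	}
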
diff --git a/datascope_cache_test.go b/datascope_cache_test.go
deleted file mode 100644
index bc67f53..0000000
--- a/datascope_cache_test.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package apidApigeeSync
-
-import (
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-	"math/rand"
-	"strconv"
-	"time"
-)
-
-var _ = Describe("datascope cache", func() {
-	/*
-	 * in-mem cache test
-	 */
-	It("Test In-mem cache", func() {
-		testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
-		go testCache.datascopeCacheManager()
-		testCache.clearAndInitCache("test-version")
-		countChan := make(chan int)
-		base := 10
-		rand.Seed(time.Now().Unix())
-		num := base + rand.Intn(base)
-		scopeMap := make(map[string]bool)
-		// async update
-		for i := 0; i < num; i++ {
-			id := strconv.Itoa(i)
-			scopeStr := strconv.Itoa(i % base)
-			scope := &dataDataScope{ID: id, Scope: scopeStr}
-			scopeMap[scope.Scope] = true
-			go func(scope *dataDataScope) {
-				testCache.updateCache(scope)
-				countChan <- 1
-			}(scope)
-		}
-
-		// wait until update done
-		for i := 0; i < num; i++ {
-			<-countChan
-		}
-
-		// verify update
-		retrievedScopes := testCache.readAllScope()
-		Expect(len(scopeMap)).To(Equal(len(retrievedScopes)))
-		for _, s := range retrievedScopes {
-			// verify each retrieved scope is valid
-			Expect(scopeMap[s]).To(BeTrue())
-			// no duplicate scopes
-			scopeMap[s] = true
-		}
-
-		// remove all the datascopes with odd scope
-		count := 0
-		for i := 0; i < num; i++ {
-			if (i%base)%2 == 1 {
-				count += 1
-				id := strconv.Itoa(i)
-				scopeStr := strconv.Itoa(i % base)
-				scope := &dataDataScope{ID: id, Scope: scopeStr}
-				go func(scope *dataDataScope) {
-					testCache.removeCache(scope)
-					countChan <- 1
-				}(scope)
-			}
-		}
-
-		for i := 0; i < count; i++ {
-			<-countChan
-		}
-
-		// all retrieved scopes should be even
-		retrievedScopes = testCache.readAllScope()
-		for _, s := range retrievedScopes {
-			scopeNum, _ := strconv.Atoi(s)
-			Expect(scopeNum % 2).To(BeZero())
-		}
-
-		// async remove all datascopes
-		for i := 0; i < num; i++ {
-			id := strconv.Itoa(i)
-			scopeStr := strconv.Itoa(i % base)
-			scope := &dataDataScope{ID: id, Scope: scopeStr}
-			go func(scope *dataDataScope) {
-				testCache.removeCache(scope)
-				countChan <- 1
-			}(scope)
-		}
-
-		for i := 0; i < num; i++ {
-			<-countChan
-		}
-		retrievedScopes = testCache.readAllScope()
-		Expect(len(retrievedScopes)).To(Equal(0))
-
-		testCache.closeCache()
-	})
-
-})
diff --git a/init.go b/init.go
index d40a9e7..e1b31c5 100644
--- a/init.go
+++ b/init.go
@@ -29,14 +29,21 @@
 )
 
 var (
-	log               apid.LogService
-	config            apid.ConfigService
-	data              apid.DataService
-	events            apid.EventsService
-	apidInfo          apidInstanceInfo
-	apidPluginDetails string
-	newInstanceID     bool
-	tokenManager      *tokenMan
+	/* All set during plugin initialization */
+	log                       apid.LogService
+	config                    apid.ConfigService
+	dataService               apid.DataService
+	events                    apid.EventsService
+	apidInfo                  apidInstanceInfo
+	newInstanceID             bool
+	tokenManager              *tokenMan
+	changeManager             *pollChangeManager
+	quitPollingSnapshotServer chan bool
+
+	/* Set during post plugin initialization
+	 * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
+	 */
+	apidPluginDetails string = `[{"name":"apidApigeeSync","schemaVer":"1.0"}]`
 )
 
 type apidInstanceInfo struct {
@@ -52,7 +59,7 @@
 	apid.RegisterPlugin(initPlugin)
 }
 
-func initDefaults() {
+func initConfigDefaults() {
 	config.SetDefault(configPollInterval, 120*time.Second)
 	config.SetDefault(configSnapshotProtocol, "json")
 	name, errh := os.Hostname()
@@ -64,57 +71,28 @@
 	log.Debugf("Using %s as display name", config.GetString(configName))
 }
 
-func SetLogger(logger apid.LogService) {
-	log = logger
-}
-
-func initPlugin(services apid.Services) (apid.PluginData, error) {
-	SetLogger(services.Log().ForModule("apigeeSync"))
-	log.Debug("start init")
-
-	config = services.Config()
-	initDefaults()
-
-	data = services.Data()
+func initVariables(services apid.Services) error {
+	dataService = services.Data()
 	events = services.Events()
-
-	scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
-
-	go scopeCache.datascopeCacheManager()
-
-	/* This callback function will get called, once all the plugins are
-	 * initialized (not just this plugin). This is needed because,
-	 * downloadSnapshots/changes etc have to begin to be processed only
-	 * after all the plugins are initialized
-	 */
-	events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
-
-	// check for required values
-	for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
-		configSnapServerBaseURI, configChangeServerBaseURI} {
-		if !config.IsSet(key) {
-			return pluginData, fmt.Errorf("Missing required config value: %s", key)
-		}
-	}
-	proto := config.GetString(configSnapshotProtocol)
-	if proto != "json" && proto != "proto" {
-		return pluginData, fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol)
-	}
+	//TODO: listen for arbitrary commands; these channels can be used to kill polling goroutines
+	//(also useful for testing)
+	quitPollingSnapshotServer = make(chan bool)
+	changeManager = createChangeManager()
 
 	// set up default database
-	db, err := data.DB()
+	db, err := dataService.DB()
 	if err != nil {
-		return pluginData, fmt.Errorf("Unable to access DB: %v", err)
+		return fmt.Errorf("Unable to access DB: %v", err)
 	}
 	err = initDB(db)
 	if err != nil {
-		return pluginData, fmt.Errorf("Unable to access DB: %v", err)
+		return fmt.Errorf("Unable to access DB: %v", err)
 	}
 	setDB(db)
 
 	apidInfo, err = getApidInstanceInfo()
 	if err != nil {
-		return pluginData, fmt.Errorf("Unable to get apid instance info: %v", err)
+		return fmt.Errorf("Unable to get apid instance info: %v", err)
 	}
 
 	if config.IsSet(configApidInstanceID) {
@@ -122,6 +100,64 @@
 	}
 	config.Set(configApidInstanceID, apidInfo.InstanceID)
 
+	return nil
+}
+
+func checkForRequiredValues() error {
+	// check for required values
+	for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
+		configSnapServerBaseURI, configChangeServerBaseURI} {
+		if !config.IsSet(key) {
+			return fmt.Errorf("Missing required config value: %s", key)
+		}
+	}
+	proto := config.GetString(configSnapshotProtocol)
+	if proto != "json" && proto != "sqlite" {
+		return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'sqlite'", configSnapshotProtocol)
+	}
+
+	return nil
+}
+
+func SetLogger(logger apid.LogService) {
+	log = logger
+}
+
+/* Idempotent state initialization */
+func _initPlugin(services apid.Services) error {
+	SetLogger(services.Log().ForModule("apigeeSync"))
+	log.Debug("start init")
+
+	config = services.Config()
+	err := checkForRequiredValues()
+	if err != nil {
+		return err
+	}
+
+	initConfigDefaults()
+
+	err = initVariables(services)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func initPlugin(services apid.Services) (apid.PluginData, error) {
+
+	err := _initPlugin(services)
+	if err != nil {
+		return pluginData, err
+	}
+
+	/* This callback function will get called once all the plugins are
+	 * initialized (not just this plugin). This is needed because
+	 * downloadSnapshots/changes etc. must begin processing only
+	 * after all the plugins are initialized.
+	 */
+	events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
+
 	log.Debug("end init")
 
 	return pluginData, nil
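The init.go refactor above separates one-shot work from repeatable work: _initPlugin holds only idempotent state setup, which is what lets the test suite re-run it from BeforeEach, while initPlugin layers the single ListenOnceFunc registration on top. A minimal sketch of that split, with hypothetical names:

package example

// reInit holds only work that is safe to repeat; tests may call it per spec.
func reInit() error {
	// (re)build plugin state: config defaults, DB handles, managers...
	return nil
}

var listenersRegistered bool // guards the one-shot side effect (illustrative)

// realInit is the entry point the framework invokes exactly once.
func realInit() error {
	if err := reInit(); err != nil {
		return err
	}
	if !listenersRegistered {
		listenersRegistered = true
		// register one-shot callbacks (e.g. ListenOnceFunc) here
	}
	return nil
}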
diff --git a/init_test.go b/init_test.go
index 7bcee56..479b4b6 100644
--- a/init_test.go
+++ b/init_test.go
@@ -10,18 +10,20 @@
 	Context("Apid Instance display name", func() {
 
 		It("should be hostname by default", func() {
-			initDefaults()
+			log.Info("Starting init tests...")
+
+			initConfigDefaults()
 			Expect(apidInfo.InstanceName).To(Equal("testhost"))
-		})
+		}, 3)
 
 		It("accept display name from config", func() {
 			config.Set(configName, "aa01")
-			initDefaults()
+			initConfigDefaults()
 			var apidInfoLatest apidInstanceInfo
 			apidInfoLatest, _ = getApidInstanceInfo()
 			Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
 			Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
-		})
+		}, 3)
 
 	})
 
diff --git a/listener.go b/listener.go
index 1fbd82e..6c4b1ef 100644
--- a/listener.go
+++ b/listener.go
@@ -29,22 +29,42 @@
 }
 
 func processSnapshot(snapshot *common.Snapshot) {
-
 	log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
 
-	db, err := data.DBVersion(snapshot.SnapshotInfo)
+	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
 	if err != nil {
 		log.Panicf("Unable to access database: %v", err)
 	}
 
-	err = initDB(db)
+	if config.GetString(configSnapshotProtocol) == "json" {
+		processJsonSnapshot(snapshot, db)
+	} else if config.GetString(configSnapshotProtocol) == "sqlite" {
+		processSqliteSnapshot(snapshot, db)
+	}
+
+	//update apid instance info
+	apidInfo.LastSnapshot = snapshot.SnapshotInfo
+	err = updateApidInstanceInfo()
+	if err != nil {
+		log.Panicf("Unable to update instance info: %v", err)
+	}
+
+	setDB(db)
+	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+
+}
+
+func processSqliteSnapshot(snapshot *common.Snapshot, db apid.DB) {
+	//nothing to do as of now, here as a placeholder
+}
+
+func processJsonSnapshot(snapshot *common.Snapshot, db apid.DB) {
+
+	err := initDB(db)
 	if err != nil {
 		log.Panicf("Unable to initialize database: %v", err)
 	}
 
-	// clear cache
-	scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
-
 	tx, err := db.Begin()
 	if err != nil {
 		log.Panicf("Error starting transaction: %v", err)
@@ -73,10 +93,6 @@
 				if err != nil {
 					log.Panicf("Snapshot update failed: %v", err)
 				}
-				// cache scopes for this cluster
-				if ds.ClusterID == apidInfo.ClusterID {
-					scopeCache.updateCache(&ds)
-				}
 			}
 		}
 	}
@@ -85,15 +101,6 @@
 	if err != nil {
 		log.Panicf("Error committing Snapshot change: %v", err)
 	}
-
-	apidInfo.LastSnapshot = snapshot.SnapshotInfo
-	err = updateApidInstanceInfo()
-	if err != nil {
-		log.Panicf("Unable to update instance info: %v", err)
-	}
-
-	setDB(db)
-	log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
 }
 
 func processChangeList(changes *common.ChangeList) {
@@ -121,19 +128,9 @@
 			case common.Insert:
 				ds := makeDataScopeFromRow(change.NewRow)
 				err = insertDataScope(ds, tx)
-
-				// cache scopes for this cluster
-				if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
-					scopeCache.updateCache(&ds)
-				}
 			case common.Delete:
 				ds := makeDataScopeFromRow(change.OldRow)
 				err = deleteDataScope(ds, tx)
-
-				// cache scopes for this cluster
-				if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
-					scopeCache.removeCache(&ds)
-				}
 			default:
 				// common.Update is not allowed
 				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
diff --git a/listener_test.go b/listener_test.go
index 2b060de..ee0b4cc 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -15,6 +15,7 @@
 	Context("ApigeeSync snapshot event", func() {
 
 		It("should set DB to appropriate version", func() {
+			log.Info("Starting listener tests...")
 
 			//save the last snapshot, so we can restore it at the end of this context
 			saveLastSnapshot = apidInfo.LastSnapshot
@@ -28,7 +29,7 @@
 
 			Expect(apidInfo.LastSnapshot).To(Equal(event.SnapshotInfo))
 
-			expectedDB, err := data.DBVersion(event.SnapshotInfo)
+			expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
 			Expect(err).NotTo(HaveOccurred())
 
 			Expect(getDB() == expectedDB).Should(BeTrue())
@@ -47,7 +48,7 @@
 			}
 
 			Expect(func() { handler.Handle(&event) }).To(Panic())
-		})
+		}, 3)
 
 		It("should process a valid Snapshot", func() {
 
@@ -203,7 +204,7 @@
 
 			//restore the last snapshot
 			apidInfo.LastSnapshot = saveLastSnapshot
-		})
+		}, 3)
 	})
 
 	Context("ApigeeSync change event", func() {
@@ -225,7 +226,7 @@
 				}
 
 				Expect(func() { handler.Handle(&event) }).To(Panic())
-			})
+			}, 3)
 
 			It("update event should panic", func() {
 
@@ -242,7 +243,7 @@
 				Expect(func() { handler.Handle(&event) }).To(Panic())
 				//restore the last snapshot
 				apidInfo.LastSnapshot = saveLastSnapshot
-			})
+			}, 3)
 
 			PIt("delete event should kill all the things!")
 		})
@@ -328,7 +329,7 @@
 				Expect(len(scopes)).To(Equal(2))
 				Expect(scopes[0]).To(Equal("s1"))
 				Expect(scopes[1]).To(Equal("s2"))
-			})
+			}, 3)
 
 			It("delete event should delete", func() {
 				insert := common.ChangeList{
@@ -372,7 +373,7 @@
 				Expect(err).NotTo(HaveOccurred())
 
 				Expect(nRows).To(Equal(0))
-			})
+			}, 3)
 
 			It("update event should panic", func() {
 
@@ -389,7 +390,7 @@
 				Expect(func() { handler.Handle(&event) }).To(Panic())
 				//restore the last snapshot
 				apidInfo.LastSnapshot = saveLastSnapshot
-			})
+			}, 3)
 
 		})
 
diff --git a/snapshot.go b/snapshot.go
new file mode 100644
index 0000000..ae667bf
--- /dev/null
+++ b/snapshot.go
@@ -0,0 +1,257 @@
+package apidApigeeSync
+
+import (
+	"encoding/json"
+	"github.com/30x/apid-core"
+	"github.com/30x/apid-core/data"
+	"github.com/apigee-labs/transicator/common"
+	"net/http"
+	"os"
+
+	"io"
+	"io/ioutil"
+	"net/url"
+	"path"
+	"time"
+)
+
+// retrieve boot information: apid_cluster and data_scope
+func downloadBootSnapshot(quitPolling chan bool) {
+	log.Debug("download Snapshot for boot data")
+
+	scopes := []string{apidInfo.ClusterID}
+	snapshot := &common.Snapshot{}
+	downloadSnapshot(scopes, snapshot, quitPolling)
+	storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot *common.Snapshot) {
+	processSnapshot(snapshot)
+}
+
+// use the scope IDs from the boot snapshot to get all the data associated with the scopes
+func downloadDataSnapshot(quitPolling chan bool) {
+	log.Debug("download Snapshot for data scopes")
+
+	scopes := findScopesForId(apidInfo.ClusterID)
+	scopes = append(scopes, apidInfo.ClusterID)
+	snapshot := &common.Snapshot{}
+	downloadSnapshot(scopes, snapshot, quitPolling)
+	storeDataSnapshot(snapshot)
+}
+
+func storeDataSnapshot(snapshot *common.Snapshot) {
+	knownTables = extractTablesFromSnapshot(snapshot)
+
+	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+	if err != nil {
+		log.Panicf("Database inaccessible: %v", err)
+	}
+	persistKnownTablesToDB(knownTables, db)
+
+	log.Info("Emitting Snapshot to plugins")
+
+	select {
+	case <-time.After(pluginTimeout):
+		log.Panic("Timeout. Plugins failed to respond to snapshot.")
+	case <-events.Emit(ApigeeSyncEventSelector, snapshot):
+	}
+
+}
+
+func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
+
+	tables = make(map[string]bool)
+
+	log.Debug("Extracting table names from snapshot")
+	if snapshot.Tables == nil {
+		//if this panic ever fires, it's a bug
+		log.Panicf("Attempt to extract known tables from snapshot without tables failed")
+	}
+
+	for _, table := range snapshot.Tables {
+		tables[table.Name] = true
+	}
+
+	return tables
+}
+
+func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
+
+	tables = make(map[string]bool)
+
+	log.Debug("Extracting table names from existing DB")
+	rows, err := db.Query("SELECT name FROM _known_tables;")
+	if err != nil {
+		log.Panicf("Error reading current set of tables: %v", err)
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		var table string
+		if err := rows.Scan(&table); err != nil {
+			log.Panicf("Error reading current set of tables: %v", err)
+		}
+		log.Debugf("Table %s found in existing db", table)
+
+		tables[table] = true
+	}
+	return tables
+}
+
+// Skip downloading a snapshot if one is already available from a previous run
+func startOnLocalSnapshot(snapshot string) *common.Snapshot {
+	log.Infof("Starting on local snapshot: %s", snapshot)
+
+	// ensure DB version will be accessible on behalf of dependent plugins
+	db, err := dataService.DBVersion(snapshot)
+	if err != nil {
+		log.Panicf("Database inaccessible: %v", err)
+	}
+
+	knownTables = extractTablesFromDB(db)
+
+	// allow plugins (including this one) to start immediately on existing database
+	// Note: this MUST have no tables as that is used as an indicator
+	return &common.Snapshot{
+		SnapshotInfo: snapshot,
+	}
+}
+
+// will keep retrying with backoff until success
+func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) {
+
+	log.Debug("downloadSnapshot")
+
+	snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
+	if err != nil {
+		log.Panicf("bad url value for config %s: %s", snapshotUri, err)
+	}
+
+	snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")
+
+	v := url.Values{}
+	for _, scope := range scopes {
+		v.Add("scope", scope)
+	}
+	snapshotUri.RawQuery = v.Encode()
+	uri := snapshotUri.String()
+	log.Infof("Snapshot Download: %s", uri)
+
+	client := &http.Client{
+		CheckRedirect: Redirect,
+		Timeout:       httpTimeout,
+	}
+
+	//pollWithBackoff only accepts functions that take a single quit channel;
+	//to accommodate functions that need more parameters, wrap them in closures
+	attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
+
+	pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError)
+}
+
+func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {
+	return func(_ chan bool) error {
+		req, err := http.NewRequest("GET", uri, nil)
+		if err != nil {
+			// should never happen, but if it does, it's unrecoverable anyway
+			log.Panicf("Snapshotserver comm error: %v", err)
+		}
+		addHeaders(req)
+
+		var processSnapshotResponse func(*http.Response, *common.Snapshot) error
+
+		// Set the transport protocol type based on conf file input
+		if config.GetString(configSnapshotProtocol) == "json" {
+			req.Header.Set("Accept", "application/json")
+			processSnapshotResponse = processSnapshotServerJsonResponse
+		} else if config.GetString(configSnapshotProtocol) == "sqlite" {
+			req.Header.Set("Accept", "application/transicator+sqlite")
+			processSnapshotResponse = processSnapshotServerFileResponse
+		}
+
+		// Issue the request to the snapshot server
+		r, err := client.Do(req)
+		if err != nil {
+			log.Errorf("Snapshotserver comm error: %v", err)
+			return err
+		}
+
+		defer r.Body.Close()
+
+		if r.StatusCode != 200 {
+			body, _ := ioutil.ReadAll(r.Body)
+			log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
+			return expected200Error{}
+		}
+
+		// Decode the Snapshot server response
+		err = processSnapshotResponse(r, snapshot)
+		if err != nil {
+			log.Errorf("Response Data not parsable: %v", err)
+			return err
+		}
+
+		return nil
+	}
+}
+
+func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
+	log.Debugf("Inserting table names found in snapshot into db")
+
+	tx, err := db.Begin()
+	if err != nil {
+		log.Panicf("Error starting transaction: %v", err)
+	}
+	defer tx.Rollback()
+
+	_, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));")
+	if err != nil {
+		log.Panicf("Could not create _known_tables table: %s", err)
+	}
+
+	for name := range tables {
+		log.Debugf("Inserting %s into _known_tables", name)
+		_, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name)
+		if err != nil {
+			log.Panicf("Error encountered inserting into known tables ", err)
+		}
+
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		log.Panicf("Error committing transaction: %v", err)
+	}
+}
+
+func processSnapshotServerJsonResponse(r *http.Response, snapshot *common.Snapshot) error {
+	return json.NewDecoder(r.Body).Decode(snapshot)
+}
+
+func processSnapshotServerFileResponse(r *http.Response, snapshot *common.Snapshot) error {
+	dbId := r.Header.Get("Transicator-Snapshot-TXID")
+	out, err := os.Create(data.DBPath(dbId))
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+
+	//stream response to DB
+	_, err = io.Copy(out, r.Body)
+
+	if err != nil {
+		return err
+	}
+
+	snapshot.SnapshotInfo = dbId
+	//TODO: get timestamp from transicator; not currently in the response
+
+	return nil
+}
+
+func handleSnapshotServerError(err error) {
+	log.Debugf("Error connecting to snapshot server: %v", err)
+}
diff --git a/token.go b/token.go
index 60097c1..a6c118e 100644
--- a/token.go
+++ b/token.go
@@ -7,13 +7,12 @@
 	"net/http"
 	"net/url"
 	"path"
-	"sync"
+	"sync/atomic"
 	"time"
 )
 
 var (
 	refreshFloatTime = time.Minute
-	getTokenLock     sync.Mutex
 )
 
 /*
@@ -26,15 +25,34 @@
 */
 
 func createTokenManager() *tokenMan {
-	t := &tokenMan{}
-	t.doRefresh = make(chan bool, 1)
-	t.maintainToken()
+	isClosedInt := int32(0)
+
+	t := &tokenMan{
+		quitPollingForToken: make(chan bool, 1),
+		closed:              make(chan bool),
+		getTokenChan:        make(chan bool),
+		invalidateTokenChan: make(chan bool),
+		returnTokenChan:     make(chan *oauthToken),
+		invalidateDone:      make(chan bool),
+		isClosed:            &isClosedInt,
+	}
+
+	t.retrieveNewToken()
+	t.refreshTimer = time.After(t.token.refreshIn())
+	go t.maintainToken()
 	return t
 }
 
 type tokenMan struct {
-	token     *oauthToken
-	doRefresh chan bool
+	token               *oauthToken
+	isClosed            *int32
+	quitPollingForToken chan bool
+	closed              chan bool
+	getTokenChan        chan bool
+	invalidateTokenChan chan bool
+	refreshTimer        <-chan time.Time
+	returnTokenChan     chan *oauthToken
+	invalidateDone      chan bool
 }
 
 func (t *tokenMan) getBearerToken() string {
@@ -42,50 +60,63 @@
 }
 
 func (t *tokenMan) maintainToken() {
-	go func() {
-		for {
-			if t.token.needsRefresh() {
-				getTokenLock.Lock()
-				t.retrieveNewToken()
-				getTokenLock.Unlock()
-			}
-			select {
-			case _, ok := <-t.doRefresh:
-				if !ok {
-					log.Debug("closed tokenMan")
-					return
-				}
-				log.Debug("force token refresh")
-			case <-time.After(t.token.refreshIn()):
-				log.Debug("auto refresh token")
-			}
+	for {
+		select {
+		case <-t.closed:
+			return
+		case <-t.refreshTimer:
+			log.Debug("auto refresh token")
+			t.retrieveNewToken()
+			t.refreshTimer = time.After(t.token.refreshIn())
+		case <-t.getTokenChan:
+			token := t.token
+			t.returnTokenChan <- token
+		case <-t.invalidateTokenChan:
+			t.retrieveNewToken()
+			t.refreshTimer = time.After(t.token.refreshIn())
+			t.invalidateDone <- true
 		}
-	}()
-}
-
-func (t *tokenMan) invalidateToken() {
-	log.Debug("invalidating token")
-	t.token = nil
-	t.doRefresh <- true
+	}
 }
 
 // will block until valid
-func (t *tokenMan) getToken() *oauthToken {
-	getTokenLock.Lock()
-	defer getTokenLock.Unlock()
-
-	if t.token.isValid() {
-		log.Debugf("returning existing token: %v", t.token)
-		return t.token
+func (t *tokenMan) invalidateToken() {
+	//has been closed
+	if atomic.LoadInt32(t.isClosed) == int32(1) {
+		log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
+		return
 	}
-
-	t.retrieveNewToken()
-	return t.token
+	log.Debug("invalidating token")
+	t.invalidateTokenChan <- true
+	<-t.invalidateDone
 }
 
+func (t *tokenMan) getToken() *oauthToken {
+	//has been closed
+	if atomic.LoadInt32(t.isClosed) == int32(1) {
+		log.Debug("TokenManager: getToken() called on closed tokenManager")
+		return nil
+	}
+	t.getTokenChan <- true
+	return <-t.returnTokenChan
+}
+
+/*
+ * blocking close() of tokenMan
+ */
+
 func (t *tokenMan) close() {
+	//has been closed
+	if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
+		log.Panic("TokenManager: close() has been called before!")
+		return
+	}
 	log.Debug("close token manager")
-	close(t.doRefresh)
+	t.quitPollingForToken <- true
+	// send before closing so maintainToken() observes shutdown via the t.closed case
+	t.closed <- true
+	close(t.closed)
+	log.Debug("token manager closed")
 }
 
 // don't call externally. will block until success.
@@ -99,18 +130,11 @@
 	}
 	uri.Path = path.Join(uri.Path, "/accesstoken")
 
-	retryIn := 5 * time.Millisecond
-	maxBackOff := maxBackoffTimeout
-	backOffFunc := createBackOff(retryIn, maxBackOff)
-	first := true
+	pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token: %v", err) })
+}
 
-	for {
-		if first {
-			first = false
-		} else {
-			backOffFunc()
-		}
-
+func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+	return func(_ chan bool) error {
 		form := url.Values{}
 		form.Set("grant_type", "client_credentials")
 		form.Add("client_id", config.GetString(configConsumerKey))
@@ -133,26 +157,26 @@
 		resp, err := client.Do(req)
 		if err != nil {
 			log.Errorf("Unable to Connect to Edge Proxy Server: %v", err)
-			continue
+			return err
 		}
 
 		body, err := ioutil.ReadAll(resp.Body)
 		resp.Body.Close()
 		if err != nil {
 			log.Errorf("Unable to read EdgeProxy Sever response: %v", err)
-			continue
+			return err
 		}
 
 		if resp.StatusCode != 200 {
 			log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
-			continue
+			return expected200Error{}
 		}
 
 		var token oauthToken
 		err = json.Unmarshal(body, &token)
 		if err != nil {
 			log.Errorf("unable to unmarshal JSON response '%s': %v", string(body), err)
-			continue
+			return err
 		}
 
 		if token.ExpiresIn > 0 {
@@ -166,12 +190,17 @@
 
 		if newInstanceID {
 			newInstanceID = false
-			updateApidInstanceInfo()
-		}
+			err = updateApidInstanceInfo()
+			if err != nil {
+				log.Errorf("unable to unmarshal update apid instance info : %v", string(body), err)
+				return err
 
+			}
+		}
 		t.token = &token
 		config.Set(configBearerToken, token.AccessToken)
-		return
+
+		return nil
 	}
 }
 
diff --git a/token_test.go b/token_test.go
index 5deec1c..045b318 100644
--- a/token_test.go
+++ b/token_test.go
@@ -1,5 +1,8 @@
 package apidApigeeSync
 
+/*
+ * Unit test of token manager
+ */
 import (
 	"time"
 
@@ -17,6 +20,8 @@
 	Context("oauthToken", func() {
 
 		It("should calculate valid token", func() {
+			log.Info("Starting token tests...")
+
 			t := &oauthToken{
 				AccessToken: "x",
 				ExpiresIn:   120000,
@@ -25,9 +30,10 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically(">", 0))
 			Expect(t.needsRefresh()).To(BeFalse())
 			Expect(t.isValid()).To(BeTrue())
-		})
+		}, 3)
 
 		It("should calculate expired token", func() {
+
 			t := &oauthToken{
 				AccessToken: "x",
 				ExpiresIn:   0,
@@ -36,9 +42,10 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
 			Expect(t.needsRefresh()).To(BeTrue())
 			Expect(t.isValid()).To(BeFalse())
-		})
+		}, 3)
 
 		It("should calculate token needing refresh", func() {
+
 			t := &oauthToken{
 				AccessToken: "x",
 				ExpiresIn:   59000,
@@ -47,44 +54,76 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
 			Expect(t.needsRefresh()).To(BeTrue())
 			Expect(t.isValid()).To(BeTrue())
-		})
+		}, 3)
 
 		It("should calculate on empty token", func() {
+
 			t := &oauthToken{}
 			Expect(t.refreshIn().Seconds()).To(BeNumerically("<=", 0))
 			Expect(t.needsRefresh()).To(BeTrue())
 			Expect(t.isValid()).To(BeFalse())
-		})
+		}, 3)
 	})
 
 	Context("tokenMan", func() {
 
 		It("should get a valid token", func() {
-			token := tokenManager.getToken()
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				defer GinkgoRecover()
+
+				res := oauthToken{
+					AccessToken: "ABCD",
+					ExpiresIn:   1000,
+				}
+				body, err := json.Marshal(res)
+				Expect(err).NotTo(HaveOccurred())
+				w.Write(body)
+			}))
+			config.Set(configProxyServerBaseURI, ts.URL)
+			testedTokenManager := createTokenManager()
+			token := testedTokenManager.getToken()
 
 			Expect(token.AccessToken).ToNot(BeEmpty())
 			Expect(token.ExpiresIn > 0).To(BeTrue())
 			Expect(token.ExpiresAt).To(BeTemporally(">", time.Now()))
 
-			bToken := tokenManager.getBearerToken()
+			bToken := testedTokenManager.getBearerToken()
 			Expect(bToken).To(Equal(token.AccessToken))
-		})
+			testedTokenManager.close()
+			ts.Close()
+		}, 3)
 
 		It("should refresh when forced to", func() {
-			token := tokenManager.getToken()
+
+			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				defer GinkgoRecover()
+
+				res := oauthToken{
+					AccessToken: generateUUID(),
+					ExpiresIn:   1000,
+				}
+				body, err := json.Marshal(res)
+				Expect(err).NotTo(HaveOccurred())
+				w.Write(body)
+			}))
+			config.Set(configProxyServerBaseURI, ts.URL)
+
+			testedTokenManager := createTokenManager()
+			token := testedTokenManager.getToken()
 			Expect(token.AccessToken).ToNot(BeEmpty())
 
-			tokenManager.invalidateToken()
+			testedTokenManager.invalidateToken()
 
-			token2 := tokenManager.getToken()
+			token2 := testedTokenManager.getToken()
 			Expect(token).ToNot(Equal(token2))
 			Expect(token.AccessToken).ToNot(Equal(token2.AccessToken))
-		})
+			testedTokenManager.close()
+			ts.Close()
+		}, 3)
 
 		It("should refresh in refresh interval", func(done Done) {
 
-			finished := make(chan bool)
-			var tm *tokenMan
+			finished := make(chan bool, 1)
 			start := time.Now()
 			count := 0
 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -106,33 +145,27 @@
 				Expect(err).NotTo(HaveOccurred())
 				w.Write(body)
 			}))
-			defer ts.Close()
 
-			tokenManager.getToken()
-			tokenManager.close()
-			oldBase := config.Get(configProxyServerBaseURI)
 			config.Set(configProxyServerBaseURI, ts.URL)
-			oldFloat := refreshFloatTime
-			refreshFloatTime = 950 * time.Millisecond
-			defer func() {
-				tm.close()
-				config.Set(configProxyServerBaseURI, oldBase)
-				tokenManager = createTokenManager()
-				refreshFloatTime = oldFloat
-			}()
+			testedTokenManager := createTokenManager()
 
-			tm = createTokenManager()
+			testedTokenManager.getToken()
+
 			<-finished
+
+			testedTokenManager.close()
+			ts.Close()
+
 			close(done)
-		})
+		}, 3)
 
 		It("should have created_at_apid first time, update_at_apid after", func(done Done) {
-
-			finished := make(chan bool)
-			var tm *tokenMan
+			finished := make(chan bool, 1)
 			count := 0
+
+			newInstanceID = true
+
 			ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-				defer GinkgoRecover()
 
 				count++
 				if count == 1 {
@@ -147,29 +180,23 @@
 				}
 				res := oauthToken{
 					AccessToken: string(count),
-					ExpiresIn:   2000,
+					ExpiresIn:   200000,
 				}
 				body, err := json.Marshal(res)
 				Expect(err).NotTo(HaveOccurred())
 				w.Write(body)
 			}))
-			defer ts.Close()
 
-			tokenManager.getToken()
-			tokenManager.close()
-			oldBase := config.Get(configProxyServerBaseURI)
 			config.Set(configProxyServerBaseURI, ts.URL)
-			defer func() {
-				tm.close()
-				config.Set(configProxyServerBaseURI, oldBase)
-				tokenManager = createTokenManager()
-			}()
+			testedTokenManager := createTokenManager()
 
-			newInstanceID = true
-			tm = createTokenManager()
-			tm.invalidateToken()
+			testedTokenManager.getToken()
+			testedTokenManager.invalidateToken()
+			testedTokenManager.getToken()
 			<-finished
+			testedTokenManager.close()
+			ts.Close()
 			close(done)
-		})
+		}, 3)
 	})
 })
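Each reworked token spec now builds its own httptest server and tokenMan and tears both down inline, instead of mutating and restoring shared globals. A small helper in that spirit (hypothetical, not part of this diff) would make the per-spec isolation explicit:

package example

import (
	"net/http"
	"net/http/httptest"
)

// withTokenServer gives a spec its own fake OAuth endpoint and
// guarantees teardown, so no state leaks into the next spec.
func withTokenServer(handler http.HandlerFunc, body func(baseURL string)) {
	ts := httptest.NewServer(handler)
	defer ts.Close()
	body(ts.URL)
}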