Merge branch 'master' into refactor-haoming
diff --git a/apigee_sync.go b/apigee_sync.go
index 8e7cafd..5c77056 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -46,7 +46,7 @@
  */
 func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
 
-	backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2)
+	backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true)
 
 	//inintialize the retry channel to start first attempt immediately
 	retry := time.After(0 * time.Millisecond)
@@ -93,7 +93,7 @@
 }
 
 func addHeaders(req *http.Request) {
-	req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
+	req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken())
 	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -119,4 +119,4 @@
 
 func (a changeServerError) Error() string {
 	return a.Code
-}
+}
\ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 579a5c3..9cdec11 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -19,7 +19,7 @@
 
 			// set up mock server
 			mockParms := MockParms{
-				ReliableAPI:  true,
+				ReliableAPI:  false,
 				ClusterID:    config.GetString(configApidClusterId),
 				TokenKey:     config.GetString(configConsumerKey),
 				TokenSecret:  config.GetString(configConsumerSecret),
@@ -237,7 +237,7 @@
 			Expect(changesHaveNewTables(nil,
 				[]common.Change{common.Change{Table: "a"}},
 			)).To(BeTrue())
-		})
+		}, 3)
 
 		// todo: disabled for now -
 		// there is precondition I haven't been able to track down that breaks this test on occasion
@@ -253,5 +253,32 @@
 			})
 			testMock.forceNewSnapshot()
 		})
+
+
+		It("Verify the Sequence Number Logic works as expected", func() {
+			Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
+			Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
+			Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
+			Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
+			Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
+			Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
+		}, 3)
+
+		/*
+		 * XAPID-869: there should be no panic when duplicate snapshots are received during bootstrap
+		 */
+		It("Should be able to handle duplicate snapshot during bootstrap", func() {
+			initializeContext()
+
+			tokenManager = createTokenManager()
+			events.Listen(ApigeeSyncEventSelector, &handler{})
+
+			scopes := []string{apidInfo.ClusterID}
+			snapshot := &common.Snapshot{}
+			downloadSnapshot(scopes, snapshot, nil)
+			storeBootSnapshot(snapshot)
+			storeDataSnapshot(snapshot)
+			restoreContext()
+		}, 3)
 	})
 })
diff --git a/backoff.go b/backoff.go
index 291a037..e3a7403 100644
--- a/backoff.go
+++ b/backoff.go
@@ -2,8 +2,8 @@
 
 import (
 	"math"
-	"math/rand"
 	"time"
+	"math/rand"
 )
 
 const defaultInitial time.Duration = 200 * time.Millisecond
@@ -13,6 +13,7 @@
 type Backoff struct {
 	attempt         int
 	initial, max    time.Duration
+	jitter bool
 	backoffStrategy func() time.Duration
 }
 
@@ -21,7 +22,7 @@
 	factor float64
 }
 
-func NewExponentialBackoff(initial, max time.Duration, factor float64) *Backoff {
+func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
 	backoff := &ExponentialBackoff{}
 
 	if initial <= 0 {
@@ -39,9 +40,10 @@
 	backoff.max = max
 	backoff.attempt = 0
 	backoff.factor = factor
+	backoff.jitter = jitter
 	backoff.backoffStrategy = backoff.exponentialBackoffStrategy
 
-	return &backoff.Backoff
+	return backoff
 }
 
 func (b *Backoff) Duration() time.Duration {
@@ -56,14 +58,17 @@
 	attempt := float64(b.Backoff.attempt)
 	duration := initial * math.Pow(b.factor, attempt)
 
-	//introduce some jitter
-	duration = (rand.Float64()*(duration-initial) + initial)
-
 	if duration > math.MaxInt64 {
 		return b.max
 	}
 	dur := time.Duration(duration)
 
+	if b.jitter {
+		// Apply jitter to dur: the conversion above has already happened, so
+		// writing the jittered value back to duration would never reach the clamp below.
+		dur = time.Duration(rand.Float64()*(duration-initial) + initial)
+	}
+
 	if dur > b.max {
 		return b.max
 	}
diff --git a/backoff_test.go b/backoff_test.go
new file mode 100644
index 0000000..ae85909
--- /dev/null
+++ b/backoff_test.go
@@ -0,0 +1,56 @@
+package apidApigeeSync
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"time"
+)
+
+var _ = Describe("backoff", func() {
+
+	Context("Backoff timeout calculations", func() {
+
+		It("Should properly apply defaults", func() {
+			log.Info("Starting backoff tests...")
+			b := NewExponentialBackoff(0, 0, 0, true)
+			Expect(defaultInitial).To(Equal(b.initial))
+			Expect(defaultMax).To(Equal(b.max))
+			Expect(defaultFactor).To(Equal(b.factor))
+
+			b = NewExponentialBackoff(-1, -1, -1, true)
+			Expect(defaultInitial).To(Equal(b.initial))
+			Expect(defaultMax).To(Equal(b.max))
+			Expect(defaultFactor).To(Equal(b.factor))
+		})
+
+		It("should properly apply exponential backoff strategy", func() {
+			b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(1).To(Equal(b.Attempt()))
+			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(2).To(Equal(b.Attempt()))
+			Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(3).To(Equal(b.Attempt()))
+			Expect(1600 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(4).To(Equal(b.Attempt()))
+		})
+
+		It("should reset properly", func() {
+			b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(1).To(Equal(b.Attempt()))
+			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(2).To(Equal(b.Attempt()))
+			Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(3).To(Equal(b.Attempt()))
+			b.Reset()
+			Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(1).To(Equal(b.Attempt()))
+			Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(2).To(Equal(b.Attempt()))
+			Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+			Expect(3).To(Equal(b.Attempt()))
+		})
+	})
+
+})
diff --git a/changes.go b/changes.go
index d85eb96..0ac39b2 100644
--- a/changes.go
+++ b/changes.go
@@ -146,9 +146,9 @@
 	v.Add("block", block)
 
 	/*
-	* Include all the scopes associated with the config Id
-	* The Config Id is included as well, as it acts as the
-	* Bootstrap scope
+	 * Include all the scopes associated with the config Id
+	 * The Config Id is included as well, as it acts as the
+	 * Bootstrap scope
 	 */
 	for _, scope := range scopes {
 		v.Add("scope", scope)
@@ -159,6 +159,7 @@
 	uri := changesUri.String()
 	log.Debugf("Fetching changes: %s", uri)
 
+	/* If error, break the loop, and retry after interval */
 	client := &http.Client{Timeout: httpTimeout} // must be greater than block value
 	req, err := http.NewRequest("GET", uri, nil)
 	addHeaders(req)
@@ -219,6 +220,17 @@
 		return err
 	}
 
+	/*
+	 * If lastSequence is already the same as or newer than the value received
+	 * in resp.LastSequence, ignore the response.
+	 */
+	if lastSequence != "" &&
+		getChangeStatus(lastSequence, resp.LastSequence) != 1 {
+		return changeServerError{
+			Code: "Ignore change, already have newer changes",
+		}
+	}
+
 	if changesRequireDDLSync(resp) {
 		return changeServerError{
 			Code: "DDL changes detected; must get new snapshot",
@@ -236,13 +248,8 @@
 		log.Debugf("No Changes detected for Scopes: %s", scopes)
 	}
 
-	if lastSequence != resp.LastSequence {
-		lastSequence = resp.LastSequence
-		err := updateLastSequence(resp.LastSequence)
-		if err != nil {
-			log.Panic("Unable to update Sequence in DB")
-		}
-	}
+	updateSequence(resp.LastSequence)
+
 	return nil
 }
 
@@ -283,3 +290,28 @@
 
 	return false
 }
+
+/*
+ * seqCurr.Compare(seqPrev) returns 1 if currSeq is newer than lastSeq,
+ * 0 if they are the same, or -1 if it is older.
+ */
+func getChangeStatus(lastSeq string, currSeq string) int {
+	seqPrev, err := common.ParseSequence(lastSeq)
+	if err != nil {
+		log.Panic("Unable to parse previous sequence string")
+	}
+	seqCurr, err := common.ParseSequence(currSeq)
+	if err != nil {
+		log.Panic("Unable to parse current sequence string")
+	}
+	return seqCurr.Compare(seqPrev)
+}
+
+func updateSequence(seq string) {
+	lastSequence = seq
+	err := updateLastSequence(seq)
+	if err != nil {
+		log.Panic("Unable to update Sequence in DB")
+	}
+
+}
diff --git a/data.go b/data.go
index 4e5a62c..48f1ff7 100644
--- a/data.go
+++ b/data.go
@@ -88,8 +88,9 @@
 
 	log.Debugf("inserting into APID_CLUSTER: %v", dac)
 
+	// use REPLACE to accommodate a snapshot with the same txid
 	stmt, err := txn.Prepare(`
-	INSERT INTO APID_CLUSTER
+	REPLACE INTO APID_CLUSTER
 		(id, description, name, umbrella_org_app_name,
 		created, created_by, updated, updated_by,
 		last_sequence)
@@ -117,8 +118,9 @@
 
 	log.Debugf("insert DATA_SCOPE: %v", ds)
 
+	// use REPLACE to accommodate a snapshot with the same txid
 	stmt, err := txn.Prepare(`
-	INSERT INTO DATA_SCOPE
+	REPLACE INTO DATA_SCOPE
 		(id, apid_cluster_id, scope, org,
 		env, created, created_by, updated,
 		updated_by)
diff --git a/init_test.go b/init_test.go
index 6135875..479b4b6 100644
--- a/init_test.go
+++ b/init_test.go
@@ -14,7 +14,7 @@
 
 			initConfigDefaults()
 			Expect(apidInfo.InstanceName).To(Equal("testhost"))
-		})
+		}, 3)
 
 		It("accept display name from config", func() {
 			config.Set(configName, "aa01")
@@ -23,7 +23,7 @@
 			apidInfoLatest, _ = getApidInstanceInfo()
 			Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
 			Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
-		})
+		}, 3)
 
 	})
 
diff --git a/listener.go b/listener.go
index ba6c02c..6c4b1ef 100644
--- a/listener.go
+++ b/listener.go
@@ -130,7 +130,7 @@
 				err = insertDataScope(ds, tx)
 			case common.Delete:
 				ds := makeDataScopeFromRow(change.OldRow)
-				deleteDataScope(ds, tx)
+				err = deleteDataScope(ds, tx)
 			default:
 				// common.Update is not allowed
 				log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
diff --git a/listener_test.go b/listener_test.go
index e426d39..ee0b4cc 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -48,7 +48,7 @@
 			}
 
 			Expect(func() { handler.Handle(&event) }).To(Panic())
-		})
+		}, 3)
 
 		It("should process a valid Snapshot", func() {
 
@@ -204,7 +204,7 @@
 
 			//restore the last snapshot
 			apidInfo.LastSnapshot = saveLastSnapshot
-		})
+		}, 3)
 	})
 
 	Context("ApigeeSync change event", func() {
@@ -226,7 +226,7 @@
 				}
 
 				Expect(func() { handler.Handle(&event) }).To(Panic())
-			})
+			}, 3)
 
 			It("update event should panic", func() {
 
@@ -243,7 +243,7 @@
 				Expect(func() { handler.Handle(&event) }).To(Panic())
 				//restore the last snapshot
 				apidInfo.LastSnapshot = saveLastSnapshot
-			})
+			}, 3)
 
 			PIt("delete event should kill all the things!")
 		})
@@ -329,7 +329,7 @@
 				Expect(len(scopes)).To(Equal(2))
 				Expect(scopes[0]).To(Equal("s1"))
 				Expect(scopes[1]).To(Equal("s2"))
-			})
+			}, 3)
 
 			It("delete event should delete", func() {
 				insert := common.ChangeList{
@@ -373,7 +373,7 @@
 				Expect(err).NotTo(HaveOccurred())
 
 				Expect(nRows).To(Equal(0))
-			})
+			}, 3)
 
 			It("update event should panic", func() {
 
@@ -390,7 +390,7 @@
 				Expect(func() { handler.Handle(&event) }).To(Panic())
 				//restore the last snapshot
 				apidInfo.LastSnapshot = saveLastSnapshot
-			})
+			}, 3)
 
 		})
 
diff --git a/snapshot.go b/snapshot.go
index 055d896..0158871 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -81,6 +81,7 @@
 
 	scopes := []string{apidInfo.ClusterID}
 	snapshot := &common.Snapshot{}
+  
 	err := s.downloadSnapshot(scopes, snapshot)
 	if err != nil {
 		// this may happen during shutdown
@@ -97,6 +98,10 @@
 	}
 
 	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
+  storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot *common.Snapshot) {
 	processSnapshot(snapshot)
 }
 
@@ -116,7 +121,7 @@
 
 	log.Debug("download Snapshot for data scopes")
 
-	var scopes = findScopesForId(apidInfo.ClusterID)
+	scopes := findScopesForId(apidInfo.ClusterID)
 	scopes = append(scopes, apidInfo.ClusterID)
 	snapshot := &common.Snapshot{}
 	err := s.downloadSnapshot(scopes, snapshot)
@@ -127,7 +132,10 @@
 		}
 		return
 	}
+	storeDataSnapshot(snapshot)
+}
 
+func storeDataSnapshot(snapshot *common.Snapshot) {
 	knownTables = extractTablesFromSnapshot(snapshot)
 
 	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
@@ -150,8 +158,8 @@
 	case <-events.Emit(ApigeeSyncEventSelector, snapshot):
 		// the new snapshot has been processed
 		// if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency
-		// In this case, will close now
 	}
+
 }
 
 func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
@@ -216,6 +224,7 @@
 
 // a blocking method
 // will keep retrying with backoff until success
+
 func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
 	// if closed
 	if atomic.LoadInt32(s.isClosed) == int32(1) {
@@ -248,10 +257,8 @@
 	//pollWithBackoff only accepts function that accept a single quit channel
 	//to accomadate functions which need more parameters, wrap them in closures
 	attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
-
 	pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
 	return nil
-
 }
 
 func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {
diff --git a/token_test.go b/token_test.go
index 49d1eb6..045b318 100644
--- a/token_test.go
+++ b/token_test.go
@@ -30,7 +30,7 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically(">", 0))
 			Expect(t.needsRefresh()).To(BeFalse())
 			Expect(t.isValid()).To(BeTrue())
-		})
+		}, 3)
 
 		It("should calculate expired token", func() {
 
@@ -42,7 +42,7 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
 			Expect(t.needsRefresh()).To(BeTrue())
 			Expect(t.isValid()).To(BeFalse())
-		})
+		}, 3)
 
 		It("should calculate token needing refresh", func() {
 
@@ -54,7 +54,7 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
 			Expect(t.needsRefresh()).To(BeTrue())
 			Expect(t.isValid()).To(BeTrue())
-		})
+		}, 3)
 
 		It("should calculate on empty token", func() {
 
@@ -62,7 +62,7 @@
 			Expect(t.refreshIn().Seconds()).To(BeNumerically("<=", 0))
 			Expect(t.needsRefresh()).To(BeTrue())
 			Expect(t.isValid()).To(BeFalse())
-		})
+		}, 3)
 	})
 
 	Context("tokenMan", func() {
@@ -91,7 +91,7 @@
 			Expect(bToken).To(Equal(token.AccessToken))
 			testedTokenManager.close()
 			ts.Close()
-		})
+		}, 3)
 
 		It("should refresh when forced to", func() {
 
@@ -119,7 +119,7 @@
 			Expect(token.AccessToken).ToNot(Equal(token2.AccessToken))
 			testedTokenManager.close()
 			ts.Close()
-		})
+		}, 3)
 
 		It("should refresh in refresh interval", func(done Done) {
 
@@ -157,7 +157,7 @@
 			ts.Close()
 
 			close(done)
-		})
+		}, 3)
 
 		It("should have created_at_apid first time, update_at_apid after", func(done Done) {
 			finished := make(chan bool, 1)
@@ -197,6 +197,6 @@
 			testedTokenManager.close()
 			ts.Close()
 			close(done)
-		})
+		}, 3)
 	})
 })