Merge branch 'master' into refactor-haoming
diff --git a/apigee_sync.go b/apigee_sync.go
index 8e7cafd..5c77056 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -46,7 +46,7 @@
*/
func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
- backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2)
+ backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true)
//inintialize the retry channel to start first attempt immediately
retry := time.After(0 * time.Millisecond)
@@ -93,7 +93,7 @@
}
func addHeaders(req *http.Request) {
- req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
+ req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken())
req.Header.Set("apid_instance_id", apidInfo.InstanceID)
req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -119,4 +119,4 @@
func (a changeServerError) Error() string {
return a.Code
-}
+}
\ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 579a5c3..9cdec11 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -19,7 +19,7 @@
// set up mock server
mockParms := MockParms{
- ReliableAPI: true,
+ ReliableAPI: false,
ClusterID: config.GetString(configApidClusterId),
TokenKey: config.GetString(configConsumerKey),
TokenSecret: config.GetString(configConsumerSecret),
@@ -237,7 +237,7 @@
Expect(changesHaveNewTables(nil,
[]common.Change{common.Change{Table: "a"}},
)).To(BeTrue())
- })
+ }, 3)
// todo: disabled for now -
// there is precondition I haven't been able to track down that breaks this test on occasion
@@ -253,5 +253,32 @@
})
testMock.forceNewSnapshot()
})
+
+
+ It("Verify the Sequence Number Logic works as expected", func() {
+ Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
+ Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
+ Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
+ Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
+ Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
+ Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
+ }, 3)
+
+ /*
+ * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
+ */
+ It("Should be able to handle duplicate snapshot during bootstrap", func() {
+ initializeContext()
+
+ tokenManager = createTokenManager()
+ events.Listen(ApigeeSyncEventSelector, &handler{})
+
+ scopes := []string{apidInfo.ClusterID}
+ snapshot := &common.Snapshot{}
+ downloadSnapshot(scopes, snapshot, nil)
+ storeBootSnapshot(snapshot)
+ storeDataSnapshot(snapshot)
+ restoreContext()
+ }, 3)
})
})
diff --git a/backoff.go b/backoff.go
index 291a037..e3a7403 100644
--- a/backoff.go
+++ b/backoff.go
@@ -2,8 +2,8 @@
import (
"math"
- "math/rand"
"time"
+ "math/rand"
)
const defaultInitial time.Duration = 200 * time.Millisecond
@@ -13,6 +13,7 @@
type Backoff struct {
attempt int
initial, max time.Duration
+ jitter bool
backoffStrategy func() time.Duration
}
@@ -21,7 +22,7 @@
factor float64
}
-func NewExponentialBackoff(initial, max time.Duration, factor float64) *Backoff {
+func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
backoff := &ExponentialBackoff{}
if initial <= 0 {
@@ -39,9 +40,10 @@
backoff.max = max
backoff.attempt = 0
backoff.factor = factor
+ backoff.jitter = jitter
backoff.backoffStrategy = backoff.exponentialBackoffStrategy
- return &backoff.Backoff
+ return backoff
}
func (b *Backoff) Duration() time.Duration {
@@ -56,14 +58,15 @@
attempt := float64(b.Backoff.attempt)
duration := initial * math.Pow(b.factor, attempt)
- //introduce some jitter
- duration = (rand.Float64()*(duration-initial) + initial)
-
if duration > math.MaxInt64 {
return b.max
}
dur := time.Duration(duration)
+ if b.jitter {
+ duration = (rand.Float64()*(duration-initial) + initial)
+ }
+
if dur > b.max {
return b.max
}
diff --git a/backoff_test.go b/backoff_test.go
new file mode 100644
index 0000000..ae85909
--- /dev/null
+++ b/backoff_test.go
@@ -0,0 +1,56 @@
+package apidApigeeSync
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "time"
+)
+
+var _ = Describe("backoff", func() {
+
+ Context("Backoff timeout calculations", func() {
+
+ It("Should properly apply defaults", func() {
+ log.Info("Starting backoff tests...")
+ b := NewExponentialBackoff(0, 0, 0, true)
+ Expect(defaultInitial).To(Equal(b.initial))
+ Expect(defaultMax).To(Equal(b.max))
+ Expect(defaultFactor).To(Equal(b.factor))
+
+ b = NewExponentialBackoff(-1, -1, -1, true)
+ Expect(defaultInitial).To(Equal(b.initial))
+ Expect(defaultMax).To(Equal(b.max))
+ Expect(defaultFactor).To(Equal(b.factor))
+ })
+
+ It("should properly apply exponential backoff strategy", func() {
+ b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+ Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(1).To(Equal(b.Attempt()))
+ Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(2).To(Equal(b.Attempt()))
+ Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(3).To(Equal(b.Attempt()))
+ Expect(1600 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(4).To(Equal(b.Attempt()))
+ })
+
+ It("should reset properly", func() {
+ b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+ Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(1).To(Equal(b.Attempt()))
+ Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(2).To(Equal(b.Attempt()))
+ Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(3).To(Equal(b.Attempt()))
+ b.Reset()
+ Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(1).To(Equal(b.Attempt()))
+ Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(2).To(Equal(b.Attempt()))
+ Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(3).To(Equal(b.Attempt()))
+ })
+ })
+
+})
diff --git a/changes.go b/changes.go
index d85eb96..0ac39b2 100644
--- a/changes.go
+++ b/changes.go
@@ -146,9 +146,9 @@
v.Add("block", block)
/*
- * Include all the scopes associated with the config Id
- * The Config Id is included as well, as it acts as the
- * Bootstrap scope
+ * Include all the scopes associated with the config Id
+ * The Config Id is included as well, as it acts as the
+ * Bootstrap scope
*/
for _, scope := range scopes {
v.Add("scope", scope)
@@ -159,6 +159,7 @@
uri := changesUri.String()
log.Debugf("Fetching changes: %s", uri)
+ /* If error, break the loop, and retry after interval */
client := &http.Client{Timeout: httpTimeout} // must be greater than block value
req, err := http.NewRequest("GET", uri, nil)
addHeaders(req)
@@ -219,6 +220,17 @@
return err
}
+ /*
+ * If the lastSequence is already newer or the same than what we got via
+ * resp.LastSequence, Ignore it.
+ */
+ if lastSequence != "" &&
+ getChangeStatus(lastSequence, resp.LastSequence) != 1 {
+ return changeServerError{
+ Code: "Ignore change, already have newer changes",
+ }
+ }
+
if changesRequireDDLSync(resp) {
return changeServerError{
Code: "DDL changes detected; must get new snapshot",
@@ -236,13 +248,8 @@
log.Debugf("No Changes detected for Scopes: %s", scopes)
}
- if lastSequence != resp.LastSequence {
- lastSequence = resp.LastSequence
- err := updateLastSequence(resp.LastSequence)
- if err != nil {
- log.Panic("Unable to update Sequence in DB")
- }
- }
+ updateSequence(resp.LastSequence)
+
return nil
}
@@ -283,3 +290,28 @@
return false
}
+
+/*
+ * seqCurr.Compare() will return 1, if its newer than seqPrev,
+ * else will return 0, if same, or -1 if older.
+ */
+func getChangeStatus(lastSeq string, currSeq string) int {
+ seqPrev, err := common.ParseSequence(lastSeq)
+ if err != nil {
+ log.Panic("Unable to parse previous sequence string")
+ }
+ seqCurr, err := common.ParseSequence(currSeq)
+ if err != nil {
+ log.Panic("Unable to parse current sequence string")
+ }
+ return seqCurr.Compare(seqPrev)
+}
+
+func updateSequence(seq string) {
+ lastSequence = seq
+ err := updateLastSequence(seq)
+ if err != nil {
+ log.Panic("Unable to update Sequence in DB")
+ }
+
+}
diff --git a/data.go b/data.go
index 4e5a62c..48f1ff7 100644
--- a/data.go
+++ b/data.go
@@ -88,8 +88,9 @@
log.Debugf("inserting into APID_CLUSTER: %v", dac)
+ //replace to accomodate same snapshot txid
stmt, err := txn.Prepare(`
- INSERT INTO APID_CLUSTER
+ REPLACE INTO APID_CLUSTER
(id, description, name, umbrella_org_app_name,
created, created_by, updated, updated_by,
last_sequence)
@@ -117,8 +118,9 @@
log.Debugf("insert DATA_SCOPE: %v", ds)
+ //replace to accomodate same snapshot txid
stmt, err := txn.Prepare(`
- INSERT INTO DATA_SCOPE
+ REPLACE INTO DATA_SCOPE
(id, apid_cluster_id, scope, org,
env, created, created_by, updated,
updated_by)
diff --git a/init_test.go b/init_test.go
index 6135875..479b4b6 100644
--- a/init_test.go
+++ b/init_test.go
@@ -14,7 +14,7 @@
initConfigDefaults()
Expect(apidInfo.InstanceName).To(Equal("testhost"))
- })
+ }, 3)
It("accept display name from config", func() {
config.Set(configName, "aa01")
@@ -23,7 +23,7 @@
apidInfoLatest, _ = getApidInstanceInfo()
Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
- })
+ }, 3)
})
diff --git a/listener.go b/listener.go
index ba6c02c..6c4b1ef 100644
--- a/listener.go
+++ b/listener.go
@@ -130,7 +130,7 @@
err = insertDataScope(ds, tx)
case common.Delete:
ds := makeDataScopeFromRow(change.OldRow)
- deleteDataScope(ds, tx)
+ err = deleteDataScope(ds, tx)
default:
// common.Update is not allowed
log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
diff --git a/listener_test.go b/listener_test.go
index e426d39..ee0b4cc 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -48,7 +48,7 @@
}
Expect(func() { handler.Handle(&event) }).To(Panic())
- })
+ }, 3)
It("should process a valid Snapshot", func() {
@@ -204,7 +204,7 @@
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
- })
+ }, 3)
})
Context("ApigeeSync change event", func() {
@@ -226,7 +226,7 @@
}
Expect(func() { handler.Handle(&event) }).To(Panic())
- })
+ }, 3)
It("update event should panic", func() {
@@ -243,7 +243,7 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
- })
+ }, 3)
PIt("delete event should kill all the things!")
})
@@ -329,7 +329,7 @@
Expect(len(scopes)).To(Equal(2))
Expect(scopes[0]).To(Equal("s1"))
Expect(scopes[1]).To(Equal("s2"))
- })
+ }, 3)
It("delete event should delete", func() {
insert := common.ChangeList{
@@ -373,7 +373,7 @@
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(0))
- })
+ }, 3)
It("update event should panic", func() {
@@ -390,7 +390,7 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
- })
+ }, 3)
})
diff --git a/snapshot.go b/snapshot.go
index 055d896..0158871 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -81,6 +81,7 @@
scopes := []string{apidInfo.ClusterID}
snapshot := &common.Snapshot{}
+
err := s.downloadSnapshot(scopes, snapshot)
if err != nil {
// this may happen during shutdown
@@ -97,6 +98,10 @@
}
// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
+ storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot *common.Snapshot) {
processSnapshot(snapshot)
}
@@ -116,7 +121,7 @@
log.Debug("download Snapshot for data scopes")
- var scopes = findScopesForId(apidInfo.ClusterID)
+ scopes := findScopesForId(apidInfo.ClusterID)
scopes = append(scopes, apidInfo.ClusterID)
snapshot := &common.Snapshot{}
err := s.downloadSnapshot(scopes, snapshot)
@@ -127,7 +132,10 @@
}
return
}
+ storeDataSnapshot(snapshot)
+}
+func storeDataSnapshot(snapshot *common.Snapshot) {
knownTables = extractTablesFromSnapshot(snapshot)
db, err := dataService.DBVersion(snapshot.SnapshotInfo)
@@ -150,8 +158,8 @@
case <-events.Emit(ApigeeSyncEventSelector, snapshot):
// the new snapshot has been processed
// if close() happen after persistKnownTablesToDB(), will not interrupt snapshot processing to maintain consistency
- // In this case, will close now
}
+
}
func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
@@ -216,6 +224,7 @@
// a blocking method
// will keep retrying with backoff until success
+
func (s *snapShotManager) downloadSnapshot(scopes []string, snapshot *common.Snapshot) error {
// if closed
if atomic.LoadInt32(s.isClosed) == int32(1) {
@@ -248,10 +257,8 @@
//pollWithBackoff only accepts function that accept a single quit channel
//to accomadate functions which need more parameters, wrap them in closures
attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
-
pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
return nil
-
}
func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {
diff --git a/token_test.go b/token_test.go
index 49d1eb6..045b318 100644
--- a/token_test.go
+++ b/token_test.go
@@ -30,7 +30,7 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically(">", 0))
Expect(t.needsRefresh()).To(BeFalse())
Expect(t.isValid()).To(BeTrue())
- })
+ }, 3)
It("should calculate expired token", func() {
@@ -42,7 +42,7 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
Expect(t.needsRefresh()).To(BeTrue())
Expect(t.isValid()).To(BeFalse())
- })
+ }, 3)
It("should calculate token needing refresh", func() {
@@ -54,7 +54,7 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
Expect(t.needsRefresh()).To(BeTrue())
Expect(t.isValid()).To(BeTrue())
- })
+ }, 3)
It("should calculate on empty token", func() {
@@ -62,7 +62,7 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically("<=", 0))
Expect(t.needsRefresh()).To(BeTrue())
Expect(t.isValid()).To(BeFalse())
- })
+ }, 3)
})
Context("tokenMan", func() {
@@ -91,7 +91,7 @@
Expect(bToken).To(Equal(token.AccessToken))
testedTokenManager.close()
ts.Close()
- })
+ }, 3)
It("should refresh when forced to", func() {
@@ -119,7 +119,7 @@
Expect(token.AccessToken).ToNot(Equal(token2.AccessToken))
testedTokenManager.close()
ts.Close()
- })
+ }, 3)
It("should refresh in refresh interval", func(done Done) {
@@ -157,7 +157,7 @@
ts.Close()
close(done)
- })
+ }, 3)
It("should have created_at_apid first time, update_at_apid after", func(done Done) {
finished := make(chan bool, 1)
@@ -197,6 +197,6 @@
testedTokenManager.close()
ts.Close()
close(done)
- })
+ }, 3)
})
})