Fixed problems caused by merge conflicts; addressed review comments.
diff --git a/apigee_sync.go b/apigee_sync.go
index 5c77056..62088b0 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -93,7 +93,7 @@
 }
 
 func addHeaders(req *http.Request) {
-	req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken())
+	req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
 	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -119,4 +119,4 @@
 
 func (a changeServerError) Error() string {
 	return a.Code
-}
\ No newline at end of file
+}
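Note: with this change every outbound sync request carries the bearer token plus the apid identity headers. A minimal caller-side sketch of how addHeaders is used (the endpoint URL is hypothetical):

```go
req, err := http.NewRequest("GET", "https://changeserver.example.com/changes", nil) // hypothetical endpoint
if err != nil {
	log.Panic(err)
}
addHeaders(req)
// req now carries:
//   Authorization: Bearer <tokenManager.getBearerToken()>
//   apid_instance_id, apid_cluster_Id, updated_at_apid
```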
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index 9cdec11..06e44ba 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -254,7 +254,6 @@
 		testMock.forceNewSnapshot()
 	})
 
-
 	It("Verify the Sequence Number Logic works as expected", func() {
 		Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
 		Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
@@ -271,14 +270,17 @@
 
 		initializeContext()
 		tokenManager = createTokenManager()
+		snapManager = createSnapShotManager()
 		events.Listen(ApigeeSyncEventSelector, &handler{})
 		scopes := []string{apidInfo.ClusterID}
 		snapshot := &common.Snapshot{}
 
-		downloadSnapshot(scopes, snapshot, nil)
-		storeBootSnapshot(snapshot)
-		storeDataSnapshot(snapshot)
+		snapManager.downloadSnapshot(scopes, snapshot)
+		snapManager.storeBootSnapshot(snapshot)
+		snapManager.storeDataSnapshot(snapshot)
 		restoreContext()
+		<-snapManager.close()
+		tokenManager.close()
 	}, 3)
 	})
 })
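Note: the test now goes through the snapShotManager instance instead of the old package-level helpers, and it must release both managers when done. A minimal sketch of the intended lifecycle, assuming the setup shown above:

```go
tokenManager = createTokenManager()
snapManager = createSnapShotManager()

snapshot := &common.Snapshot{}
if err := snapManager.downloadSnapshot(scopes, snapshot); err != nil {
	log.Errorf("snapshot download failed: %v", err) // may happen during shutdown
}
snapManager.storeBootSnapshot(snapshot)
snapManager.storeDataSnapshot(snapshot)

// close() returns a channel that signals when shutdown finishes;
// receive from it before closing the token manager.
<-snapManager.close()
tokenManager.close()
```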
diff --git a/backoff.go b/backoff.go
index e3a7403..94f411a 100644
--- a/backoff.go
+++ b/backoff.go
@@ -2,8 +2,8 @@
 
 import (
 	"math"
-	"time"
 	"math/rand"
+	"time"
 )
 
 const defaultInitial time.Duration = 200 * time.Millisecond
@@ -13,7 +13,7 @@
 
 type Backoff struct {
 	attempt         int
 	initial, max    time.Duration
-	jitter bool
+	jitter          bool
 	backoffStrategy func() time.Duration
 }
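Note: these are gofmt fixes only (sorted imports, aligned struct fields); behavior is unchanged. Based on the tests below, the exponential strategy doubles the delay on each Duration() call, starting from the initial value and capped at the max. A usage sketch, where doRequest is a hypothetical operation to retry:

```go
b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false) // factor 2, no jitter

for {
	if err := doRequest(); err == nil { // hypothetical operation
		break
	}
	time.Sleep(b.Duration()) // 200ms, 400ms, 800ms, ... capped at 2s
}
```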
diff --git a/backoff_test.go b/backoff_test.go
index ae85909..f25399b 100644
--- a/backoff_test.go
+++ b/backoff_test.go
@@ -24,7 +24,7 @@
 	})
 
 	It("should properly apply exponential backoff strategy", func() {
-		b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+		b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false)
 		Expect(200 * time.Millisecond).To(Equal(b.Duration()))
 		Expect(1).To(Equal(b.Attempt()))
 		Expect(400 * time.Millisecond).To(Equal(b.Duration()))
@@ -36,7 +36,7 @@
 	})
 
 	It("should reset properly", func() {
-		b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+		b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false)
 		Expect(200 * time.Millisecond).To(Equal(b.Duration()))
 		Expect(1).To(Equal(b.Attempt()))
 		Expect(400 * time.Millisecond).To(Equal(b.Duration()))
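Note: the second spec ("should reset properly") exercises the reset path. Assuming the reset method is spelled Reset() (it is not shown in this diff), the expected sequence is:

```go
b := NewExponentialBackoff(200*time.Millisecond, 2*time.Second, 2, false)
_ = b.Duration() // 200ms, Attempt() == 1
_ = b.Duration() // 400ms

b.Reset()        // hypothetical spelling; rewinds the sequence
_ = b.Duration() // 200ms again
```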
diff --git a/changes.go b/changes.go
index 0ac39b2..0b77cba 100644
--- a/changes.go
+++ b/changes.go
@@ -44,20 +44,21 @@
 	if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
 		log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
 		go func() {
-			finishChan <- false
 			log.Debug("change manager closed")
+			finishChan <- false
 		}()
 		return finishChan
 	}
 	// not launched
 	if atomic.LoadInt32(c.isLaunched) == int32(0) {
-		log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
+		log.Debug("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
+		log.Warn("Attempt to close unstarted change manager")
 		go func() {
 			c.quitChan <- true
 			tokenManager.close()
 			<-snapManager.close()
-			finishChan <- false
 			log.Debug("change manager closed")
+			finishChan <- false
 		}()
 		return finishChan
 	}
@@ -67,8 +68,8 @@
 		c.quitChan <- true
 		tokenManager.close()
 		<-snapManager.close()
-		finishChan <- true
 		log.Debug("change manager closed")
+		finishChan <- true
 	}()
 	return finishChan
 }
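Note: moving the finishChan send after the debug log matters because the caller may unblock (and possibly exit) the instant the send completes, so logging first guarantees the "change manager closed" line is actually emitted. A caller-side sketch of the close() contract, with a hypothetical manager variable:

```go
finished := changeManager.close() // hypothetical variable; returns chan bool immediately, shutdown runs in a goroutine
if clean := <-finished; !clean {
	// false means close() hit an already-closed or not-yet-launched manager
	log.Warn("change manager did not close cleanly")
}
```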
diff --git a/snapshot.go b/snapshot.go
index 0158871..438ddab 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -67,7 +67,6 @@
 func (s *snapShotManager) downloadBootSnapshot() {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
-		return
 	}
 
 	defer atomic.StoreInt32(s.isDownloading, int32(0))
@@ -81,7 +80,7 @@
 
 	scopes := []string{apidInfo.ClusterID}
 	snapshot := &common.Snapshot{}
-	
+
 	err := s.downloadSnapshot(scopes, snapshot)
 	if err != nil {
 		// this may happen during shutdown
@@ -98,10 +97,10 @@
 
 	}
 
 	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
-	storeBootSnapshot(snapshot)
+	s.storeBootSnapshot(snapshot)
 }
 
-func storeBootSnapshot(snapshot *common.Snapshot) {
+func (s *snapShotManager) storeBootSnapshot(snapshot *common.Snapshot) {
 	processSnapshot(snapshot)
 }
@@ -109,7 +108,6 @@
 func (s *snapShotManager) downloadDataSnapshot() {
 	if atomic.SwapInt32(s.isDownloading, 1) == int32(1) {
 		log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
-		return
 	}
 
 	defer atomic.StoreInt32(s.isDownloading, int32(0))
@@ -132,10 +130,10 @@
 		}
 		return
 	}
-	storeDataSnapshot(snapshot)
+	s.storeDataSnapshot(snapshot)
 }
 
-func storeDataSnapshot(snapshot *common.Snapshot) {
+func (s *snapShotManager) storeDataSnapshot(snapshot *common.Snapshot) {
 	knownTables = extractTablesFromSnapshot(snapshot)
 
 	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
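Note: the removed return statements were dead code, since logrus-style log.Panic calls panic() and never returns. The guard itself is a single-flight lock built on atomic.SwapInt32; a self-contained sketch of the pattern using only the standard library:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var isDownloading int32

func downloadSnapshot() {
	// Swap returns the previous value: if it was already 1,
	// another goroutine holds the download slot.
	if atomic.SwapInt32(&isDownloading, 1) == 1 {
		panic("only 1 thread can download a snapshot at a time")
	}
	defer atomic.StoreInt32(&isDownloading, 0) // release the slot on exit

	fmt.Println("downloading snapshot...")
}

func main() {
	downloadSnapshot() // acquires and releases the slot
	downloadSnapshot() // succeeds again: the deferred store reset the flag
}
```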