Merge pull request #22 from 30x/XAPID-717

Xapid-717
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 51a40b3..960f0a4 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -38,6 +38,7 @@
 	config.Set(configSnapServerBaseURI, testServer.URL)
 	config.Set(configChangeServerBaseURI, testServer.URL)
 	config.Set(configSnapshotProtocol, "json")
+	config.Set(configPollInterval, 10*time.Millisecond)
 
 	config.Set(configName, "testhost")
 	config.Set(configApidClusterId, "bootstrap")
@@ -48,7 +49,7 @@
 
 	// set up mock server
 	mockParms := MockParms{
-		ReliableAPI:  true,
+		ReliableAPI:  false,
 		ClusterID:    config.GetString(configApidClusterId),
 		TokenKey:     config.GetString(configConsumerKey),
 		TokenSecret:  config.GetString(configConsumerSecret),
diff --git a/apigee_sync.go b/apigee_sync.go
index 449780b..ec3031e 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -52,41 +52,33 @@
 }
 
 /*
- * Helper function that sleeps for N seconds if comm with change agent
- * fails. The retry interval gradually is incremented each time it fails
- * till it reaches the Polling Int time, and after which it constantly
- * retries at the polling time interval
+ * Polls change agent for changes. In event of errors, uses a doubling
+ * backoff from 200ms up to a max delay of the configPollInterval value.
  */
 func updatePeriodicChanges() {
 
-	times := 1
-	pollInterval := config.GetInt(configPollInterval)
+	var backOffFunc func()
+	pollInterval := config.GetDuration(configPollInterval)
 	for {
-		startTime := time.Second
+		start := time.Now().Second()
 		err := pollChangeAgent()
+		end := time.Now().Second()
 		if err != nil {
 			log.Debugf("Error connecting to changeserver: %v", err)
 		}
-		endTime := time.Second
-		// Gradually increase retry interval, and max at some level
-		if endTime-startTime <= 1 {
-			if times < pollInterval {
-				times++
-			} else {
-				times = pollInterval
+		if end-start <= 1 {
+			if backOffFunc == nil {
+				backOffFunc = createBackOff(200*time.Millisecond, pollInterval)
 			}
-			log.Debugf("Connecting to changeserver...")
-			time.Sleep(time.Duration(times) * 200 * time.Millisecond)
+			backOffFunc()
 		} else {
-			// Reset sleep interval
-			times = 1
+			backOffFunc = nil
 		}
-
 	}
 }
 
 /*
- * Long polls every 45 seconds the change agent. Parses the response from
+ * Long polls the change agent with a 45 second block. Parses the response from
  * change agent and raises an event.
  */
 func pollChangeAgent() error {
@@ -139,7 +131,7 @@
 		log.Debugf("Fetching changes: %s", uri)
 
 		/* If error, break the loop, and retry after interval */
-		client := &http.Client{}
+		client := &http.Client{Timeout: time.Minute} // must be greater than block value
 		req, err := http.NewRequest("GET", uri, nil)
 		addHeaders(req)
 		r, err := client.Do(req)
@@ -152,11 +144,13 @@
 		if r.StatusCode != http.StatusOK {
 			if r.StatusCode == http.StatusUnauthorized {
 				token = ""
+
 				log.Errorf("Token expired? Unauthorized request.")
 			}
 			r.Body.Close()
 			if r.StatusCode != http.StatusNotModified {
-				log.Errorf("Get changes request failed with Resp err: %d", r.StatusCode)
+				err = errors.New("force backoff")
+				log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
 			} else {
 				log.Infof("Get changes request timed out with %d", http.StatusNotModified)
 			}
@@ -209,12 +203,12 @@
 // simple doubling back-off
 func createBackOff(retryIn, maxBackOff time.Duration) func() {
 	return func() {
-		log.Debugf("backoff called. will retry in %s.", retryIn)
-		time.Sleep(retryIn)
-		retryIn = retryIn * time.Duration(2)
 		if retryIn > maxBackOff {
 			retryIn = maxBackOff
 		}
+		log.Debugf("backoff called. will retry in %s.", retryIn)
+		time.Sleep(retryIn)
+		retryIn = retryIn * time.Duration(2)
 	}
 }
 
@@ -263,23 +257,25 @@
 			req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
 		}
 
-		client := &http.Client{}
+		client := &http.Client{Timeout: time.Minute}
 		resp, err := client.Do(req)
 		if err != nil {
 			log.Errorf("Unable to Connect to Edge Proxy Server: %v", err)
 			continue
 		}
-		defer resp.Body.Close()
-		if resp.StatusCode != 200 {
-			log.Errorf("Oauth Request Failed with Resp Code: %v", resp.StatusCode)
-			continue
-		}
+
 		body, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
 		if err != nil {
 			log.Errorf("Unable to read EdgeProxy Sever response: %v", err)
 			continue
 		}
 
+		if resp.StatusCode != 200 {
+			log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
+			continue
+		}
+
 		var oauthResp oauthTokenResp
 		log.Debugf("Response: %s ", body)
 		err = json.Unmarshal(body, &oauthResp)
@@ -429,6 +425,7 @@
 
 	client := &http.Client{
 		CheckRedirect: Redirect,
+		Timeout:       time.Minute,
 	}
 
 	retryIn := 5 * time.Millisecond
@@ -464,6 +461,11 @@
 			continue
 		}
 
+		if r.StatusCode != 200 {
+			log.Errorf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode)
+			continue
+		}
+
 		// Decode the Snapshot server response
 		var resp common.Snapshot
 		err = json.NewDecoder(r.Body).Decode(&resp)
@@ -483,11 +485,6 @@
 			}
 		}
 
-		if r.StatusCode != 200 {
-			log.Errorf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode)
-			continue
-		}
-
 		log.Info("Emitting Snapshot to plugins")
 		events.Emit(ApigeeSyncEventSelector, &resp)
 
diff --git a/init.go b/init.go
index b96f681..5f3c6cb 100644
--- a/init.go
+++ b/init.go
@@ -6,6 +6,7 @@
 	"os"
 
 	"github.com/30x/apid-core"
+	"time"
 )
 
 const (
@@ -51,7 +52,7 @@
 }
 
 func initDefaults() {
-	config.SetDefault(configPollInterval, 120)
+	config.SetDefault(configPollInterval, 120 * time.Second)
 	config.SetDefault(configSnapshotProtocol, "json")
 	name, errh := os.Hostname()
 	if (errh != nil) && (len(config.GetString(configName)) == 0) {
diff --git a/mock_server.go b/mock_server.go
index 7fc488e..53f6c2c 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -678,6 +678,10 @@
 // create []common.Table from array of tableRowMaps
 func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList {
 	result := common.ChangeList{}
+	if len(changeLists) > 0 {
+		result.FirstSequence = changeLists[0].FirstSequence
+		result.LastSequence = changeLists[len(changeLists)-1].LastSequence
+	}
 	for _, cl := range changeLists {
 		for _, c := range cl.Changes {
 			result.Changes = append(result.Changes, c)