Merge pull request #22 from 30x/XAPID-717
Xapid-717
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 51a40b3..960f0a4 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -38,6 +38,7 @@
config.Set(configSnapServerBaseURI, testServer.URL)
config.Set(configChangeServerBaseURI, testServer.URL)
config.Set(configSnapshotProtocol, "json")
+ config.Set(configPollInterval, 10*time.Millisecond)
config.Set(configName, "testhost")
config.Set(configApidClusterId, "bootstrap")
@@ -48,7 +49,7 @@
// set up mock server
mockParms := MockParms{
- ReliableAPI: true,
+ ReliableAPI: false,
ClusterID: config.GetString(configApidClusterId),
TokenKey: config.GetString(configConsumerKey),
TokenSecret: config.GetString(configConsumerSecret),
diff --git a/apigee_sync.go b/apigee_sync.go
index 449780b..ec3031e 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -52,41 +52,33 @@
}
/*
- * Helper function that sleeps for N seconds if comm with change agent
- * fails. The retry interval gradually is incremented each time it fails
- * till it reaches the Polling Int time, and after which it constantly
- * retries at the polling time interval
+ * Polls the change agent for changes. In the event of errors, uses a doubling
+ * backoff starting at 200ms, up to a max delay of the configPollInterval value.
*/
func updatePeriodicChanges() {
- times := 1
- pollInterval := config.GetInt(configPollInterval)
+ var backOffFunc func()
+ pollInterval := config.GetDuration(configPollInterval)
for {
- startTime := time.Second
+ start := time.Now().Second()
err := pollChangeAgent()
+ end := time.Now().Second()
if err != nil {
log.Debugf("Error connecting to changeserver: %v", err)
}
- endTime := time.Second
- // Gradually increase retry interval, and max at some level
- if endTime-startTime <= 1 {
- if times < pollInterval {
- times++
- } else {
- times = pollInterval
+ if end-start <= 1 {
+ if backOffFunc == nil {
+ backOffFunc = createBackOff(200*time.Millisecond, pollInterval)
}
- log.Debugf("Connecting to changeserver...")
- time.Sleep(time.Duration(times) * 200 * time.Millisecond)
+ backOffFunc()
} else {
- // Reset sleep interval
- times = 1
+ backOffFunc = nil
}
-
}
}
/*
- * Long polls every 45 seconds the change agent. Parses the response from
+ * Long-polls the change agent with a 45-second block. Parses the response from
* change agent and raises an event.
*/
func pollChangeAgent() error {
@@ -139,7 +131,7 @@
log.Debugf("Fetching changes: %s", uri)
/* If error, break the loop, and retry after interval */
- client := &http.Client{}
+ client := &http.Client{Timeout: time.Minute} // must be greater than block value
req, err := http.NewRequest("GET", uri, nil)
addHeaders(req)
r, err := client.Do(req)
@@ -152,11 +144,13 @@
if r.StatusCode != http.StatusOK {
if r.StatusCode == http.StatusUnauthorized {
token = ""
+
log.Errorf("Token expired? Unauthorized request.")
}
r.Body.Close()
if r.StatusCode != http.StatusNotModified {
- log.Errorf("Get changes request failed with Resp err: %d", r.StatusCode)
+ err = errors.New("force backoff")
+ log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
} else {
log.Infof("Get changes request timed out with %d", http.StatusNotModified)
}
@@ -209,12 +203,12 @@
// simple doubling back-off
func createBackOff(retryIn, maxBackOff time.Duration) func() {
return func() {
- log.Debugf("backoff called. will retry in %s.", retryIn)
- time.Sleep(retryIn)
- retryIn = retryIn * time.Duration(2)
if retryIn > maxBackOff {
retryIn = maxBackOff
}
+ log.Debugf("backoff called. will retry in %s.", retryIn)
+ time.Sleep(retryIn)
+ retryIn = retryIn * time.Duration(2)
}
}
@@ -263,23 +257,25 @@
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
}
- client := &http.Client{}
+ client := &http.Client{Timeout: time.Minute}
resp, err := client.Do(req)
if err != nil {
log.Errorf("Unable to Connect to Edge Proxy Server: %v", err)
continue
}
- defer resp.Body.Close()
- if resp.StatusCode != 200 {
- log.Errorf("Oauth Request Failed with Resp Code: %v", resp.StatusCode)
- continue
- }
+
body, err := ioutil.ReadAll(resp.Body)
+ resp.Body.Close()
if err != nil {
log.Errorf("Unable to read EdgeProxy Sever response: %v", err)
continue
}
+ if resp.StatusCode != 200 {
+ log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
+ continue
+ }
+
var oauthResp oauthTokenResp
log.Debugf("Response: %s ", body)
err = json.Unmarshal(body, &oauthResp)
@@ -429,6 +425,7 @@
client := &http.Client{
CheckRedirect: Redirect,
+ Timeout: time.Minute,
}
retryIn := 5 * time.Millisecond
@@ -464,6 +461,11 @@
continue
}
+ if r.StatusCode != 200 {
+ log.Errorf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode)
+ continue
+ }
+
// Decode the Snapshot server response
var resp common.Snapshot
err = json.NewDecoder(r.Body).Decode(&resp)
@@ -483,11 +485,6 @@
}
}
- if r.StatusCode != 200 {
- log.Errorf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode)
- continue
- }
-
log.Info("Emitting Snapshot to plugins")
events.Emit(ApigeeSyncEventSelector, &resp)
diff --git a/init.go b/init.go
index b96f681..5f3c6cb 100644
--- a/init.go
+++ b/init.go
@@ -6,6 +6,7 @@
"os"
"github.com/30x/apid-core"
+ "time"
)
const (
@@ -51,7 +52,7 @@
}
func initDefaults() {
- config.SetDefault(configPollInterval, 120)
+ config.SetDefault(configPollInterval, 120 * time.Second)
config.SetDefault(configSnapshotProtocol, "json")
name, errh := os.Hostname()
if (errh != nil) && (len(config.GetString(configName)) == 0) {
diff --git a/mock_server.go b/mock_server.go
index 7fc488e..53f6c2c 100644
--- a/mock_server.go
+++ b/mock_server.go
@@ -678,6 +678,10 @@
// create []common.Table from array of tableRowMaps
func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList {
result := common.ChangeList{}
+ if len(changeLists) > 0 {
+ result.FirstSequence = changeLists[0].FirstSequence
+ result.LastSequence = changeLists[len(changeLists)-1].LastSequence
+ }
for _, cl := range changeLists {
for _, c := range cl.Changes {
result.Changes = append(result.Changes, c)