Merge pull request #41 from 30x/detect-clusterid-change
detect apid_cluster_id change in config file and boot clean
diff --git a/apigee_sync.go b/apigee_sync.go
index ab47d17..391355b 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -7,8 +7,9 @@
)
const (
- httpTimeout = time.Minute
- pluginTimeout = time.Minute
+ httpTimeout = time.Minute
+ pluginTimeout = time.Minute
+ maxIdleConnsPerHost = 10
)
var knownTables = make(map[string]bool)
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index da7be01..60652c8 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -5,8 +5,8 @@
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "net/http"
"net/http/httptest"
- //"time"
)
var _ = Describe("Sync", func() {
@@ -283,7 +283,6 @@
*/
It("Should be able to handle duplicate snapshot during bootstrap", func() {
initializeContext()
-
tokenManager = createTokenManager()
snapManager = createSnapShotManager()
events.Listen(ApigeeSyncEventSelector, &handler{})
@@ -297,5 +296,41 @@
<-snapManager.close()
tokenManager.close()
}, 3)
+
+ It("Reuse http.Client connection for multiple concurrent requests", func() {
+ 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ 	}))
+ 	defer server.Close()
+ 	tr := &http.Transport{
+ 		MaxIdleConnsPerHost: maxIdleConnsPerHost,
+ 	}
+ 	client := &http.Client{Transport: tr}
+ 	requests := 2 * maxIdleConnsPerHost
+ 	// Workers send errors back: Fail inside a goroutine would panic off the spec's goroutine.
+ 	type result struct {
+ 		status int
+ 		err    error
+ 	}
+ 	// Buffered so no worker blocks on send if an assertion aborts the spec early.
+ 	ch := make(chan result, requests)
+ 	for i := 0; i < requests; i++ {
+ 		go func(client *http.Client) {
+ 			resp, err := client.Get(server.URL)
+ 			if err != nil {
+ 				ch <- result{err: err}
+ 				return
+ 			}
+ 			resp.Body.Close()
+ 			ch <- result{status: resp.StatusCode}
+ 		}(client)
+ 	}
+ 	// Block on each result rather than spinning on an empty default case.
+ 	for i := 0; i < requests; i++ {
+ 		res := <-ch
+ 		Expect(res.err).NotTo(HaveOccurred())
+ 		Expect(res.status).To(Equal(http.StatusOK))
+ 	}
+ }, 3)
+
})
})
diff --git a/changes.go b/changes.go
index 11fdc3a..e9be041 100644
--- a/changes.go
+++ b/changes.go
@@ -51,8 +51,7 @@
}
// not launched
if atomic.LoadInt32(c.isLaunched) == int32(0) {
- log.Debug("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
- log.Warn("Attempt to close unstarted change manager")
+ log.Warn("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! Will wait until pollChangeWithBackoff is launched and then kill it and tokenManager!")
go func() {
c.quitChan <- true
tokenManager.close()
@@ -161,11 +160,9 @@
log.Debugf("Fetching changes: %s", uri)
/* If error, break the loop, and retry after interval */
- client := &http.Client{Timeout: httpTimeout} // must be greater than block value
req, err := http.NewRequest("GET", uri, nil)
addHeaders(req)
-
- r, err := client.Do(req)
+ r, err := httpclient.Do(req)
if err != nil {
log.Errorf("change agent comm error: %s", err)
// if closed
diff --git a/init.go b/init.go
index 0064334..cf90781 100644
--- a/init.go
+++ b/init.go
@@ -3,8 +3,8 @@
import (
"encoding/json"
"fmt"
+ "net/http"
"os"
-
"time"
"github.com/30x/apid-core"
@@ -39,6 +39,7 @@
tokenManager *tokenMan
changeManager *pollChangeManager
snapManager *snapShotManager
+ httpclient *http.Client
/* Set during post plugin initialization
* set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
@@ -74,6 +75,19 @@
func initVariables(services apid.Services) error {
dataService = services.Data()
events = services.Events()
+
+ tr := &http.Transport{
+ MaxIdleConnsPerHost: maxIdleConnsPerHost,
+ }
+ httpclient = &http.Client{
+ Transport: tr,
+ Timeout: httpTimeout,
+ CheckRedirect: func(req *http.Request, _ []*http.Request) error {
+ req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
+ return nil
+ },
+ }
+
//TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
//also useful for testing
snapManager = createSnapShotManager()
diff --git a/listener.go b/listener.go
index 6c4b1ef..49e3d6b 100644
--- a/listener.go
+++ b/listener.go
@@ -24,7 +24,7 @@
} else if snapShot, ok := e.(*common.Snapshot); ok {
processSnapshot(snapShot)
} else {
- log.Errorf("Received invalid event. Ignoring. %v", e)
+ log.Debugf("Received invalid event. Ignoring. %v", e)
}
}
diff --git a/snapshot.go b/snapshot.go
index 7a3c102..3b288a2 100644
--- a/snapshot.go
+++ b/snapshot.go
@@ -247,22 +247,14 @@
uri := snapshotUri.String()
log.Infof("Snapshot Download: %s", uri)
- client := &http.Client{
- CheckRedirect: func(req *http.Request, _ []*http.Request) error {
- req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken())
- return nil
- },
- Timeout: httpTimeout,
- }
-
//pollWithBackoff only accepts function that accept a single quit channel
//to accommodate functions which need more parameters, wrap them in closures
- attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
+ attemptDownload := getAttemptDownloadClosure(snapshot, uri)
pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
return nil
}
-func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {
+func getAttemptDownloadClosure(snapshot *common.Snapshot, uri string) func(chan bool) error {
return func(_ chan bool) error {
req, err := http.NewRequest("GET", uri, nil)
if err != nil {
@@ -283,7 +275,7 @@
}
// Issue the request to the snapshot server
- r, err := client.Do(req)
+ r, err := httpclient.Do(req)
if err != nil {
log.Errorf("Snapshotserver comm error: %v", err)
return err
@@ -300,7 +292,7 @@
// Decode the Snapshot server response
err = processSnapshotResponse(r, snapshot)
if err != nil {
- log.Errorf("Response Data not parsable: %v", err)
+ log.Errorf("Snapshot server response Data not parsable: %v", err)
return err
}