Have a single client connection and reuse (#43) * Reuse a single client connection. * Add more connections for the tests. * Default is to cache conns. * Declare the client at the package level. It is thread safe anyway. * remove nil assignment of re-direct. * Set the re-direct as part of init. * Remove pointess calling of http.client via a function.
diff --git a/apigee_sync.go b/apigee_sync.go index ab47d17..391355b 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -7,8 +7,9 @@ ) const ( - httpTimeout = time.Minute - pluginTimeout = time.Minute + httpTimeout = time.Minute + pluginTimeout = time.Minute + maxIdleConnsPerHost = 10 ) var knownTables = make(map[string]bool)
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index a3ba0f5..78fb1de 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -5,8 +5,8 @@ "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "net/http" "net/http/httptest" - //"time" ) var _ = Describe("Sync", func() { @@ -268,7 +268,6 @@ */ It("Should be able to handle duplicate snapshot during bootstrap", func() { initializeContext() - tokenManager = createTokenManager() snapManager = createSnapShotManager() events.Listen(ApigeeSyncEventSelector, &handler{}) @@ -282,5 +281,41 @@ <-snapManager.close() tokenManager.close() }, 3) + + It("Reuse http.Client connection for multiple concurrent requests", func() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + })) + tr := &http.Transport{ + MaxIdleConnsPerHost: maxIdleConnsPerHost, + } + var rspcnt int = 0 + ch := make(chan *http.Response) + client := &http.Client{Transport: tr} + for i := 0; i < 2*maxIdleConnsPerHost; i++ { + go func(client *http.Client) { + req, err := http.NewRequest("GET", server.URL, nil) + resp, err := client.Do(req) + if err != nil { + Fail("Unable to process Client request") + } + ch <- resp + resp.Body.Close() + + }(client) + } + for { + select { + case resp := <-ch: + Expect(resp.StatusCode).To(Equal(http.StatusOK)) + if rspcnt >= 2*maxIdleConnsPerHost-1 { + return + } + rspcnt++ + default: + } + } + + }, 3) + }) })
diff --git a/changes.go b/changes.go index 69e26db..e9be041 100644 --- a/changes.go +++ b/changes.go
@@ -160,11 +160,9 @@ log.Debugf("Fetching changes: %s", uri) /* If error, break the loop, and retry after interval */ - client := &http.Client{Timeout: httpTimeout} // must be greater than block value req, err := http.NewRequest("GET", uri, nil) addHeaders(req) - - r, err := client.Do(req) + r, err := httpclient.Do(req) if err != nil { log.Errorf("change agent comm error: %s", err) // if closed
diff --git a/init.go b/init.go index 0064334..cf90781 100644 --- a/init.go +++ b/init.go
@@ -3,8 +3,8 @@ import ( "encoding/json" "fmt" + "net/http" "os" - "time" "github.com/30x/apid-core" @@ -39,6 +39,7 @@ tokenManager *tokenMan changeManager *pollChangeManager snapManager *snapShotManager + httpclient *http.Client /* Set during post plugin initialization * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called @@ -74,6 +75,19 @@ func initVariables(services apid.Services) error { dataService = services.Data() events = services.Events() + + tr := &http.Transport{ + MaxIdleConnsPerHost: maxIdleConnsPerHost, + } + httpclient = &http.Client{ + Transport: tr, + Timeout: httpTimeout, + CheckRedirect: func(req *http.Request, _ []*http.Request) error { + req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) + return nil + }, + } + //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines //also useful for testing snapManager = createSnapShotManager()
diff --git a/snapshot.go b/snapshot.go index 3287c4c..3b288a2 100644 --- a/snapshot.go +++ b/snapshot.go
@@ -247,22 +247,14 @@ uri := snapshotUri.String() log.Infof("Snapshot Download: %s", uri) - client := &http.Client{ - CheckRedirect: func(req *http.Request, _ []*http.Request) error { - req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) - return nil - }, - Timeout: httpTimeout, - } - //pollWithBackoff only accepts function that accept a single quit channel //to accommodate functions which need more parameters, wrap them in closures - attemptDownload := getAttemptDownloadClosure(client, snapshot, uri) + attemptDownload := getAttemptDownloadClosure(snapshot, uri) pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError) return nil } -func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error { +func getAttemptDownloadClosure(snapshot *common.Snapshot, uri string) func(chan bool) error { return func(_ chan bool) error { req, err := http.NewRequest("GET", uri, nil) if err != nil { @@ -283,7 +275,7 @@ } // Issue the request to the snapshot server - r, err := client.Do(req) + r, err := httpclient.Do(req) if err != nil { log.Errorf("Snapshotserver comm error: %v", err) return err