send deployment results to tracker server, change POST /deployments to PUT /deployments
diff --git a/api.go b/api.go index f4f2803..e677294 100644 --- a/api.go +++ b/api.go
@@ -9,6 +9,7 @@ "strconv" "time" "sync" + "net/url" ) const ( @@ -59,7 +60,7 @@ func InitAPI() { services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET") - services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("POST") + services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT") } func writeError(w http.ResponseWriter, status int, code int, reason string) { @@ -279,9 +280,52 @@ return } + if len(validResults) > 0 { + go transmitDeploymentResultsToServer(validResults) + setDeploymentResults(validResults) + } + w.Write([]byte("OK")) +} - setDeploymentResults(validResults) +func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error { - //go transmitDeploymentResultsToServer(validResults) + retryIn := bundleRetryDelay + maxBackOff := 5 * time.Minute + backOffFunc := createBackoff(retryIn, maxBackOff) + + uri, err := url.Parse(apiServerBaseURI.String()) + if err != nil { + log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err) + return err + } + uri.Path = fmt.Sprintf("/clusters/%s/apids/%s/deployments", apidClusterID, apidInstanceID) + + resultJSON, err := json.Marshal(validResults) + if err != nil { + log.Errorf("unable to marshal deployment results %v: %v", validResults, err) + return err + } + + for { + log.Debugf("transmitting deployment results to tracker: %s", string(resultJSON)) + req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(resultJSON)) + req.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil || resp.StatusCode != http.StatusOK { + if err != nil { + log.Errorf("failed to communicate with tracking service: %v", err) + } else { + b, _ := ioutil.ReadAll(resp.Body) + log.Errorf("tracking service call failed. code: %d, body: %s", resp.StatusCode, string(b)) + } + backOffFunc() + resp.Body.Close() + continue + } + + resp.Body.Close() + return nil + } }
diff --git a/api_test.go b/api_test.go index f267303..1f35233 100644 --- a/api_test.go +++ b/api_test.go
@@ -182,7 +182,7 @@ }) }) - Context("POST /deployments", func() { + Context("PUT /deployments", func() { It("should return BadRequest for invalid request", func() { @@ -196,7 +196,7 @@ payload, err := json.Marshal(deploymentResult) Expect(err).ShouldNot(HaveOccurred()) - req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload)) + req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) req.Header.Add("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) @@ -221,7 +221,7 @@ payload, err := json.Marshal(deploymentResult) Expect(err).ShouldNot(HaveOccurred()) - req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload)) + req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) req.Header.Add("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) @@ -248,7 +248,7 @@ payload, err := json.Marshal(deploymentResult) Expect(err).ShouldNot(HaveOccurred()) - req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload)) + req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) req.Header.Add("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) @@ -271,7 +271,7 @@ uri, err := url.Parse(testServer.URL) uri.Path = deploymentsEndpoint - deploymentResult := apiDeploymentResults{ + deploymentResults := apiDeploymentResults{ apiDeploymentResult{ ID: deploymentID, Status: RESPONSE_STATUS_FAIL, @@ -279,10 +279,10 @@ Message: "Some error message", }, } - payload, err := json.Marshal(deploymentResult) + payload, err := json.Marshal(deploymentResults) Expect(err).ShouldNot(HaveOccurred()) - req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload)) + req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) req.Header.Add("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) @@ -300,6 +300,29 @@ Expect(deploy_error_code).Should(Equal(100)) Expect(deploy_error_message).Should(Equal("Some error message")) }) + + It("should communicate status to tracking server", func() { + deploymentResults := apiDeploymentResults{ + apiDeploymentResult{ + ID: "deploymentID", + Status: RESPONSE_STATUS_FAIL, + ErrorCode: 100, + Message: "Some error message", + }, + } + + err := transmitDeploymentResultsToServer(deploymentResults) + Expect(err).NotTo(HaveOccurred()) + + Expect(testLastTrackerVars["clusterID"]).To(Equal("CLUSTER_ID")) + Expect(testLastTrackerVars["instanceID"]).To(Equal("INSTANCE_ID")) + Expect(testLastTrackerBody).ToNot(BeEmpty()) + + var uploaded apiDeploymentResults + json.Unmarshal(testLastTrackerBody, &uploaded) + + Expect(uploaded).To(Equal(deploymentResults)) + }) }) })
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml index 916c081..c26d47f 100644 --- a/apidGatewayDeploy-api.yaml +++ b/apidGatewayDeploy-api.yaml
@@ -118,7 +118,7 @@ $ref: '#/definitions/DeploymentResponse' '304': description: Deployment not modified. - post: + put: description: Save results of deployment parameters: - name: _
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 33d3cf3..061879d 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -17,8 +17,10 @@ ) var ( - tmpDir string - testServer *httptest.Server + tmpDir string + testServer *httptest.Server + testLastTrackerVars map[string]string + testLastTrackerBody []byte ) var _ = BeforeSuite(func() { @@ -31,6 +33,9 @@ Expect(err).NotTo(HaveOccurred()) config.Set("local_storage_path", tmpDir) + config.Set(configApidInstanceID, "INSTANCE_ID") + config.Set(configApidClusterID, "CLUSTER_ID") + config.Set(configApiServerBaseURI, "http://localhost") apid.InitializePlugins() @@ -59,8 +64,29 @@ time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond)) } w.Write([]byte("/bundles/" + vars["id"])) - }) + + }).Methods("GET") + + // fake an unreliable APID tracker + router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments", + func(w http.ResponseWriter, req *http.Request) { + count++ + if count % 2 == 0 { + w.WriteHeader(500) + return + } + + testLastTrackerVars = apid.API().Vars(req) + testLastTrackerBody, err = ioutil.ReadAll(req.Body) + Expect(err).ToNot(HaveOccurred()) + + w.Write([]byte("OK")) + + }).Methods("PUT") testServer = httptest.NewServer(router) + + apiServerBaseURI, err = url.Parse(testServer.URL) + Expect(err).NotTo(HaveOccurred()) }) var _ = AfterSuite(func() {
diff --git a/bundle.go b/bundle.go index 4616476..44f9a9c 100644 --- a/bundle.go +++ b/bundle.go
@@ -1,25 +1,37 @@ package apiGatewayDeploy import ( + "crypto/md5" + "encoding/base64" + "encoding/hex" + "errors" "fmt" + "hash" + "hash/crc32" "io" + "io/ioutil" "net/http" "net/url" "os" "path" "time" - "encoding/base64" - "io/ioutil" - "hash/crc32" - "errors" - "crypto/md5" - "hash" - "encoding/hex" ) var bundleRetryDelay time.Duration = time.Second var bundleDownloadTimeout time.Duration = 10 * time.Minute +// simple doubling back-off +func createBackoff(retryIn, maxBackOff time.Duration) func() { + return func() { + log.Debugf("backoff called. will retry in %s.", retryIn) + time.Sleep(retryIn) + retryIn = retryIn * time.Duration(2) + if retryIn > maxBackOff { + retryIn = maxBackOff + } + } +} + func downloadBundle(dep DataDeployment) { log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI) @@ -39,22 +51,14 @@ return } - // simple doubling back-off retryIn := bundleRetryDelay maxBackOff := 5 * time.Minute - backOff := func() { - log.Debugf("will retry failed download in %s: %v", retryIn, err) - time.Sleep(retryIn) - retryIn = retryIn * time.Duration(2) - if retryIn > maxBackOff { - retryIn = maxBackOff - } - } + backOffFunc := createBackoff(retryIn, maxBackOff) // timeout and mark deployment failed timeout := time.NewTimer(bundleDownloadTimeout) go func() { - <- timeout.C + <-timeout.C log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI) var errMessage string if err != nil { @@ -98,14 +102,14 @@ break } - backOff() + backOffFunc() hashWriter.Reset() } log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI) // send deployments to client - deploymentsChanged<- dep.ID + deploymentsChanged <- dep.ID } func getBundleFile(dep DataDeployment) string {
diff --git a/init.go b/init.go index 4d32964..49d4912 100644 --- a/init.go +++ b/init.go
@@ -1,7 +1,9 @@ package apiGatewayDeploy import ( + "fmt" "github.com/30x/apid" + "net/url" "os" "path" "time" @@ -12,6 +14,9 @@ configDebounceDuration = "gatewaydeploy_debounce_duration" configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout" + configApiServerBaseURI = "apigeesync_proxy_server_base" + configApidInstanceID = "apigeesync_apid_instance_id" + configApidClusterID = "apigeesync_cluster_id" ) var ( @@ -21,6 +26,9 @@ bundlePath string debounceDuration time.Duration bundleCleanupDelay time.Duration + apiServerBaseURI *url.URL + apidInstanceID string + apidClusterID string ) func init() { @@ -33,24 +41,44 @@ log.Debug("start init") config := services.Config() + + if !config.IsSet(configApiServerBaseURI) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI) + } + var err error + apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI)) + if err != nil { + return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err) + } + + if !config.IsSet(configApidInstanceID) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID) + } + apidInstanceID = config.GetString(configApidInstanceID) + + if !config.IsSet(configApidClusterID) { + return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID) + } + apidClusterID = config.GetString(configApidClusterID) + config.SetDefault(configBundleDirKey, "bundles") config.SetDefault(configDebounceDuration, time.Second) config.SetDefault(configBundleCleanupDelay, time.Minute) - config.SetDefault(configBundleDownloadTimeout, 5 * time.Minute) + config.SetDefault(configBundleDownloadTimeout, 5*time.Minute) debounceDuration = config.GetDuration(configDebounceDuration) if debounceDuration < time.Millisecond { - log.Panicf("%s must be a positive duration", configDebounceDuration) + return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration) } bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay) if bundleCleanupDelay < time.Millisecond { - log.Panicf("%s must be a positive duration", configBundleCleanupDelay) + return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay) } bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout) if bundleDownloadTimeout < time.Millisecond { - log.Panicf("%s must be a positive duration", configBundleDownloadTimeout) + return pluginData, fmt.Errorf("%s must be a positive duration", configBundleDownloadTimeout) } data = services.Data() @@ -59,7 +87,7 @@ storagePath := config.GetString("local_storage_path") bundlePath = path.Join(storagePath, relativeBundlePath) if err := os.MkdirAll(bundlePath, 0700); err != nil { - log.Panicf("Failed bundle directory creation: %v", err) + return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err) } log.Infof("Bundle directory path is %s", bundlePath)