send deployment results to tracker server, change POST /deployments to PUT /deployments
diff --git a/api.go b/api.go
index f4f2803..e677294 100644
--- a/api.go
+++ b/api.go
@@ -9,6 +9,7 @@
"strconv"
"time"
"sync"
+ "net/url"
)
const (
@@ -59,7 +60,7 @@
func InitAPI() {
services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
- services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("POST")
+ services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -279,9 +280,52 @@
return
}
+ if len(validResults) > 0 {
+ go transmitDeploymentResultsToServer(validResults)
+ setDeploymentResults(validResults)
+ }
+
w.Write([]byte("OK"))
+}
- setDeploymentResults(validResults)
+func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error {
- //go transmitDeploymentResultsToServer(validResults)
+ retryIn := bundleRetryDelay
+ maxBackOff := 5 * time.Minute
+ backOffFunc := createBackoff(retryIn, maxBackOff)
+
+ uri, err := url.Parse(apiServerBaseURI.String())
+ if err != nil {
+ log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err)
+ return err
+ }
+ uri.Path = fmt.Sprintf("/clusters/%s/apids/%s/deployments", apidClusterID, apidInstanceID)
+
+ resultJSON, err := json.Marshal(validResults)
+ if err != nil {
+ log.Errorf("unable to marshal deployment results %v: %v", validResults, err)
+ return err
+ }
+
+ for {
+ log.Debugf("transmitting deployment results to tracker: %s", string(resultJSON))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(resultJSON))
+ req.Header.Add("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil || resp.StatusCode != http.StatusOK {
+ if err != nil {
+ log.Errorf("failed to communicate with tracking service: %v", err)
+ } else {
+ b, _ := ioutil.ReadAll(resp.Body)
+ log.Errorf("tracking service call failed. code: %d, body: %s", resp.StatusCode, string(b))
+ }
+ backOffFunc()
+ resp.Body.Close()
+ continue
+ }
+
+ resp.Body.Close()
+ return nil
+ }
}
diff --git a/api_test.go b/api_test.go
index f267303..1f35233 100644
--- a/api_test.go
+++ b/api_test.go
@@ -182,7 +182,7 @@
})
})
- Context("POST /deployments", func() {
+ Context("PUT /deployments", func() {
It("should return BadRequest for invalid request", func() {
@@ -196,7 +196,7 @@
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -221,7 +221,7 @@
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -248,7 +248,7 @@
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -271,7 +271,7 @@
uri, err := url.Parse(testServer.URL)
uri.Path = deploymentsEndpoint
- deploymentResult := apiDeploymentResults{
+ deploymentResults := apiDeploymentResults{
apiDeploymentResult{
ID: deploymentID,
Status: RESPONSE_STATUS_FAIL,
@@ -279,10 +279,10 @@
Message: "Some error message",
},
}
- payload, err := json.Marshal(deploymentResult)
+ payload, err := json.Marshal(deploymentResults)
Expect(err).ShouldNot(HaveOccurred())
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
req.Header.Add("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
@@ -300,6 +300,29 @@
Expect(deploy_error_code).Should(Equal(100))
Expect(deploy_error_message).Should(Equal("Some error message"))
})
+
+ It("should communicate status to tracking server", func() {
+ deploymentResults := apiDeploymentResults{
+ apiDeploymentResult{
+ ID: "deploymentID",
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: 100,
+ Message: "Some error message",
+ },
+ }
+
+ err := transmitDeploymentResultsToServer(deploymentResults)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(testLastTrackerVars["clusterID"]).To(Equal("CLUSTER_ID"))
+ Expect(testLastTrackerVars["instanceID"]).To(Equal("INSTANCE_ID"))
+ Expect(testLastTrackerBody).ToNot(BeEmpty())
+
+ var uploaded apiDeploymentResults
+ json.Unmarshal(testLastTrackerBody, &uploaded)
+
+ Expect(uploaded).To(Equal(deploymentResults))
+ })
})
})
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml
index 916c081..c26d47f 100644
--- a/apidGatewayDeploy-api.yaml
+++ b/apidGatewayDeploy-api.yaml
@@ -118,7 +118,7 @@
$ref: '#/definitions/DeploymentResponse'
'304':
description: Deployment not modified.
- post:
+ put:
description: Save results of deployment
parameters:
- name: _
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 33d3cf3..061879d 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -17,8 +17,10 @@
)
var (
- tmpDir string
- testServer *httptest.Server
+ tmpDir string
+ testServer *httptest.Server
+ testLastTrackerVars map[string]string
+ testLastTrackerBody []byte
)
var _ = BeforeSuite(func() {
@@ -31,6 +33,9 @@
Expect(err).NotTo(HaveOccurred())
config.Set("local_storage_path", tmpDir)
+ config.Set(configApidInstanceID, "INSTANCE_ID")
+ config.Set(configApidClusterID, "CLUSTER_ID")
+ config.Set(configApiServerBaseURI, "http://localhost")
apid.InitializePlugins()
@@ -59,8 +64,29 @@
time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond))
}
w.Write([]byte("/bundles/" + vars["id"]))
- })
+
+ }).Methods("GET")
+
+ // fake an unreliable APID tracker
+ router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
+ func(w http.ResponseWriter, req *http.Request) {
+ count++
+ if count % 2 == 0 {
+ w.WriteHeader(500)
+ return
+ }
+
+ testLastTrackerVars = apid.API().Vars(req)
+ testLastTrackerBody, err = ioutil.ReadAll(req.Body)
+ Expect(err).ToNot(HaveOccurred())
+
+ w.Write([]byte("OK"))
+
+ }).Methods("PUT")
testServer = httptest.NewServer(router)
+
+ apiServerBaseURI, err = url.Parse(testServer.URL)
+ Expect(err).NotTo(HaveOccurred())
})
var _ = AfterSuite(func() {
diff --git a/bundle.go b/bundle.go
index 4616476..44f9a9c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -1,25 +1,37 @@
package apiGatewayDeploy
import (
+ "crypto/md5"
+ "encoding/base64"
+ "encoding/hex"
+ "errors"
"fmt"
+ "hash"
+ "hash/crc32"
"io"
+ "io/ioutil"
"net/http"
"net/url"
"os"
"path"
"time"
- "encoding/base64"
- "io/ioutil"
- "hash/crc32"
- "errors"
- "crypto/md5"
- "hash"
- "encoding/hex"
)
var bundleRetryDelay time.Duration = time.Second
var bundleDownloadTimeout time.Duration = 10 * time.Minute
+// simple doubling back-off
+func createBackoff(retryIn, maxBackOff time.Duration) func() {
+ return func() {
+ log.Debugf("backoff called. will retry in %s.", retryIn)
+ time.Sleep(retryIn)
+ retryIn = retryIn * time.Duration(2)
+ if retryIn > maxBackOff {
+ retryIn = maxBackOff
+ }
+ }
+}
+
func downloadBundle(dep DataDeployment) {
log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
@@ -39,22 +51,14 @@
return
}
- // simple doubling back-off
retryIn := bundleRetryDelay
maxBackOff := 5 * time.Minute
- backOff := func() {
- log.Debugf("will retry failed download in %s: %v", retryIn, err)
- time.Sleep(retryIn)
- retryIn = retryIn * time.Duration(2)
- if retryIn > maxBackOff {
- retryIn = maxBackOff
- }
- }
+ backOffFunc := createBackoff(retryIn, maxBackOff)
// timeout and mark deployment failed
timeout := time.NewTimer(bundleDownloadTimeout)
go func() {
- <- timeout.C
+ <-timeout.C
log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI)
var errMessage string
if err != nil {
@@ -98,14 +102,14 @@
break
}
- backOff()
+ backOffFunc()
hashWriter.Reset()
}
log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
// send deployments to client
- deploymentsChanged<- dep.ID
+ deploymentsChanged <- dep.ID
}
func getBundleFile(dep DataDeployment) string {
diff --git a/init.go b/init.go
index 4d32964..49d4912 100644
--- a/init.go
+++ b/init.go
@@ -1,7 +1,9 @@
package apiGatewayDeploy
import (
+ "fmt"
"github.com/30x/apid"
+ "net/url"
"os"
"path"
"time"
@@ -12,6 +14,9 @@
configDebounceDuration = "gatewaydeploy_debounce_duration"
configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout"
+ configApiServerBaseURI = "apigeesync_proxy_server_base"
+ configApidInstanceID = "apigeesync_apid_instance_id"
+ configApidClusterID = "apigeesync_cluster_id"
)
var (
@@ -21,6 +26,9 @@
bundlePath string
debounceDuration time.Duration
bundleCleanupDelay time.Duration
+ apiServerBaseURI *url.URL
+ apidInstanceID string
+ apidClusterID string
)
func init() {
@@ -33,24 +41,44 @@
log.Debug("start init")
config := services.Config()
+
+ if !config.IsSet(configApiServerBaseURI) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI)
+ }
+ var err error
+ apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI))
+ if err != nil {
+ return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err)
+ }
+
+ if !config.IsSet(configApidInstanceID) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID)
+ }
+ apidInstanceID = config.GetString(configApidInstanceID)
+
+ if !config.IsSet(configApidClusterID) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID)
+ }
+ apidClusterID = config.GetString(configApidClusterID)
+
config.SetDefault(configBundleDirKey, "bundles")
config.SetDefault(configDebounceDuration, time.Second)
config.SetDefault(configBundleCleanupDelay, time.Minute)
- config.SetDefault(configBundleDownloadTimeout, 5 * time.Minute)
+ config.SetDefault(configBundleDownloadTimeout, 5*time.Minute)
debounceDuration = config.GetDuration(configDebounceDuration)
if debounceDuration < time.Millisecond {
- log.Panicf("%s must be a positive duration", configDebounceDuration)
+ return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration)
}
bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay)
if bundleCleanupDelay < time.Millisecond {
- log.Panicf("%s must be a positive duration", configBundleCleanupDelay)
+ return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
}
bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout)
if bundleDownloadTimeout < time.Millisecond {
- log.Panicf("%s must be a positive duration", configBundleDownloadTimeout)
+ return pluginData, fmt.Errorf("%s must be a positive duration", configBundleDownloadTimeout)
}
data = services.Data()
@@ -59,7 +87,7 @@
storagePath := config.GetString("local_storage_path")
bundlePath = path.Join(storagePath, relativeBundlePath)
if err := os.MkdirAll(bundlePath, 0700); err != nil {
- log.Panicf("Failed bundle directory creation: %v", err)
+ return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err)
}
log.Infof("Bundle directory path is %s", bundlePath)