Preliminary pruning to make GW deploy fit to generic Gateway model.
diff --git a/api.go b/api.go
index 8a628e7..676d6fc 100644
--- a/api.go
+++ b/api.go
@@ -1,12 +1,8 @@
package apiGatewayDeploy
import (
- "bytes"
"encoding/json"
- "fmt"
- "io/ioutil"
"net/http"
- "net/url"
"strconv"
"sync/atomic"
"time"
@@ -20,14 +16,10 @@
const (
TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT = iota + 1
- TRACKER_ERR_BUNDLE_BAD_CHECKSUM
- TRACKER_ERR_DEPLOYMENT_BAD_JSON
)
const (
API_ERR_BAD_BLOCK = iota + 1
- API_ERR_BAD_JSON
- API_ERR_BAD_CONTENT
API_ERR_INTERNAL
)
@@ -71,21 +63,12 @@
// sent to client
type ApiDeploymentResponse []ApiDeployment
-type apiDeploymentResult struct {
- ID string `json:"id"`
- Status string `json:"status"`
- ErrorCode int `json:"errorCode"`
- Message string `json:"message"`
-}
-// received from client
-type apiDeploymentResults []apiDeploymentResult
const deploymentsEndpoint = "/deployments"
func InitAPI() {
services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
- services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -275,109 +258,6 @@
w.Write(b)
}
-func apiSetDeploymentResults(w http.ResponseWriter, r *http.Request) {
-
- var results apiDeploymentResults
- buf, _ := ioutil.ReadAll(r.Body)
- err := json.Unmarshal(buf, &results)
- if err != nil {
- log.Errorf("Resp Handler Json Unmarshal err: ", err)
- writeError(w, http.StatusBadRequest, API_ERR_BAD_JSON, "Malformed JSON")
- return
- }
-
- // validate the results
- // todo: these errors to the client should be standardized
- var errs bytes.Buffer
- var validResults apiDeploymentResults
- for i, result := range results {
- valid := true
- if result.ID == "" {
- errs.WriteString(fmt.Sprintf("Missing id at %d\n", i))
- }
-
- if result.Status != RESPONSE_STATUS_SUCCESS && result.Status != RESPONSE_STATUS_FAIL {
- errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n",
- RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
- }
-
- if result.Status == RESPONSE_STATUS_FAIL {
- if result.ErrorCode == 0 {
- errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i))
- }
- if result.Message == "" {
- errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i))
- }
- }
-
- if valid {
- validResults = append(validResults, result)
- }
- }
-
- if errs.Len() > 0 {
- writeError(w, http.StatusBadRequest, API_ERR_BAD_CONTENT, errs.String())
- return
- }
-
- if len(validResults) > 0 {
- setDeploymentResults(validResults)
- }
-
- w.Write([]byte("OK"))
-}
-
-func addHeaders(req *http.Request) {
- var token = services.Config().GetString("apigeesync_bearer_token")
- req.Header.Add("Authorization", "Bearer "+token)
-}
-
-func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error {
-
- retryIn := bundleRetryDelay
- maxBackOff := 5 * time.Minute
- backOffFunc := createBackoff(retryIn, maxBackOff)
-
- _, err := url.Parse(apiServerBaseURI.String())
- if err != nil {
- log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err)
- return err
- }
- apiPath := fmt.Sprintf("%s/clusters/%s/apids/%s/deployments", apiServerBaseURI.String(), apidClusterID, apidInstanceID)
-
- resultJSON, err := json.Marshal(validResults)
- if err != nil {
- log.Errorf("unable to marshal deployment results %v: %v", validResults, err)
- return err
- }
-
- for {
- log.Debugf("transmitting deployment results to tracker by URL=%s data=%s", apiPath, string(resultJSON))
- req, err := http.NewRequest("PUT", apiPath, bytes.NewReader(resultJSON))
- if err != nil {
- log.Errorf("unable to create PUT request", err)
- return err
- }
- req.Header.Add("Content-Type", "application/json")
- addHeaders(req)
-
- resp, err := http.DefaultClient.Do(req)
- if err != nil || resp.StatusCode != http.StatusOK {
- if err != nil {
- log.Errorf("failed to communicate with tracking service: %v", err)
- } else {
- b, _ := ioutil.ReadAll(resp.Body)
- log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b))
- }
- resp.Body.Close()
- backOffFunc()
- continue
- }
- resp.Body.Close()
- return nil
- }
-}
-
// call whenever the list of deployments changes
func incrementETag() string {
e := atomic.AddInt64(&eTag, 1)
diff --git a/api_test.go b/api_test.go
index a49b9a5..1a9f81b 100644
--- a/api_test.go
+++ b/api_test.go
@@ -221,179 +221,6 @@
})
})
- Context("PUT /deployments", func() {
-
- It("should return BadRequest for invalid request", func() {
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- deploymentResult := apiDeploymentResults{
- apiDeploymentResult{},
- }
- payload, err := json.Marshal(deploymentResult)
- Expect(err).ShouldNot(HaveOccurred())
-
- req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
- req.Header.Add("Content-Type", "application/json")
-
- resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusBadRequest))
- })
-
- It("should ignore deployments that can't be found", func() {
-
- deploymentID := "api_missing_deployment"
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- deploymentResult := apiDeploymentResults{
- apiDeploymentResult{
- ID: deploymentID,
- Status: RESPONSE_STATUS_SUCCESS,
- },
- }
- payload, err := json.Marshal(deploymentResult)
- Expect(err).ShouldNot(HaveOccurred())
-
- req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
- req.Header.Add("Content-Type", "application/json")
-
- resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusOK))
- })
-
- It("should mark a deployment as successful", func() {
-
- db := getDB()
- deploymentID := "api_mark_deployed"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- deploymentResult := apiDeploymentResults{
- apiDeploymentResult{
- ID: deploymentID,
- Status: RESPONSE_STATUS_SUCCESS,
- },
- }
- payload, err := json.Marshal(deploymentResult)
- Expect(err).ShouldNot(HaveOccurred())
-
- req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
- req.Header.Add("Content-Type", "application/json")
-
- resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusOK))
-
- var deployStatus string
- err = db.QueryRow("SELECT deploy_status FROM edgex_deployment WHERE id=?", deploymentID).
- Scan(&deployStatus)
- Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS))
- })
-
- It("should mark a deployment as failed", func() {
-
- db := getDB()
- deploymentID := "api_mark_failed"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- deploymentResults := apiDeploymentResults{
- apiDeploymentResult{
- ID: deploymentID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: 100,
- Message: "Some error message",
- },
- }
- payload, err := json.Marshal(deploymentResults)
- Expect(err).ShouldNot(HaveOccurred())
-
- req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
- req.Header.Add("Content-Type", "application/json")
-
- resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusOK))
-
- var deployStatus, deploy_error_message string
- var deploy_error_code int
- err = db.QueryRow(`
- SELECT deploy_status, deploy_error_code, deploy_error_message
- FROM edgex_deployment
- WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message)
- Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL))
- Expect(deploy_error_code).Should(Equal(100))
- Expect(deploy_error_message).Should(Equal("Some error message"))
- })
-
- It("should communicate status to tracking server", func() {
-
- deploymentResults := apiDeploymentResults{
- apiDeploymentResult{
- ID: "deploymentID",
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: 100,
- Message: "Some error message",
- },
- }
-
- err := transmitDeploymentResultsToServer(deploymentResults)
- Expect(err).NotTo(HaveOccurred())
-
- Expect(testLastTrackerVars["clusterID"]).To(Equal("CLUSTER_ID"))
- Expect(testLastTrackerVars["instanceID"]).To(Equal("INSTANCE_ID"))
- Expect(testLastTrackerBody).ToNot(BeEmpty())
-
- var uploaded apiDeploymentResults
- json.Unmarshal(testLastTrackerBody, &uploaded)
-
- Expect(uploaded).To(Equal(deploymentResults))
- })
-
- It("should get iso8601 time", func() {
- testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00"}
- isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z"}
- for i, t := range testTimes {
- log.Debug("insert deployment with timestamp: " + t)
- deploymentID := "api_time_iso8601_" + strconv.Itoa(i)
- insertTimeDeployment(testServer, deploymentID, t)
- }
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = deploymentsEndpoint
-
- res, err := http.Get(uri.String())
- Expect(err).ShouldNot(HaveOccurred())
- defer res.Body.Close()
-
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
- var depRes ApiDeploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
-
- Expect(len(depRes)).To(Equal(len(testTimes)))
-
- for i, dep := range depRes {
- Expect(dep.Created).To(Equal(isoTime[i]))
- Expect(dep.Updated).To(Equal(isoTime[i]))
- }
- })
- })
})
func insertTestDeployment(testServer *httptest.Server, deploymentID string) {
@@ -443,49 +270,4 @@
Expect(err).ShouldNot(HaveOccurred())
}
-func insertTimeDeployment(testServer *httptest.Server, deploymentID string, timestamp string) {
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundleJson, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
-
- dep := DataDeployment{
- ID: deploymentID,
- BundleConfigID: deploymentID,
- ApidClusterID: deploymentID,
- DataScopeID: deploymentID,
- BundleConfigJSON: string(bundleJson),
- ConfigJSON: string(bundleJson),
- Created: timestamp,
- CreatedBy: "",
- Updated: timestamp,
- UpdatedBy: "",
- BundleName: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- LocalBundleURI: "x",
- DeployStatus: "",
- DeployErrorCode: 0,
- DeployErrorMessage: "",
- }
-
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-}
diff --git a/bundle.go b/bundle.go
index 740947d..5a2c977 100644
--- a/bundle.go
+++ b/bundle.go
@@ -46,14 +46,6 @@
if err != nil {
msg := fmt.Sprintf("invalid bundle checksum type: %s for deployment: %s", dep.BundleChecksumType, dep.ID)
log.Error(msg)
- setDeploymentResults(apiDeploymentResults{
- {
- ID: dep.ID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: TRACKER_ERR_BUNDLE_BAD_CHECKSUM,
- Message: msg,
- },
- })
return
}
@@ -131,14 +123,6 @@
r.markFailedAt = time.Time{}
log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s",
r.dep.ID, r.dep.BundleURI)
- setDeploymentResults(apiDeploymentResults{
- {
- ID: r.dep.ID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT,
- Message: "bundle download failed",
- },
- })
}
}
}
diff --git a/data.go b/data.go
index c2b3ecf..b09c0cb 100644
--- a/data.go
+++ b/data.go
@@ -2,7 +2,6 @@
import (
"database/sql"
- "fmt"
"sync"
"encoding/json"
@@ -315,49 +314,7 @@
return
}
-func setDeploymentResults(results apiDeploymentResults) error {
- // also send results to server
- go transmitDeploymentResultsToServer(results)
-
- log.Debugf("setDeploymentResults: %v", results)
-
- tx, err := getDB().Begin()
- if err != nil {
- log.Errorf("Unable to begin transaction: %v", err)
- return err
- }
- defer tx.Rollback()
-
- stmt, err := tx.Prepare(`
- UPDATE edgex_deployment
- SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
- WHERE id=$4;
- `)
- if err != nil {
- log.Errorf("prepare updateDeploymentStatus failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- for _, result := range results {
- res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
- if err != nil {
- log.Errorf("update edgex_deployment %s to %s failed: %v", result.ID, result.Status, err)
- return err
- }
- n, err := res.RowsAffected()
- if n == 0 || err != nil {
- log.Error(fmt.Sprintf("no deployment matching '%s' to update. skipping.", result.ID))
- }
- }
-
- err = tx.Commit()
- if err != nil {
- log.Errorf("Unable to commit setDeploymentResults transaction: %v", err)
- }
- return err
-}
func updateLocalBundleURI(depID, localBundleUri string) error {
diff --git a/listener.go b/listener.go
index 8a4f017..e9cd113 100644
--- a/listener.go
+++ b/listener.go
@@ -5,8 +5,6 @@
"os"
"time"
- "fmt"
-
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
@@ -88,29 +86,6 @@
}
func startupOnExistingDatabase() {
- // ensure all deployment statuses have been sent to tracker
- go func() {
- deployments, err := getDeployments("WHERE deploy_status != $1", "")
- if err != nil {
- log.Panicf("unable to query database for ready deployments: %v", err)
- }
- log.Debugf("Queuing %d deployments for bundle download", len(deployments))
-
- var results apiDeploymentResults
- for _, dep := range deployments {
- result := apiDeploymentResult{
- ID: dep.ID,
- Status: dep.DeployStatus,
- ErrorCode: dep.DeployErrorCode,
- Message: dep.DeployErrorMessage,
- }
- results = append(results, result)
- }
- if len(results) > 0 {
- transmitDeploymentResultsToServer(results)
- }
- }()
-
// start bundle downloads that didn't finish
go func() {
deployments, err := getUnreadyDeployments()
@@ -128,7 +103,6 @@
// changes have been applied to DB
var insertedDeployments, deletedDeployments []DataDeployment
- var errResults apiDeploymentResults
for _, change := range changes.Changes {
switch change.Table {
case DEPLOYMENT_TABLE:
@@ -137,14 +111,6 @@
dep, err := dataDeploymentFromRow(change.NewRow)
if err == nil {
insertedDeployments = append(insertedDeployments, dep)
- } else {
- result := apiDeploymentResult{
- ID: dep.ID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: TRACKER_ERR_DEPLOYMENT_BAD_JSON,
- Message: fmt.Sprintf("unable to parse deployment: %v", err),
- }
- errResults = append(errResults, result)
}
case common.Delete:
var id, dataScopeID string
@@ -162,11 +128,6 @@
}
}
- // transmit parsing errors back immediately
- if len(errResults) > 0 {
- go transmitDeploymentResultsToServer(errResults)
- }
-
for _, d := range deletedDeployments {
deploymentsChanged <- d.ID
}