Preliminary pruning to make GW deploy fit to generic Gateway model.
diff --git a/api.go b/api.go index 8a628e7..676d6fc 100644 --- a/api.go +++ b/api.go
@@ -1,12 +1,8 @@ package apiGatewayDeploy import ( - "bytes" "encoding/json" - "fmt" - "io/ioutil" "net/http" - "net/url" "strconv" "sync/atomic" "time" @@ -20,14 +16,10 @@ const ( TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT = iota + 1 - TRACKER_ERR_BUNDLE_BAD_CHECKSUM - TRACKER_ERR_DEPLOYMENT_BAD_JSON ) const ( API_ERR_BAD_BLOCK = iota + 1 - API_ERR_BAD_JSON - API_ERR_BAD_CONTENT API_ERR_INTERNAL ) @@ -71,21 +63,12 @@ // sent to client type ApiDeploymentResponse []ApiDeployment -type apiDeploymentResult struct { - ID string `json:"id"` - Status string `json:"status"` - ErrorCode int `json:"errorCode"` - Message string `json:"message"` -} -// received from client -type apiDeploymentResults []apiDeploymentResult const deploymentsEndpoint = "/deployments" func InitAPI() { services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET") - services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT") } func writeError(w http.ResponseWriter, status int, code int, reason string) { @@ -275,109 +258,6 @@ w.Write(b) } -func apiSetDeploymentResults(w http.ResponseWriter, r *http.Request) { - - var results apiDeploymentResults - buf, _ := ioutil.ReadAll(r.Body) - err := json.Unmarshal(buf, &results) - if err != nil { - log.Errorf("Resp Handler Json Unmarshal err: ", err) - writeError(w, http.StatusBadRequest, API_ERR_BAD_JSON, "Malformed JSON") - return - } - - // validate the results - // todo: these errors to the client should be standardized - var errs bytes.Buffer - var validResults apiDeploymentResults - for i, result := range results { - valid := true - if result.ID == "" { - errs.WriteString(fmt.Sprintf("Missing id at %d\n", i)) - } - - if result.Status != RESPONSE_STATUS_SUCCESS && result.Status != RESPONSE_STATUS_FAIL { - errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n", - RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i)) - } - - if result.Status == RESPONSE_STATUS_FAIL { - if result.ErrorCode == 0 { - errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i)) - } - if result.Message == "" { - errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i)) - } - } - - if valid { - validResults = append(validResults, result) - } - } - - if errs.Len() > 0 { - writeError(w, http.StatusBadRequest, API_ERR_BAD_CONTENT, errs.String()) - return - } - - if len(validResults) > 0 { - setDeploymentResults(validResults) - } - - w.Write([]byte("OK")) -} - -func addHeaders(req *http.Request) { - var token = services.Config().GetString("apigeesync_bearer_token") - req.Header.Add("Authorization", "Bearer "+token) -} - -func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error { - - retryIn := bundleRetryDelay - maxBackOff := 5 * time.Minute - backOffFunc := createBackoff(retryIn, maxBackOff) - - _, err := url.Parse(apiServerBaseURI.String()) - if err != nil { - log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err) - return err - } - apiPath := fmt.Sprintf("%s/clusters/%s/apids/%s/deployments", apiServerBaseURI.String(), apidClusterID, apidInstanceID) - - resultJSON, err := json.Marshal(validResults) - if err != nil { - log.Errorf("unable to marshal deployment results %v: %v", validResults, err) - return err - } - - for { - log.Debugf("transmitting deployment results to tracker by URL=%s data=%s", apiPath, string(resultJSON)) - req, err := http.NewRequest("PUT", apiPath, bytes.NewReader(resultJSON)) - if err != nil { - log.Errorf("unable to create PUT request", err) - return err - } - req.Header.Add("Content-Type", "application/json") - addHeaders(req) - - resp, err := http.DefaultClient.Do(req) - if err != nil || resp.StatusCode != http.StatusOK { - if err != nil { - log.Errorf("failed to communicate with tracking service: %v", err) - } else { - b, _ := ioutil.ReadAll(resp.Body) - log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b)) - } - resp.Body.Close() - backOffFunc() - continue - } - resp.Body.Close() - return nil - } -} - // call whenever the list of deployments changes func incrementETag() string { e := atomic.AddInt64(&eTag, 1)
diff --git a/api_test.go b/api_test.go index a49b9a5..1a9f81b 100644 --- a/api_test.go +++ b/api_test.go
@@ -221,179 +221,6 @@ }) }) - Context("PUT /deployments", func() { - - It("should return BadRequest for invalid request", func() { - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - deploymentResult := apiDeploymentResults{ - apiDeploymentResult{}, - } - payload, err := json.Marshal(deploymentResult) - Expect(err).ShouldNot(HaveOccurred()) - - req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) - req.Header.Add("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - defer resp.Body.Close() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).Should(Equal(http.StatusBadRequest)) - }) - - It("should ignore deployments that can't be found", func() { - - deploymentID := "api_missing_deployment" - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - deploymentResult := apiDeploymentResults{ - apiDeploymentResult{ - ID: deploymentID, - Status: RESPONSE_STATUS_SUCCESS, - }, - } - payload, err := json.Marshal(deploymentResult) - Expect(err).ShouldNot(HaveOccurred()) - - req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) - req.Header.Add("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - defer resp.Body.Close() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).Should(Equal(http.StatusOK)) - }) - - It("should mark a deployment as successful", func() { - - db := getDB() - deploymentID := "api_mark_deployed" - insertTestDeployment(testServer, deploymentID) - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - deploymentResult := apiDeploymentResults{ - apiDeploymentResult{ - ID: deploymentID, - Status: RESPONSE_STATUS_SUCCESS, - }, - } - payload, err := json.Marshal(deploymentResult) - Expect(err).ShouldNot(HaveOccurred()) - - req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) - req.Header.Add("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - defer resp.Body.Close() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).Should(Equal(http.StatusOK)) - - var deployStatus string - err = db.QueryRow("SELECT deploy_status FROM edgex_deployment WHERE id=?", deploymentID). - Scan(&deployStatus) - Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS)) - }) - - It("should mark a deployment as failed", func() { - - db := getDB() - deploymentID := "api_mark_failed" - insertTestDeployment(testServer, deploymentID) - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - deploymentResults := apiDeploymentResults{ - apiDeploymentResult{ - ID: deploymentID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: 100, - Message: "Some error message", - }, - } - payload, err := json.Marshal(deploymentResults) - Expect(err).ShouldNot(HaveOccurred()) - - req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload)) - req.Header.Add("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - defer resp.Body.Close() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).Should(Equal(http.StatusOK)) - - var deployStatus, deploy_error_message string - var deploy_error_code int - err = db.QueryRow(` - SELECT deploy_status, deploy_error_code, deploy_error_message - FROM edgex_deployment - WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message) - Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL)) - Expect(deploy_error_code).Should(Equal(100)) - Expect(deploy_error_message).Should(Equal("Some error message")) - }) - - It("should communicate status to tracking server", func() { - - deploymentResults := apiDeploymentResults{ - apiDeploymentResult{ - ID: "deploymentID", - Status: RESPONSE_STATUS_FAIL, - ErrorCode: 100, - Message: "Some error message", - }, - } - - err := transmitDeploymentResultsToServer(deploymentResults) - Expect(err).NotTo(HaveOccurred()) - - Expect(testLastTrackerVars["clusterID"]).To(Equal("CLUSTER_ID")) - Expect(testLastTrackerVars["instanceID"]).To(Equal("INSTANCE_ID")) - Expect(testLastTrackerBody).ToNot(BeEmpty()) - - var uploaded apiDeploymentResults - json.Unmarshal(testLastTrackerBody, &uploaded) - - Expect(uploaded).To(Equal(deploymentResults)) - }) - - It("should get iso8601 time", func() { - testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00"} - isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z"} - for i, t := range testTimes { - log.Debug("insert deployment with timestamp: " + t) - deploymentID := "api_time_iso8601_" + strconv.Itoa(i) - insertTimeDeployment(testServer, deploymentID, t) - } - - uri, err := url.Parse(testServer.URL) - uri.Path = deploymentsEndpoint - - res, err := http.Get(uri.String()) - Expect(err).ShouldNot(HaveOccurred()) - defer res.Body.Close() - - Expect(res.StatusCode).Should(Equal(http.StatusOK)) - - var depRes ApiDeploymentResponse - body, err := ioutil.ReadAll(res.Body) - Expect(err).ShouldNot(HaveOccurred()) - json.Unmarshal(body, &depRes) - - Expect(len(depRes)).To(Equal(len(testTimes))) - - for i, dep := range depRes { - Expect(dep.Created).To(Equal(isoTime[i])) - Expect(dep.Updated).To(Equal(isoTime[i])) - } - }) - }) }) func insertTestDeployment(testServer *httptest.Server, deploymentID string) { @@ -443,49 +270,4 @@ Expect(err).ShouldNot(HaveOccurred()) } -func insertTimeDeployment(testServer *httptest.Server, deploymentID string, timestamp string) { - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundleJson, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - - dep := DataDeployment{ - ID: deploymentID, - BundleConfigID: deploymentID, - ApidClusterID: deploymentID, - DataScopeID: deploymentID, - BundleConfigJSON: string(bundleJson), - ConfigJSON: string(bundleJson), - Created: timestamp, - CreatedBy: "", - Updated: timestamp, - UpdatedBy: "", - BundleName: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - LocalBundleURI: "x", - DeployStatus: "", - DeployErrorCode: 0, - DeployErrorMessage: "", - } - - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) -}
diff --git a/bundle.go b/bundle.go index 740947d..5a2c977 100644 --- a/bundle.go +++ b/bundle.go
@@ -46,14 +46,6 @@ if err != nil { msg := fmt.Sprintf("invalid bundle checksum type: %s for deployment: %s", dep.BundleChecksumType, dep.ID) log.Error(msg) - setDeploymentResults(apiDeploymentResults{ - { - ID: dep.ID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: TRACKER_ERR_BUNDLE_BAD_CHECKSUM, - Message: msg, - }, - }) return } @@ -131,14 +123,6 @@ r.markFailedAt = time.Time{} log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", r.dep.ID, r.dep.BundleURI) - setDeploymentResults(apiDeploymentResults{ - { - ID: r.dep.ID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT, - Message: "bundle download failed", - }, - }) } } }
diff --git a/data.go b/data.go index c2b3ecf..b09c0cb 100644 --- a/data.go +++ b/data.go
@@ -2,7 +2,6 @@ import ( "database/sql" - "fmt" "sync" "encoding/json" @@ -315,49 +314,7 @@ return } -func setDeploymentResults(results apiDeploymentResults) error { - // also send results to server - go transmitDeploymentResultsToServer(results) - - log.Debugf("setDeploymentResults: %v", results) - - tx, err := getDB().Begin() - if err != nil { - log.Errorf("Unable to begin transaction: %v", err) - return err - } - defer tx.Rollback() - - stmt, err := tx.Prepare(` - UPDATE edgex_deployment - SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3 - WHERE id=$4; - `) - if err != nil { - log.Errorf("prepare updateDeploymentStatus failed: %v", err) - return err - } - defer stmt.Close() - - for _, result := range results { - res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID) - if err != nil { - log.Errorf("update edgex_deployment %s to %s failed: %v", result.ID, result.Status, err) - return err - } - n, err := res.RowsAffected() - if n == 0 || err != nil { - log.Error(fmt.Sprintf("no deployment matching '%s' to update. skipping.", result.ID)) - } - } - - err = tx.Commit() - if err != nil { - log.Errorf("Unable to commit setDeploymentResults transaction: %v", err) - } - return err -} func updateLocalBundleURI(depID, localBundleUri string) error {
diff --git a/listener.go b/listener.go index 8a4f017..e9cd113 100644 --- a/listener.go +++ b/listener.go
@@ -5,8 +5,6 @@ "os" "time" - "fmt" - "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" ) @@ -88,29 +86,6 @@ } func startupOnExistingDatabase() { - // ensure all deployment statuses have been sent to tracker - go func() { - deployments, err := getDeployments("WHERE deploy_status != $1", "") - if err != nil { - log.Panicf("unable to query database for ready deployments: %v", err) - } - log.Debugf("Queuing %d deployments for bundle download", len(deployments)) - - var results apiDeploymentResults - for _, dep := range deployments { - result := apiDeploymentResult{ - ID: dep.ID, - Status: dep.DeployStatus, - ErrorCode: dep.DeployErrorCode, - Message: dep.DeployErrorMessage, - } - results = append(results, result) - } - if len(results) > 0 { - transmitDeploymentResultsToServer(results) - } - }() - // start bundle downloads that didn't finish go func() { deployments, err := getUnreadyDeployments() @@ -128,7 +103,6 @@ // changes have been applied to DB var insertedDeployments, deletedDeployments []DataDeployment - var errResults apiDeploymentResults for _, change := range changes.Changes { switch change.Table { case DEPLOYMENT_TABLE: @@ -137,14 +111,6 @@ dep, err := dataDeploymentFromRow(change.NewRow) if err == nil { insertedDeployments = append(insertedDeployments, dep) - } else { - result := apiDeploymentResult{ - ID: dep.ID, - Status: RESPONSE_STATUS_FAIL, - ErrorCode: TRACKER_ERR_DEPLOYMENT_BAD_JSON, - Message: fmt.Sprintf("unable to parse deployment: %v", err), - } - errResults = append(errResults, result) } case common.Delete: var id, dataScopeID string @@ -162,11 +128,6 @@ } } - // transmit parsing errors back immediately - if len(errResults) > 0 { - go transmitDeploymentResultsToServer(errResults) - } - for _, d := range deletedDeployments { deploymentsChanged <- d.ID }