rewritten for new deployment schema and API
diff --git a/api.go b/api.go index 09a4d17..e6fd817 100644 --- a/api.go +++ b/api.go
@@ -1,26 +1,26 @@ package apiGatewayDeploy import ( - "database/sql" + "bytes" "encoding/json" - "github.com/30x/apid" + "fmt" "io/ioutil" "net/http" "strconv" "time" - "fmt" ) const ( RESPONSE_STATUS_SUCCESS = "SUCCESS" RESPONSE_STATUS_FAIL = "FAIL" - ERROR_CODE_TODO = 0 // todo: add error codes where this is used + // todo: add error codes where this is used + ERROR_CODE_TODO = 0 ) var ( - incoming = make(chan string) - addSubscriber = make(chan chan string) + deploymentsChanged = make(chan string) + addSubscriber = make(chan chan string) ) type errorResponse struct { @@ -28,9 +28,36 @@ Reason string `json:"reason"` } +type apiDeployment struct { + ID string `json:"id"` + ScopeId string `json:"scopeId"` + Created string `json:"created"` + CreatedBy string `json:"createdBy"` + Updated string `json:"updated"` + UpdatedBy string `json:"updatedBy"` + ConfigurationJson string `json:"configurationJson"` + DisplayName string `json:"displayName"` + URI string `json:"uri"` +} + +// sent to client +type apiDeploymentResponse []apiDeployment + +type apiDeploymentResult struct { + ID string `json:"id"` + Status string `json:"status"` + ErrorCode int `json:"errorCode"` + Message string `json:"message"` +} + +// received from client +type apiDeploymentResults []apiDeploymentResult + +const deploymentsEndpoint = "/deployments" + func initAPI() { - services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET") - services.API().HandleFunc("/deployments/{deploymentID}", handleDeploymentResult).Methods("POST") + services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("POST") } func writeError(w http.ResponseWriter, status int, code int, reason string) { @@ -56,10 +83,13 @@ subscribers := make(map[chan string]struct{}) for { select { - case msg := <-incoming: - log.Debugf("Delivering new deployment %s to %d subscribers", msg, len(subscribers)) - for subscriber := range subscribers { - delete(subscribers, subscriber) // all subscriptions are one-time notify + case msg := <-deploymentsChanged: + // todo: add a debounce w/ timeout to avoid sending on every single deployment? + subs := subscribers + incrementETag() // todo: do this elsewhere? check error? + subscribers = make(map[chan string]struct{}) + log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs)) + for subscriber := range subs { select { case subscriber <- msg: log.Debugf("Handling deploy response for: %s", msg) @@ -74,7 +104,7 @@ } } -func handleCurrentDeployment(w http.ResponseWriter, r *http.Request) { +func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { // If returning without a bundle (immediately or after timeout), status = 404 // If returning If-None-Match value is equal to current deployment, status = 304 @@ -97,54 +127,54 @@ // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block' // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the // most recent bundle list. - priorDepID := r.Header.Get("If-None-Match") - log.Debugf("if-none-match: %s", priorDepID) + ifNoneMatch := r.Header.Get("If-None-Match") + log.Debugf("if-none-match: %s", ifNoneMatch) - // subscribe to new deployments in case we need it - var newReq chan string - if timeout > 0 && priorDepID != "" { - newReq = make(chan string) - addSubscriber <- newReq + // send unmodified if matches prior eTag and no timeout + eTag, err := getETag() + if err != nil { + writeDatabaseError(w) + return + } + if eTag == ifNoneMatch && timeout == 0 { + w.WriteHeader(http.StatusNotModified) + return } - depID, err := getCurrentDeploymentID() - if err != nil && err != sql.ErrNoRows{ + // subscribe to new deployments in case we need it + var gotNewDeployment chan string + if timeout > 0 && ifNoneMatch != "" { + gotNewDeployment = make(chan string) + addSubscriber <- gotNewDeployment + } + + deployments, err := getReadyDeployments() + if err != nil { writeDatabaseError(w) return } - // not found, no timeout, send immediately - if depID == "" && timeout == 0 { + // send not found if no timeout + if len(deployments) == 0 && timeout == 0 { w.WriteHeader(http.StatusNotFound) return } - // found, send immediately - if doesn't match prior ID - if depID != "" { - if depID == priorDepID { - if timeout == 0 { - w.WriteHeader(http.StatusNotModified) - return - } else { - // continue - } - } else { - sendDeployment(w, depID) - return - } + // send results if different eTag + if eTag != ifNoneMatch { + sendDeployments(w, deployments, eTag) + return } - // can't send immediately, wait for the subscription log.Debug("Blocking request... Waiting for new Deployments.") - // block until new deployment or timeout select { - case depID := <-newReq: - sendDeployment(w, depID) + case <-gotNewDeployment: + apiGetCurrentDeployments(w, r) // recurse case <-time.After(time.Duration(timeout) * time.Second): log.Debug("Blocking deployment request timed out.") - if priorDepID != "" { + if ifNoneMatch != "" { w.WriteHeader(http.StatusNotModified) } else { w.WriteHeader(http.StatusNotFound) @@ -152,62 +182,79 @@ } } -func sendDeployment(w http.ResponseWriter, depID string) { - deployment, err := getDeployment(depID) - if err != nil { - log.Errorf("unable to retrieve deployment [%s]: %s", depID, err) - w.WriteHeader(http.StatusInternalServerError) +func sendDeployments(w http.ResponseWriter, dataDeps []dataDeployment, eTag string) { + + var apiDeps apiDeploymentResponse + + for _, d := range dataDeps { + apiDeps = append(apiDeps, apiDeployment{ + ID: d.ID, + ScopeId: d.DataScopeID, + Created: d.Created, + CreatedBy: d.CreatedBy, + Updated: d.Updated, + UpdatedBy: d.UpdatedBy, + ConfigurationJson: d.ConfigJSON, + DisplayName: d.BundleName, + URI: d.LocalBundleURI, + }) } - b, err := json.Marshal(deployment) + + b, err := json.Marshal(apiDeps) if err != nil { - log.Errorf("unable to marshal deployment: %s", err) + log.Errorf("unable to marshal deployments: %s", err) w.WriteHeader(http.StatusInternalServerError) - } else { - log.Debugf("sending deployment %s: %s", depID, b) - w.Header().Set("ETag", depID) - w.Write(b) - } -} - -// todo: we'll need to transmit results back to Edge somehow - TBD -func handleDeploymentResult(w http.ResponseWriter, r *http.Request) { - - depID := apid.API().Vars(r)["deploymentID"] - - if depID == "" { - log.Error("No deployment ID") - writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Missing deployment ID") return } - var rsp deploymentResponse + log.Debugf("sending deployment %s: %s", eTag, b) + w.Header().Set("ETag", eTag) + w.Write(b) +} + +func apiSetDeploymentResults(w http.ResponseWriter, r *http.Request) { + + var results apiDeploymentResults buf, _ := ioutil.ReadAll(r.Body) - err := json.Unmarshal(buf, &rsp) + err := json.Unmarshal(buf, &results) if err != nil { - log.Error("Resp Handler Json Unmarshal err: ", err) + log.Errorf("Resp Handler Json Unmarshal err: ", err) writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Malformed JSON") return } - if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL { - writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, - fmt.Sprintf("status must be '%s' or '%s'", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL)) - return - } + // validate the results + // todo: these errors to the client should be standardized + var errs bytes.Buffer + for i, rsp := range results { + if rsp.ID == "" { + errs.WriteString(fmt.Sprintf("Missing id at %d\n", i)) + } - if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.Error.ErrorCode == 0 || rsp.Error.Reason == "") { - writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "errorCode and reason are required") - return - } + if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL { + errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i)) + } - err = updateDeploymentAndBundles(depID, rsp) - if err != nil { - if err == sql.ErrNoRows { - writeError(w, http.StatusNotFound, ERROR_CODE_TODO, "not found") - } else { - writeDatabaseError(w) + if rsp.Status == RESPONSE_STATUS_FAIL { + if rsp.ErrorCode == 0 { + errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i)) + } + if rsp.Message == "" { + errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i)) + } } } + if errs.Len() > 0 { + writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, errs.String()) + } + + err = setDeploymentResults(results) + if err != nil { + writeDatabaseError(w) + } + + // todo: transmit to server (API TBD) + //err = transmitDeploymentResultsToServer() return }
diff --git a/api_test.go b/api_test.go index 669f3e0..8bc93e6 100644 --- a/api_test.go +++ b/api_test.go
@@ -1,30 +1,30 @@ package apiGatewayDeploy import ( - "bytes" - "encoding/json" - "fmt" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "io/ioutil" "net/http" "net/http/httptest" "net/url" + "encoding/json" + "io/ioutil" "time" + "bytes" ) var _ = Describe("api", func() { - Context("GET /deployments/current", func() { + BeforeEach(func() { + _, err := getDB().Exec("DELETE FROM deployments") + Expect(err).ShouldNot(HaveOccurred()) + }) - It("should get 404 if no deployments", func() { + Context("GET /deployments", func() { - db := getDB() - _, err := db.Exec("DELETE FROM gateway_deploy_deployment") - Expect(err).ShouldNot(HaveOccurred()) + It("should get an empty array if no deployments", func() { uri, err := url.Parse(testServer.URL) - uri.Path = "/deployments/current" + uri.Path = deploymentsEndpoint res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) @@ -32,27 +32,33 @@ Expect(res.StatusCode).Should(Equal(http.StatusNotFound)) }) - It("should get current deployment", func() { + It("should get current deployments", func() { deploymentID := "api_get_current" insertTestDeployment(testServer, deploymentID) uri, err := url.Parse(testServer.URL) - uri.Path = "/deployments/current" + uri.Path = deploymentsEndpoint res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() - var depRes deployment + var depRes apiDeploymentResponse body, err := ioutil.ReadAll(res.Body) Expect(err).ShouldNot(HaveOccurred()) json.Unmarshal(body, &depRes) - Expect(depRes.DeploymentID).Should(Equal(deploymentID)) - Expect(depRes.Bundles[0].Scope).Should(Equal("some-scope")) + Expect(len(depRes)).To(Equal(1)) - Expect(res.Header.Get("etag")).Should(Equal(deploymentID)) + dep := depRes[0] + + Expect(dep.ID).To(Equal(deploymentID)) + Expect(dep.ScopeId).To(Equal(deploymentID)) + Expect(dep.DisplayName).To(Equal(deploymentID)) + + // todo: more tests + //dep.ConfigurationJson }) It("should get 304 for no change", func() { @@ -61,7 +67,7 @@ insertTestDeployment(testServer, deploymentID) uri, err := url.Parse(testServer.URL) - uri.Path = "/deployments/current" + uri.Path = deploymentsEndpoint res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() @@ -76,14 +82,10 @@ Expect(res.StatusCode).To(Equal(http.StatusNotModified)) }) - It("should get 404 after blocking if no deployment", func() { - - db := getDB() - _, err := db.Exec("DELETE FROM gateway_deploy_deployment") - Expect(err).ShouldNot(HaveOccurred()) + It("should get empty set after blocking if no deployments", func() { uri, err := url.Parse(testServer.URL) - uri.Path = "/deployments/current" + uri.Path = deploymentsEndpoint query := uri.Query() query.Add("block", "1") @@ -91,7 +93,9 @@ res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() - Expect(res.StatusCode).Should(Equal(http.StatusNotFound)) + + Expect(res.StatusCode).Should(Equal(http.StatusOK)) + }) It("should get new deployment after blocking", func(done Done) { @@ -99,7 +103,7 @@ deploymentID := "api_get_current_blocking" insertTestDeployment(testServer, deploymentID) uri, err := url.Parse(testServer.URL) - uri.Path = "/deployments/current" + uri.Path = deploymentsEndpoint res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() @@ -120,27 +124,25 @@ defer res.Body.Close() Expect(res.StatusCode).To(Equal(http.StatusOK)) - var depRes deployment + var depRes apiDeploymentResponse body, err := ioutil.ReadAll(res.Body) Expect(err).ShouldNot(HaveOccurred()) json.Unmarshal(body, &depRes) - Expect(depRes.DeploymentID).Should(Equal(deploymentID)) + Expect(len(depRes)).To(Equal(2)) - for _, bundle := range depRes.Bundles { - uri, err := url.Parse(bundle.URI) - Expect(err).ShouldNot(HaveOccurred()) - bContent, err := ioutil.ReadFile(uri.Path) - Expect(err).ShouldNot(HaveOccurred()) - content := string(bContent) - Expect(content).Should(HavePrefix("/bundle/")) - } + dep := depRes[1] + + Expect(dep.ID).To(Equal(deploymentID)) + Expect(dep.ScopeId).To(Equal(deploymentID)) + Expect(dep.DisplayName).To(Equal(deploymentID)) close(done) }() - time.Sleep(25 * time.Millisecond) // give api call above time to block - triggerDeploymentEvent(deploymentID) + time.Sleep(250 * time.Millisecond) // give api call above time to block + insertTestDeployment(testServer, deploymentID) + deploymentsChanged<- deploymentID }) It("should get 304 after blocking if no new deployment", func() { @@ -148,7 +150,7 @@ deploymentID := "api_no_change_blocking" insertTestDeployment(testServer, deploymentID) uri, err := url.Parse(testServer.URL) - uri.Path = "/deployments/current" + uri.Path = deploymentsEndpoint res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() @@ -167,19 +169,17 @@ }) }) - Context("POST /deployments/{ID}", func() { + Context("POST /deployments", func() { - It("should return a 404 for missing deployment", func() { - - deploymentID := "api_missing_deployment" + It("should return BadRequest for invalid request", func() { uri, err := url.Parse(testServer.URL) - uri.Path = fmt.Sprintf("/deployments/%s", deploymentID) + uri.Path = deploymentsEndpoint - deploymentResult := deploymentResponse{ - Status: RESPONSE_STATUS_SUCCESS, + deploymentResult := apiDeploymentResults{ + apiDeploymentResult{ + }, } - payload, err := json.Marshal(deploymentResult) Expect(err).ShouldNot(HaveOccurred()) @@ -189,22 +189,49 @@ resp, err := http.DefaultClient.Do(req) defer resp.Body.Close() Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).Should(Equal(http.StatusNotFound)) + Expect(resp.StatusCode).Should(Equal(http.StatusBadRequest)) }) - It("should mark a deployment as deployed", func() { + It("should ignore deployments that can't be found", func() { + + deploymentID := "api_missing_deployment" + + uri, err := url.Parse(testServer.URL) + uri.Path = deploymentsEndpoint + + deploymentResult := apiDeploymentResults{ + apiDeploymentResult{ + ID: deploymentID, + Status: RESPONSE_STATUS_SUCCESS, + }, + } + payload, err := json.Marshal(deploymentResult) + Expect(err).ShouldNot(HaveOccurred()) + + req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload)) + req.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + defer resp.Body.Close() + Expect(err).ShouldNot(HaveOccurred()) + Expect(resp.StatusCode).Should(Equal(http.StatusOK)) + }) + + It("should mark a deployment as successful", func() { db := getDB() deploymentID := "api_mark_deployed" insertTestDeployment(testServer, deploymentID) uri, err := url.Parse(testServer.URL) - uri.Path = fmt.Sprintf("/deployments/%s", deploymentID) + uri.Path = deploymentsEndpoint - deploymentResult := deploymentResponse{ - Status: RESPONSE_STATUS_SUCCESS, + deploymentResult := apiDeploymentResults{ + apiDeploymentResult{ + ID: deploymentID, + Status: RESPONSE_STATUS_SUCCESS, + }, } - payload, err := json.Marshal(deploymentResult) Expect(err).ShouldNot(HaveOccurred()) @@ -216,44 +243,29 @@ Expect(err).ShouldNot(HaveOccurred()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) - var deployStatus int - err = db.QueryRow("SELECT status FROM gateway_deploy_deployment WHERE id=?", deploymentID). + var deployStatus string + err = db.QueryRow("SELECT deploy_status FROM deployments WHERE id=?", deploymentID). Scan(&deployStatus) - Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS)) - - rows, err := db.Query("SELECT status from gateway_deploy_bundle WHERE id = ?;", deploymentID) - Expect(err).ShouldNot(HaveOccurred()) - defer rows.Close() - for rows.Next() { - rows.Scan(&deployStatus) - Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS)) - } + Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS)) }) It("should mark a deployment as failed", func() { db := getDB() - deploymentID := "api_test_3" + deploymentID := "api_mark_failed" insertTestDeployment(testServer, deploymentID) uri, err := url.Parse(testServer.URL) - uri.Path = fmt.Sprintf("/deployments/%s", deploymentID) + uri.Path = deploymentsEndpoint - deploymentResult := deploymentResponse{ - Status: RESPONSE_STATUS_FAIL, - Error: deploymentErrorResponse{ + deploymentResult := apiDeploymentResults{ + apiDeploymentResult{ + ID: deploymentID, + Status: RESPONSE_STATUS_FAIL, ErrorCode: 100, - Reason: "bad juju", - //ErrorDetails: []deploymentErrorDetail{ // todo: add tests for bundle errors - // { - // BundleId: "", - // ErrorCode: 100, - // Reason: "Zombies", - // }, - //}, + Message: "Some error message", }, } - payload, err := json.Marshal(deploymentResult) Expect(err).ShouldNot(HaveOccurred()) @@ -265,41 +277,60 @@ Expect(err).ShouldNot(HaveOccurred()) Expect(resp.StatusCode).Should(Equal(http.StatusOK)) - var deployStatus int - err = db.QueryRow("SELECT status from gateway_deploy_deployment WHERE id = ?;", - deploymentID).Scan(&deployStatus) - Expect(err).ShouldNot(HaveOccurred()) - Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_ERR_GWY)) + var deployStatus, deploy_error_message string + var deploy_error_code int + err = db.QueryRow(` + SELECT deploy_status, deploy_error_code, deploy_error_message + FROM deployments + WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message) + Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL)) + Expect(deploy_error_code).Should(Equal(100)) + Expect(deploy_error_message).Should(Equal("Some error message")) }) }) }) -func insertTestDeployment(server *httptest.Server, depID string) { +func insertTestDeployment(testServer *httptest.Server, deploymentID string) { - db := getDB() - uri, err := url.Parse(server.URL) + uri, err := url.Parse(testServer.URL) Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundle" - bundleUri := uri.String() - dep := deployment{ - System: bundle{ - URI: bundleUri, - }, - Bundles: []bundle{ - { - BundleID: "bun", - URI: bundleUri, - Scope: "some-scope", - Org: "org", - Env: "env", - }, - }, + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc-32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + bundleJson, err := json.Marshal(bundle) + Expect(err).ShouldNot(HaveOccurred()) + + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + + dep := dataDeployment{ + ID: deploymentID, + BundleConfigID: deploymentID, + ApidClusterID: deploymentID, + DataScopeID: deploymentID, + BundleConfigJSON: string(bundleJson), + ConfigJSON: "", + Status: "", + Created: "", + CreatedBy: "", + Updated: "", + UpdatedBy: "", + BundleName: deploymentID, + BundleURI: "", + BundleChecksum: "", + BundleChecksumType: "", + LocalBundleURI: "x", } - err = insertDeployment(db, depID, dep) + err = insertDeployment(tx, dep) Expect(err).ShouldNot(HaveOccurred()) - err = updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0) + err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) }
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml index c48ca40..916c081 100644 --- a/apidGatewayDeploy-api.yaml +++ b/apidGatewayDeploy-api.yaml
@@ -17,7 +17,7 @@ produces: - application/json paths: - /current: + /: get: description: Retrieve current deployment system and bundles to install. parameters: @@ -28,7 +28,7 @@ - name: block in: query type: integer - description: 'If block > 0 AND if there is no new bundle list available, then block for up to the specified number of seconds until a new bundle list becomes available. If no new deployment becomes available, then return 304 Not Modified if If-None-Match is specified, 404 otherwise.' + description: 'If block > 0 AND if there is no new bundle list available, then block for up to the specified number of seconds until a new bundle list becomes available. If no new deployment becomes available, then return 304 Not Modified if If-None-Match is specified.' responses: '200': headers: @@ -37,50 +37,90 @@ type: string description: The deployment system and bundles to install. examples: - application/json: { - "deploymentId": "abc123", - "system" : { - "bundleId": "system-bundle-rev-3", - "uri": "file:///apid/bundles/system-bundle-rev-3.zip" - }, - "bundles": [ - { - "bundleId": "system-bundle-rev-3", - "uri": "file:///apid/bundles/system-bundle-release-1-1233.zip", - "scope": "@#$nike#$#$stage&#$(^#", - "org": "nike", - "env": "stage", - },{ - "bundleId": "bundleA-rev-9", - "uri": "file:///apid/bundles/bundleA-rev-9-26372.zip", - "scope": "@#$nike#$#$prod&#$(^#", - "org": "nike", - "env": "prod", - },{ - "bundleId": "bundleB-rev-1", - "uri": "file:///somewhere/bundles/bundleB-rev-1-72351.zip", - "scope": "@#$nike#$#$test&#$(^#", - "org": "nike", - "env": "test", - } - ] - } + application/json: [ + { + "id": "1234567890", + "displayName": "abc123", + "created":"1481917061", + "updated":"1481917061", + "createdBy":"mdobs", + "updatedBy":"mdobs", + "scopeId": "ABCDEF", + "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", + "configurationJson": { + "PropA": "scope1bundle1propA", + "PropSCOPE_LEVEL": "aaa", + "PropCLUSTER_LEVEL": "xxx" + } + }, + { + "id": "1234567891", + "displayName": "abc123_2", + "created":"1481917061", + "updated":"1481917061", + "createdBy":"mdobs", + "updatedBy":"mdobs", + "scopeId": "ABCDEF", + "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", + "configurationJson": { + "PropA": "scope1bundle2propA", + "PropSCOPE_LEVEL": "aaa", + "PropCLUSTER_LEVEL": "xxx" + } + }, + { + "id": "1234567892", + "displayName": "abc1234", + "created":"1481917061", + "updated":"1481917061", + "createdBy":"mdobs", + "updatedBy":"mdobs", + "scopeId": "ABCDEF", + "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", + "configurationJson": { + "PropA": "scope1bundlepropA", + "PropSCOPE_LEVEL": "aaa", + "PropCLUSTER_LEVEL": "xxx" + } + }, + { + "id": "1234567893", + "displayName": "abc123", + "created":"1481917061", + "updated":"1481917061", + "createdBy":"mdobs", + "updatedBy":"mdobs", + "scopeId": "EFGHIJK", + "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", + "configurationJson": { + "PropA": "scope2bundle1propA", + "PropSCOPE_LEVEL": "bbb", + "PropCLUSTER_LEVEL": "xxx" + } + }, + { + "id": "1234567894", + "displayName": "abc123_2", + "created":"1481917061", + "updated":"1481917061", + "createdBy":"fierrom", + "updatedBy":"fierrom", + "scopeId": "EFGHIJK", + "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip", + "configurationJson": { + "PropA": "scope2bundle2propA", + "PropSCOPE_LEVEL": "bbb", + "PropCLUSTER_LEVEL": "xxx" + } + } +] schema: $ref: '#/definitions/DeploymentResponse' '304': description: Deployment not modified. - '404': - description: No current Deployment. - - /{deploymentId}: post: description: Save results of deployment parameters: - - name: deploymentId - in: path - required: true - type: string - description: deployment ID - name: _ in: body required: true @@ -112,109 +152,80 @@ } DeploymentResponse: - type: object - required: - - deploymentId - - system - - bundles - properties: - deploymentId: - type: string - system: - type: object - $ref: '#/definitions/SystemBundle' - bundles: - type: array - items: - $ref: '#/definitions/UserBundle' + type: array + items: + $ref: '#/definitions/DeploymentBundle' - SystemBundle: + DeploymentBundle: type: object required: - - bundleId + - id + - scopeId + - createdBy + - created + - updatedBy + - updated + - displayName - uri + - configurationJson properties: - bundleId: + id: + type: string + scopeId: + type: string + createdBy: + type: string + created: + type: number + updatedBy: + type: string + updated: + type: number + displayName: type: string uri: type: string - - UserBundle: - allOf: - - $ref: '#/definitions/SystemBundle' - required: - - scope - properties: - scope: - type: string - description: Used to convey request scope information to APID APIs - org: - type: string - description: Available for legacy purposes - env: - type: string - description: Available for legacy purposes + configurationJson: + type: object DeploymentResult: + type: array + items: + $ref: '#/definitions/DeploymentBundleResult' + example: [ + { + "id": "1234567890", + "status": "SUCCESS" + }, + { + "id": "1234567890", + "status": "SUCCESS" + }, + { + "id": "1234567890", + "status": "SUCCESS" + } + ] + + DeploymentBundleResult: type: object required: + - id - status properties: + id: + type: string + message: + type: string + errorCode: + type: number status: type: string enum: - "SUCCESS" - "FAIL" - error: - $ref: '#/definitions/DeploymentResultError' description: Status of SUCCESS or FAIL plus error example: { + "id": 1234567890, "status": "SUCCESS" - } - - DeploymentResultError: - type: object - required: - - errorCode - - reason - properties: - errorCode: - type: number - reason: - type: string - bundleErrors: - type: array - items: - $ref: '#/definitions/DeploymentBundleError' - example: { - "error": { - "errorCode": 5, - "reason": "Failed restart", - "bundleErrors": [ - { - "bundleId": "system-bundle-rev-3", - "errorCode": 5, - "reason": "Invalid template parameter" - }, - { - "bundleId": "system-bundle-rev-9", - "errorCode": 1, - "reason": "Missing Virtual Host" - } - ] - } - } - - DeploymentBundleError: - type: object - required: - - bundleId - - errorCode - - reason - properties: - bundleId: - type: string - errorCode: - type: number - reason: - type: string + } \ No newline at end of file
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 7287e3e..ab0ca63 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -9,9 +9,11 @@ "io/ioutil" "net/http" "net/http/httptest" - "os" "testing" "time" + "net/url" + "encoding/hex" + "os" ) var ( @@ -39,16 +41,16 @@ router := apid.API().Router() // fake an unreliable bundle repo - downloadMultiplier = 10 * time.Millisecond + backOffMultiplier = 10 * time.Millisecond count := 0 - router.HandleFunc("/bundle/{id}", func(w http.ResponseWriter, req *http.Request) { + router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) { count++ if count % 2 == 0 { w.WriteHeader(500) return } vars := apid.API().Vars(req) - w.Write([]byte("/bundle/" + vars["id"])) + w.Write([]byte("/bundles/" + vars["id"])) }) testServer = httptest.NewServer(router) }) @@ -65,3 +67,14 @@ RegisterFailHandler(Fail) RunSpecs(t, "ApidGatewayDeploy Suite") } + +func testGetChecksum(hashType, uri string) string { + url, err := url.Parse(uri) + Expect(err).NotTo(HaveOccurred()) + + hashWriter, err := getHashWriter(hashType) + Expect(err).NotTo(HaveOccurred()) + + hashWriter.Write([]byte(url.Path)) + return hex.EncodeToString(hashWriter.Sum(nil)) +}
diff --git a/bundle.go b/bundle.go new file mode 100644 index 0000000..311534c --- /dev/null +++ b/bundle.go
@@ -0,0 +1,203 @@ +package apiGatewayDeploy + +import ( + "fmt" + "io" + "net/http" + "net/url" + "os" + "path" + "time" + "encoding/base64" + "io/ioutil" + "hash/crc32" + "errors" + "crypto/md5" + "hash" + "encoding/hex" +) + +const ( + DOWNLOAD_ATTEMPTS = 3 +) + +var ( + backOffMultiplier = 10 * time.Second +) + +func downloadBundle(dep dataDeployment) error { + + log.Debugf("starting bundle download process: %s", dep.BundleURI) + + hashWriter, err := getHashWriter(dep.BundleChecksumType) + if err != nil { + log.Errorf("invalid checksum type: %v", err) + return err + } + + // retry + var tempFile string + for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ { + tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum) + if err == nil { + break + } + if tempFile != "" { + os.Remove(tempFile) + } + + // simple back-off, we could potentially be more sophisticated + retryIn := time.Duration(i) * backOffMultiplier + log.Debugf("will retry failed download in %s: %v", retryIn, err) + time.Sleep(retryIn) + hashWriter.Reset() + } + + if err != nil { + log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS) + return err + } + + bundleFile := getBundleFile(dep) + err = os.Rename(tempFile, bundleFile) + if err != nil { + log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err) + os.Remove(tempFile) + return err + } + + err = updateLocalURI(dep.ID, bundleFile) + if err != nil { + return err + } + + // send deployments to client + deploymentsChanged<- dep.ID + + return nil +} + +func getBundleFile(dep dataDeployment) string { + + // the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI + // unfortunately, this also means that a bundle cache isn't especially relevant + fileName := dep.DataScopeID + dep.ID + dep.ID + + return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(fileName))) +} + +func downloadFromURI(uri string, hashWriter hash.Hash, expectedHash string) (tempFileName string, err error) { + + log.Debugf("Downloading bundle: %s", uri) + + var tempFile *os.File + tempFile, err = ioutil.TempFile(bundlePath, "download") + if err != nil { + log.Errorf("Unable to create temp file: %v", err) + return + } + defer tempFile.Close() + tempFileName = tempFile.Name() + + var bundleReader io.ReadCloser + bundleReader, err = getURIFileReader(uri) + if err != nil { + log.Errorf("Unable to retrieve bundle %s: %v", uri, err) + return + } + defer bundleReader.Close() + + // track checksum + teedReader := io.TeeReader(bundleReader, hashWriter) + + _, err = io.Copy(tempFile, teedReader) + if err != nil { + log.Errorf("Unable to write bundle %s: %v", tempFileName, err) + return + } + + // check checksum + checksum := hex.EncodeToString(hashWriter.Sum(nil)) + if checksum != expectedHash { + err = errors.New(fmt.Sprintf("Bad checksum on %s. calculated: %s, given: %s", tempFileName, checksum, expectedHash)) + return + } + + log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName) + return +} + +// retrieveBundle retrieves bundle data from a URI +func getURIFileReader(uriString string) (io.ReadCloser, error) { + + uri, err := url.Parse(uriString) + if err != nil { + return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString) + } + + // todo: add authentication - TBD? + + // assume it's a file if no scheme + if uri.Scheme == "" || uri.Scheme == "file" { + f, err := os.Open(uri.Path) + if err != nil { + return nil, err + } + return f, nil + } + + // GET the contents at uriString + res, err := http.Get(uriString) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + return nil, fmt.Errorf("Bundle uri %s failed with status %d", uriString, res.StatusCode) + } + return res.Body, nil +} + +func getHashWriter(hashType string) (hash.Hash, error) { + + var hashWriter hash.Hash + + switch hashType { + case "md5": + hashWriter = md5.New() + case "crc-32": + hashWriter = crc32.NewIEEE() + default: + return nil, errors.New("checksumType must be md5 or crc-32") + } + + return hashWriter, nil +} + +//func checksumFile(hashType, checksum string, fileName string) error { +// +// hashWriter, err := getHashWriter(hashType) +// if err != nil { +// return err +// } +// +// file, err := os.Open(fileName) +// if err != nil { +// return err +// } +// defer file.Close() +// +// if _, err := io.Copy(hashWriter, file); err != nil { +// return err +// } +// +// hashBytes := hashWriter.Sum(nil) +// //hashBytes := hashWriter.Sum(nil)[:hasher.Size()] +// //hashBytes := hashWriter.Sum(nil)[:] +// +// //hex.EncodeToString(hashBytes) +// if checksum != hex.EncodeToString(hashBytes) { +// return errors.New(fmt.Sprintf("bad checksum for %s", fileName)) +// } +// +// return nil +//}
diff --git a/data.go b/data.go index 0fd46b9..2c7fb8f 100644 --- a/data.go +++ b/data.go
@@ -2,20 +2,9 @@ import ( "database/sql" - "time" "github.com/30x/apid" "sync" -) - -const ( - DEPLOYMENT_STATE_INPROG = 1 - DEPLOYMENT_STATE_ERR_APID = 2 - DEPLOYMENT_STATE_ERR_GWY = 3 - DEPLOYMENT_STATE_READY = 4 - DEPLOYMENT_STATE_SUCCESS = 5 - - BUNDLE_TYPE_SYS = 1 - BUNDLE_TYPE_DEP = 2 + "fmt" ) var ( @@ -23,23 +12,55 @@ dbMux sync.RWMutex ) +type dataDeployment struct { + ID string + BundleConfigID string + ApidClusterID string + DataScopeID string + BundleConfigJSON string + ConfigJSON string + Status string + Created string + CreatedBy string + Updated string + UpdatedBy string + BundleName string + BundleURI string + BundleChecksum string + BundleChecksumType string + LocalBundleURI string +} + type SQLExec interface { Exec(query string, args ...interface{}) (sql.Result, error) } func initDB(db apid.DB) { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS gateway_deploy_deployment ( - id varchar(255), status integer, created_at integer, - modified_at integer, error_code varchar(255), - PRIMARY KEY (id)); - - CREATE TABLE IF NOT EXISTS gateway_deploy_bundle ( - deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer, - created_at integer, modified_at integer, status integer, error_code integer, error_reason text, - PRIMARY KEY (deployment_id, id), - FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE); + CREATE TABLE IF NOT EXISTS etag ( + value integer + ); + INSERT INTO etag VALUES (1); + CREATE TABLE IF NOT EXISTS deployments ( + id character varying(36) NOT NULL, + bundle_config_id varchar(36) NOT NULL, + apid_cluster_id varchar(36) NOT NULL, + data_scope_id varchar(36) NOT NULL, + bundle_config_json text NOT NULL, + config_json text NOT NULL, + status text NOT NULL, + created timestamp without time zone, + created_by text, + updated timestamp without time zone, + updated_by text, + bundle_name text, + bundle_uri text, + local_bundle_uri text, + deploy_status string, + deploy_error_code int, + deploy_error_message text, + PRIMARY KEY (id) + ); `) if err != nil { log.Panicf("Unable to initialize database: %v", err) @@ -48,10 +69,6 @@ log.Debug("Database tables created.") } -func dbTimeNow() int64 { - return int64(time.Now().UnixNano()) -} - func getDB() apid.DB { dbMux.RLock() db := unsafeDB @@ -68,222 +85,232 @@ dbMux.Unlock() } -func insertDeployment(db apid.DB, depID string, dep deployment) error { +// call whenever the list of deployments changes +func incrementETag() error { - log.Debugf("insertDeployment: %s", depID) + stmt, err := getDB().Prepare("UPDATE etag SET value = value+1;") + if err != nil { + log.Errorf("prepare update etag failed: %v", err) + return err + } + defer stmt.Close() - tx, err := db.Begin() + _, err = stmt.Exec() + if err != nil { + log.Errorf("update etag failed: %v", err) + return err + } + + log.Debugf("etag incremented") + return err +} + +func getETag() (string, error) { + + var eTag string + err := getDB().QueryRow("SELECT value FROM etag").Scan(&eTag) + if err != nil { + log.Errorf("select etag failed: %v", err) + return "", err + } + + log.Debugf("etag queried: %v", eTag) + return eTag, err +} + +func insertDeployment(tx *sql.Tx, dep dataDeployment) error { + + log.Debugf("insertDeployment: %s", dep.ID) + + stmt, err := tx.Prepare(` + INSERT INTO deployments + (id, bundle_config_id, apid_cluster_id, data_scope_id, + bundle_config_json, config_json, status, created, + created_by, updated, updated_by, bundle_name, + bundle_uri, local_bundle_uri) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14); + `) + if err != nil { + log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err) + return err + } + defer stmt.Close() + + _, err = stmt.Exec( + dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, + dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created, + dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName, + dep.BundleURI, dep.LocalBundleURI) + if err != nil { + log.Errorf("insert into deployments %s failed: %v", dep.ID, err) + return err + } + + log.Debugf("insert into deployments %s succeeded", dep.ID) + return err +} + +func deleteDeployment(tx *sql.Tx, depID string) error { + + log.Debugf("deleteDeployment: %s", depID) + + stmt, err := tx.Prepare("DELETE FROM deployments where id = $1;") + if err != nil { + log.Errorf("prepare delete from deployments %s failed: %v", depID, err) + return err + } + defer stmt.Close() + + _, err = stmt.Exec(depID) + if err != nil { + log.Errorf("delete from deployments %s failed: %v", depID, err) + return err + } + + deploymentsChanged<- depID + + log.Debugf("deleteDeployment %s succeeded", depID) + return err +} + +// getReadyDeployments() returns array of deployments that are ready to deploy +func getReadyDeployments() (deployments []dataDeployment, err error) { + + db := getDB() + rows, err := db.Query(` + SELECT id, bundle_config_id, apid_cluster_id, data_scope_id, + bundle_config_json, config_json, status, created, + created_by, updated, updated_by, bundle_name, + bundle_uri, local_bundle_uri + FROM deployments + WHERE local_bundle_uri != "" AND deploy_error_code == 0 + `) + if err != nil { + if err == sql.ErrNoRows{ + return deployments, nil + } + log.Errorf("Error querying deployments: %v", err) + return + } + defer rows.Close() + + for rows.Next() { + dep := dataDeployment{} + rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID, + &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created, + &dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName, + &dep.BundleURI, &dep.LocalBundleURI, + ) + deployments = append(deployments, dep) + } + + return +} + +// for testing +func getAllDeploymentRecords() (deployments []dataDeployment, err error) { + + db := getDB() + rows, err := db.Query(` + SELECT id, bundle_config_id, apid_cluster_id, data_scope_id, + bundle_config_json, config_json, status, created, + created_by, updated, updated_by, bundle_name, + bundle_uri, local_bundle_uri + FROM deployments + `) + if err != nil { + if err == sql.ErrNoRows{ + return deployments, nil + } + log.Errorf("Error querying deployments: %v", err) + return + } + defer rows.Close() + + for rows.Next() { + dep := dataDeployment{} + rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID, + &dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created, + &dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName, + &dep.BundleURI, &dep.LocalBundleURI, + ) + deployments = append(deployments, dep) + } + + return +} + +// todo: is this a sufficient level of tracking? +func setDeploymentResults(results apiDeploymentResults) error { + + log.Debugf("setDeploymentResults: %v", results) + + tx, err := getDB().Begin() + if err != nil { + log.Errorf("Unable to begin transaction: %v", err) + return err + } defer tx.Rollback() + + stmt, err := tx.Prepare(` + UPDATE deployments + SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3 + WHERE id=$4; + `) if err != nil { - log.Errorf("insertDeployment begin transaction failed: %v", depID, err) + log.Errorf("prepare updateDeploymentStatus failed: %v", err) return err } + defer stmt.Close() - timeNow := dbTimeNow() - - _, err = tx.Exec("INSERT INTO gateway_deploy_deployment " + - "(id, status, created_at) VALUES(?,?,?);", - depID, DEPLOYMENT_STATE_READY, timeNow) - if err != nil { - log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err) - return err - } - - // system bundle - // todo: extra data? TBD - _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + - "(id, deployment_id, scope, type, uri, status, created_at) " + - "VALUES(?,?,?,?,?,?,?);", - dep.System.BundleID, depID, dep.System.Scope, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_READY, timeNow) - if err != nil { - log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err) - return err - } - - // todo: extra data? TBD - for _, bun := range dep.Bundles { - _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + - "(id, deployment_id, scope, type, uri, status, created_at) " + - "VALUES(?,?,?,?,?,?,?);", - bun.BundleID, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow) + for _, result := range results { + res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID) if err != nil { - log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleID, err) + log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err) return err } + n, err := res.RowsAffected() + if n == 0 || err != nil { + log.Error(fmt.Sprintf("no deployment matching '%s' to update. skipping.", result.ID)) + } } err = tx.Commit() if err != nil { - log.Errorf("commit insert to gateway_deploy_bundle %s failed: %v", depID, err) - } - - log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID) - return err -} - -func updateDeploymentAndBundles(depID string, rsp deploymentResponse) error { - - log.Debugf("updateDeploymentAndBundles: %s", depID) - - db := getDB() - - /* - * If the state of deployment was success, update state of bundles and - * its deployments as success as well - */ - txn, err := db.Begin() - if err != nil { - log.Errorf("Unable to begin transaction: %s", err) - return err - } - defer txn.Rollback() - - if rsp.Status == RESPONSE_STATUS_SUCCESS { - err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0) - if err != nil { - return err - } - err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS) - if err != nil { - return err - } - } else { - err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.Error.ErrorCode) - if err != nil { - return err - } - - // Iterate over Bundles, and update the errors - for _, a := range rsp.Error.ErrorDetails { - updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason) - if err != nil { - return err - } - } - } - if err != nil { - return err - } - - err = txn.Commit() - if err != nil { - log.Errorf("Unable to commit updateDeploymentStatus transaction: %s", err) + log.Errorf("Unable to commit setDeploymentResults transaction: %v", err) } return err } -func updateDeploymentStatus(txn SQLExec, depID string, status int, errCode int) error { +func updateLocalURI(depID, localBundleUri string) error { - var nRows int64 - res, err := txn.Exec("UPDATE gateway_deploy_deployment " + - "SET status=?, modified_at=?, error_code = ? WHERE id=?;", status, dbTimeNow(), errCode, depID) - if err == nil { - nRows, err = res.RowsAffected() - if nRows == 0 { - err = sql.ErrNoRows - } - } + tx, err := getDB().Begin() if err != nil { - log.Errorf("UPDATE gateway_deploy_deployment %s failed: %v", depID, err) + log.Errorf("begin updateLocalURI failed: %v", err) + return err + } + defer tx.Rollback() + + stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;") + if err != nil { + log.Errorf("prepare updateLocalURI failed: %v", err) + return err + } + defer stmt.Close() + + _, err = stmt.Exec(localBundleUri, depID) + if err != nil { + log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err) return err } - log.Debugf("UPDATE gateway_deploy_deployment %s succeeded", depID) - - if status == DEPLOYMENT_STATE_READY { - incoming<- depID - } - - return nil -} - -func updateAllBundleStatus(txn SQLExec, depID string, status int) error { - var nRows int64 - res, err := txn.Exec("UPDATE gateway_deploy_bundle SET status = ? WHERE deployment_id = ?;", status, depID) - if err == nil { - nRows, err = res.RowsAffected() - if nRows == 0 { - err = sql.ErrNoRows - } - } + err = tx.Commit() if err != nil { - log.Errorf("UPDATE all gateway_deploy_bundle %s failed: %v", depID, err) + log.Errorf("commit updateLocalURI failed: %v", err) return err } + log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri) + return nil } - -func updateBundleStatus(txn SQLExec, depID string, bundleID string, status int, errCode int, errReason string) error { - var nRows int64 - res, err := txn.Exec("UPDATE gateway_deploy_bundle " + - "SET status=?, error_code=?, error_reason=?, modified_at=? WHERE id=?;", - status, errCode, errReason, dbTimeNow(), depID) - if err == nil { - nRows, err = res.RowsAffected() - if nRows == 0 { - err = sql.ErrNoRows - } - } - if err != nil { - log.Errorf("UPDATE gateway_deploy_bundle %s:%s failed: %v", depID, bundleID, err) - return err - } - - log.Debugf("UPDATE gateway_deploy_bundle success: %s:%s", depID, bundleID) - return nil -} - -// getCurrentDeploymentID returns the ID of what should be the "current" deployment -func getCurrentDeploymentID() (string, error) { - - db := getDB() - var depID string - err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " + - "WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID) - log.Debugf("current deployment id: %s", depID) - return depID, err -} - -// getDeployment returns a fully populated deploymentResponse -func getDeployment(depID string) (*deployment, error) { - - db := getDB() - rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope " + - "FROM gateway_deploy_bundle WHERE deployment_id=?;", depID) - if err != nil { - log.Errorf("Unable to query gateway_deploy_bundle. Err: %s", err) - return nil, err - } - defer rows.Close() - - depRes := deployment{ - Bundles: []bundle{}, - DeploymentID: depID, - } - - for rows.Next() { - var bundleType int - var bundleID, uri, scope string - err = rows.Scan(&bundleID, &bundleType, &uri, &scope) - if err != nil { - log.Errorf("gateway_deploy_bundle fetch failed. Err: %s", err) - return nil, err - } - if bundleType == BUNDLE_TYPE_SYS { - depRes.System = bundle{ - BundleID: bundleID, - URI: uri, - } - } else { - fileUrl := getBundleFilePath(depID, uri) - bd := bundle{ - BundleID: bundleID, - URI: fileUrl, - Scope: scope, - } - depRes.Bundles = append(depRes.Bundles, bd) - } - } - return &depRes, nil -}
diff --git a/deployments.go b/deployments.go deleted file mode 100644 index 8fe22f4..0000000 --- a/deployments.go +++ /dev/null
@@ -1,261 +0,0 @@ -package apiGatewayDeploy - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "os" - "encoding/base64" - "path" - "errors" - "io/ioutil" - "time" - "github.com/30x/apid" -) - -// todo: remove downloaded bundle files from old deployments - -const ( - DOWNLOAD_ATTEMPTS = 3 -) - -var ( - downloadMultiplier = 10 * time.Second -) - -type systemBundle struct { - URI string `json:"uri"` -} - -type dependantBundle struct { - URI string `json:"uri"` - Scope string `json:"scope"` - Org string `json:"org"` - Env string `json:"env"` -} - -type bundleManifest struct { - SysBun systemBundle `json:"system"` - DepBun []dependantBundle `json:"bundles"` -} - -// event bundle -type bundle struct { - BundleID string `json:"bundleId"` - URI string `json:"uri"` - Scope string `json:"scope"` - Org string `json:"org"` - Env string `json:"env"` -} - -// event deployment -type deployment struct { - DeploymentID string `json:"deploymentId"` - System bundle `json:"system"` - Bundles []bundle `json:"bundles"` -} - -type deploymentErrorDetail struct { - ErrorCode int `json:"errorCode"` - Reason string `json:"reason"` - BundleID string `json:"bundleId"` -} - -type deploymentErrorResponse struct { - ErrorCode int `json:"errorCode"` - Reason string `json:"reason"` - ErrorDetails []deploymentErrorDetail `json:"bundleErrors"` -} - -type deploymentResponse struct { - Status string `json:"status"` - Error deploymentErrorResponse `json:"error"` -} - -// retrieveBundle retrieves bundle data from a URI -func getBundleReader(uriString string) (io.ReadCloser, error) { - - uri, err := url.Parse(uriString) - if err != nil { - return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString) - } - - // todo: Temporary blind bundle download, Bundle storage TBD - - // assume it's a file if no scheme - if uri.Scheme == "" || uri.Scheme == "file" { - f, err := os.Open(uri.Path) - if err != nil { - return nil, err - } - return f, nil - } - - // some random uri, try to GET it - res, err := http.Get(uriString) - if err != nil { - return nil, err - } - if res.StatusCode != 200 { - return nil, fmt.Errorf("Bundle uri %s failed with status %d", uriString, res.StatusCode) - } - return res.Body, nil -} - -// check if already exists and skip -func prepareBundle(depID string, bun bundle) error { - - bundleFile := getBundleFilePath(depID, bun.URI) - bundleDir := path.Dir(bundleFile) - - downloadBundle := func() (fileName string, err error) { - - log.Debugf("Downloading bundle: %s", bun.URI) - - var tempFile *os.File - tempFile, err = ioutil.TempFile(bundleDir, "download") - if err != nil { - log.Errorf("Unable to create temp file: %v", err) - return - } - defer tempFile.Close() - fileName = tempFile.Name() - - var bundleReader io.ReadCloser - bundleReader, err = getBundleReader(bun.URI) - if err != nil { - log.Errorf("Unable to retrieve bundle %s: %v", bun.URI, err) - return - } - defer bundleReader.Close() - - _, err = io.Copy(tempFile, bundleReader) - if err != nil { - log.Errorf("Unable to write bundle %s: %v", tempFile, err) - return - } - - log.Debugf("Bundle downloaded: %s", bun.URI) - return - } - - // retry - var tempFile string - var err error - for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ { - tempFile, err = downloadBundle() - if err == nil { - break - } - if tempFile != "" { - os.Remove(tempFile) - } - - // simple back-off, we could potentially be more sophisticated - retryIn := time.Duration(i) * downloadMultiplier - log.Debugf("will retry download in %s", retryIn) - time.Sleep(retryIn) - } - - if err != nil { - log.Errorf("failed %s download attempts. aborting.", DOWNLOAD_ATTEMPTS) - return err - } - - err = os.Rename(tempFile, bundleFile) - if err != nil { - log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err) - os.Remove(tempFile) - return err - } - - return nil -} - -func getDeploymentFilesPath(depID string) string { - return path.Join(bundlePath, depID) -} - -func getBundleFilePath(depID string, bundleURI string) string { - return path.Join(getDeploymentFilesPath(depID), base64.StdEncoding.EncodeToString([]byte(bundleURI))) -} - -// returns first bundle download error -// all bundles will be attempted regardless of errors, in the future we could retry -func prepareDeployment(db apid.DB, depID string, dep deployment) error { - - log.Debugf("preparing deployment: %s", depID) - - err := insertDeployment(db, depID, dep) - if err != nil { - log.Errorf("insert deployment failed: %v", err) - return err - } - - deploymentPath := getDeploymentFilesPath(depID) - err = os.MkdirAll(deploymentPath, 0700) - if err != nil { - log.Errorf("Deployment dir creation failed: %v", err) - return err - } - - // download bundles and store them locally - errorsChan := make(chan error, len(dep.Bundles)) - for i := range dep.Bundles { - bun := dep.Bundles[i] - go func() { - err := prepareBundle(depID, bun) - errorsChan<- err - if err != nil { - id := string(i) - err = updateBundleStatus(db, depID, id, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO, err.Error()) - if err != nil { - log.Errorf("Update bundle %s:%s status failed: %v", depID, id, err) - } - } - }() - } - - // fail fast on first error, otherwise wait for completion - for range dep.Bundles { - err := <-errorsChan - if err != nil { - updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO) - return err - } - } - - return updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0) -} - -func parseManifest(manifestString string) (dep deployment, err error) { - err = json.Unmarshal([]byte(manifestString), &dep) - if err != nil { - log.Errorf("JSON decoding Manifest failed: %v", err) - return - } - - // validate manifest - if dep.System.URI == "" { - err = errors.New("system bundle 'uri' is required") - return - } - for _, bun := range dep.Bundles { - if bun.BundleID == "" { - err = errors.New("bundle 'bundleId' is required") - return - } - if bun.URI == "" { - err = errors.New("bundle 'uri' is required") - return - } - if bun.Scope == "" { - err = errors.New("bundle 'scope' is required") - return - } - } - - return -}
diff --git a/init.go b/init.go index c56642f..12f1339 100644 --- a/init.go +++ b/init.go
@@ -32,11 +32,11 @@ data = services.Data() relativeBundlePath := config.GetString(configBundleDirKey) - if err := os.MkdirAll(relativeBundlePath, 0700); err != nil { - log.Panicf("Failed bundle directory creation: %v", err) - } storagePath := config.GetString("local_storage_path") bundlePath = path.Join(storagePath, relativeBundlePath) + if err := os.MkdirAll(bundlePath, 0700); err != nil { + log.Panicf("Failed bundle directory creation: %v", err) + } log.Infof("Bundle directory path is %s", bundlePath) go distributeEvents()
diff --git a/listener.go b/listener.go index 39e35c2..353a0ee 100644 --- a/listener.go +++ b/listener.go
@@ -1,19 +1,28 @@ package apiGatewayDeploy import ( + "database/sql" + "encoding/json" "github.com/30x/apid" "github.com/apigee-labs/transicator/common" ) const ( APIGEE_SYNC_EVENT = "ApigeeSync" - MANIFEST_TABLE = "edgex.apid_config_manifest_deployment" + DEPLOYMENT_TABLE = "edgex.deployment" ) func initListener(services apid.Services) { services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{}) } +type bundleConfigJson struct { + Name string `json:"name"` + URI string `json:"uri"` + ChecksumType string `json:"checksumType"` + Checksum string `json:"checksum"` +} + type apigeeSyncHandler struct { } @@ -43,70 +52,200 @@ initDB(db) + tx, err := db.Begin() + defer tx.Rollback() for _, table := range snapshot.Tables { var err error switch table.Name { - case MANIFEST_TABLE: + case DEPLOYMENT_TABLE: log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows)) if len(table.Rows) == 0 { return } - // todo: should be 0 or 1 *per system*!! - TBD - row := table.Rows[len(table.Rows)-1] - err = processNewManifest(db, row) + for _, row := range table.Rows { + addDeployment(tx, row) + } } if err != nil { log.Panicf("Error processing Snapshot: %v", err) } } + err = tx.Commit() + if err != nil { + log.Panicf("Error committing Snapshot change: %v", err) + } + setDB(db) log.Debug("Snapshot processed") } func processChangeList(changes *common.ChangeList) { - db := getDB() + tx, err := getDB().Begin() + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } + defer tx.Rollback() for _, change := range changes.Changes { var err error switch change.Table { - case MANIFEST_TABLE: + case DEPLOYMENT_TABLE: switch change.Operation { case common.Insert: - err = processNewManifest(db, change.NewRow) + err = addDeployment(tx, change.NewRow) + case common.Delete: + var id string + err = change.OldRow.Get("id", &id) + if err == nil { + err = deleteDeployment(tx, id) + } default: - log.Error("unexpected operation: %s", change.Operation) + log.Errorf("unexpected operation: %s", change.Operation) } } if err != nil { log.Panicf("Error processing ChangeList: %v", err) } } + err = tx.Commit() + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } } -func processNewManifest(db apid.DB, row common.Row) error { +func addDeployment(tx *sql.Tx, row common.Row) (err error) { - var deploymentID, manifestString string - err := row.Get("id", &deploymentID) + d := dataDeployment{} + err = row.Get("id", &d.ID) if err != nil { - return err + return } - err = row.Get("manifest_body", &manifestString) + err = row.Get("bundle_config_id", &d.BundleConfigID) if err != nil { - return err + return + } + err = row.Get("apid_cluster_id", &d.ApidClusterID) + if err != nil { + return + } + err = row.Get("data_scope_id", &d.DataScopeID) + if err != nil { + return + } + err = row.Get("bundle_config_json", &d.BundleConfigJSON) + if err != nil { + return + } + err = row.Get("config_json", &d.ConfigJSON) + if err != nil { + return + } + err = row.Get("status", &d.Status) + if err != nil { + return + } + err = row.Get("created", &d.Created) + if err != nil { + return + } + err = row.Get("created_by", &d.CreatedBy) + if err != nil { + return + } + err = row.Get("updated", &d.Updated) + if err != nil { + return + } + err = row.Get("updated_by", &d.UpdatedBy) + if err != nil { + return } - manifest, err := parseManifest(manifestString) + var bc bundleConfigJson + err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc) if err != nil { - log.Errorf("error parsing manifest: %v", err) - return err + log.Errorf("JSON decoding Manifest failed: %v", err) + return } - err = prepareDeployment(db, deploymentID, manifest) + d.BundleName = bc.Name + d.BundleURI = bc.URI + d.BundleChecksumType = bc.ChecksumType + d.BundleChecksum = bc.Checksum + + err = insertDeployment(tx, d) if err != nil { - log.Errorf("serviceDeploymentQueue prepare deployment failed: %s", deploymentID) - return err + return } - return nil + // todo: limit # concurrent downloads? + go downloadBundle(d) + return } + +/* +Cleanup: + find bundles that are not used by any deployments, delete them + +On deployment delete: + delete deployment from DB + send deployments to client + +On deployment insert: + parse bundle_config_json + add display_name, bundle_uri to deployment + insert deployment into DB + if local bundle + send deployments to client + else + initiate bundle download + +On bundle downloaded: + send deployments to client + +Send deployments to client: + select * from deployments + for each where bundle exists, collect translation (see below) + send collection to client +*/ + +/* +KMS: +deployment ( + id character varying(36) NOT NULL, + bundle_config_id character varying(36) NOT NULL, + apid_cluster_id character varying(36) NOT NULL, + data_scope_id character varying(36) NOT NULL, + bundle_config_json text NOT NULL, + config_json text NOT NULL, + status text NOT NULL, + created timestamp without time zone, + created_by text, + updated timestamp without time zone, + updated_by text +); + +bundle_config_json: + id: + scopeId: + name: + uri: + crc: -> checksum, checksumType + created: + createdBy: + updated: + updatedBy: + +API Mapping: + Objects in deployments array: + id deployment.deploymentId + scopeId deployment.data_scope_id + created deployment.created + createdBy deployment.created_by + updated deployment.updated + updatedBy deployment.updated_by + configurationJson deployment.configurationJson + displayName bundle_config_json.name + uri downloaded file uri per bundle_config_json.uri +*/
diff --git a/listener_test.go b/listener_test.go index 7160f35..1bde3aa 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -7,182 +7,169 @@ . "github.com/onsi/gomega" "net/url" "github.com/apigee-labs/transicator/common" - "io/ioutil" ) var _ = Describe("listener", func() { - It("should process ApigeeSync snapshot event", func(done Done) { - - deploymentID := "listener_test_1" - - uri, err := url.Parse(testServer.URL) + BeforeEach(func() { + _, err := getDB().Exec("DELETE FROM deployments") Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundle/1" - bundleUri1 := uri.String() - uri.Path = "/bundle/2" - bundleUri2 := uri.String() + }) - dep := deployment{ - DeploymentID: deploymentID, - System: bundle{ - URI: "whatever", - }, - Bundles: []bundle{ - { - BundleID: "/bundle/1", - URI: bundleUri1, - Scope: "some-scope", + Context("ApigeeSync snapshot event", func() { + + It("should set DB and process", func(done Done) { + + deploymentID := "listener_test_1" + + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle1 := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc-32", + } + bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri) + bundle1Json, err := json.Marshal(bundle1) + Expect(err).ShouldNot(HaveOccurred()) + + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} + + var event = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{ + { + Name: DEPLOYMENT_TABLE, + Rows: []common.Row{row}, + }, }, - { - BundleID: "/bundle/2", - URI: bundleUri2, - Scope: "some-scope", + } + + var listener = make(chan string) + addSubscriber <- listener + + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) + + id := <-listener + Expect(id).To(Equal(deploymentID)) + + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d := deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.BundleName).To(Equal(bundle1.Name)) + Expect(d.BundleURI).To(Equal(bundle1.URI)) + + close(done) + }) + }) + + Context("ApigeeSync change event", func() { + + It("add event should add a deployment", func(done Done) { + + deploymentID := "add_test_1" + + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc-32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + bundle1Json, err := json.Marshal(bundle) + Expect(err).ShouldNot(HaveOccurred()) + + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} + + var event = common.ChangeList{ + Changes: []common.Change{ + { + Operation: common.Insert, + Table: DEPLOYMENT_TABLE, + NewRow: row, + }, }, - }, - } + } - depBytes, err := json.Marshal(dep) - Expect(err).ShouldNot(HaveOccurred()) + var listener = make(chan string) + addSubscriber <- listener - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - row["manifest_body"] = &common.ColumnVal{Value: string(depBytes)} + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - var event = common.Snapshot{ - SnapshotInfo: "test", - Tables: []common.Table{ - { - Name: MANIFEST_TABLE, - Rows: []common.Row{row}, + // wait for event to propagate + id := <-listener + Expect(id).To(Equal(deploymentID)) + + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d := deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.BundleName).To(Equal(bundle.Name)) + Expect(d.BundleURI).To(Equal(bundle.URI)) + + close(done) + }) + + It("delete event should delete a deployment", func(done Done) { + + deploymentID := "delete_test_1" + + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + dep := dataDeployment{ + ID: deploymentID, + LocalBundleURI: "whatever", + } + err = insertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + + var event = common.ChangeList{ + Changes: []common.Change{ + { + Operation: common.Delete, + Table: DEPLOYMENT_TABLE, + OldRow: row, + }, }, - }, - } + } - h := &test_handler{ - deploymentID, - func(e apid.Event) { - defer GinkgoRecover() + var listener = make(chan string) + addSubscriber <- listener - // ignore the first event, let standard listener process it - changeSet, ok := e.(*common.Snapshot) - if !ok || len(changeSet.Tables) > 0 { - return - } + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - testDeployment(dep) + id := <-listener + Expect(id).To(Equal(deploymentID)) - close(done) - }, - } + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) - apid.Events().Listen(APIGEE_SYNC_EVENT, h) - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) // for standard listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{SnapshotInfo: "test"}) // for test listener - }) + Expect(len(deployments)).To(Equal(0)) - It("should process ApigeeSync change event", func(done Done) { - - deploymentID := "listener_test_2" - - var dep deployment - - h := &test_handler{ - deploymentID, - func(e apid.Event) { - defer GinkgoRecover() - - // ignore the first event, let standard listener process it - changeSet, ok := e.(*common.ChangeList) - if !ok || len(changeSet.Changes) > 0 { - return - } - - testDeployment(dep) - - close(done) - }, - } - - apid.Events().Listen(APIGEE_SYNC_EVENT, h) - - dep = triggerDeploymentEvent(deploymentID) - - apid.Events().Emit(APIGEE_SYNC_EVENT, &common.ChangeList{}) // for test listener + close(done) + }) }) }) - -type test_handler struct { - description string - f func(event apid.Event) -} - -func (t *test_handler) String() string { - return t.description -} - -func (t *test_handler) Handle(event apid.Event) { - t.f(event) -} - -func testDeployment(dep deployment) { - - depID, err := getCurrentDeploymentID() - Expect(err).ShouldNot(HaveOccurred()) - Expect(depID).Should(Equal(dep.DeploymentID)) - - deployment, err := getDeployment(depID) - Expect(deployment.Bundles).To(HaveLen(len(dep.Bundles))) - - for _, b := range dep.Bundles { - bundleFile := getBundleFilePath(depID, b.URI) - Expect(err).ShouldNot(HaveOccurred()) - Expect(bundleFile).To(BeARegularFile()) - - bytes, err := ioutil.ReadFile(bundleFile) - Expect(err).ShouldNot(HaveOccurred()) - Expect(string(bytes)).Should(Equal(b.BundleID)) - } -} - -func triggerDeploymentEvent(deploymentID string) deployment { - - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - uri.Path = "/bundle/1" - bundleUri := uri.String() - - dep := deployment{ - DeploymentID: deploymentID, - System: bundle{ - URI: bundleUri, - }, - Bundles: []bundle{ - { - BundleID: "/bundle/1", - URI: bundleUri, - Scope: "some-scope", - }, - }, - } - - depBytes, err := json.Marshal(dep) - Expect(err).ShouldNot(HaveOccurred()) - - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - row["manifest_body"] = &common.ColumnVal{Value: string(depBytes)} - - var event = common.ChangeList{} - event.Changes = []common.Change{ - { - Operation: common.Insert, - Table: MANIFEST_TABLE, - NewRow: row, - }, - } - - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - - return dep -}