Merge pull request #3 from 30x/XAPID-423
rewritten for new deployment schema and API
diff --git a/.gitignore b/.gitignore
index 8d4776f..dd76eab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,4 @@
coverage.txt
vendor
cmd/apidGatewayDeploy/apidGatewayDeploy
+*.lock
\ No newline at end of file
diff --git a/README.md b/README.md
index 398f2c2..c5bf99a 100644
--- a/README.md
+++ b/README.md
@@ -8,8 +8,10 @@
This plugin simply tracks counters based on called URIs:
-* `GET /deployments/current` - retrieve current deployment
-* `POST /deployments/{id}` - update specified deployment
+* `GET /deployments` - retrieve current deployments
+* `POST /deployments` - report deployment results
+
+See [apidGatewayDeploy-api.yaml](apidGatewayDeploy-api.yaml) for full spec.
## Building and running standalone
@@ -28,15 +30,40 @@
Command line options:
-* -manifest <file path>
+* -deployments <file path>
-If you use the `-manifest` option, the server will start using a clean database that contains only the
- deployment manifest specified.
+If you use the `-deployments` option, the server will start using a clean database that contains only the
+ deployments listed in the specified file.
+
+The file should be the JSON for an array of deployments. JSON format is:
+
+ [
+ {
+ "id": "",
+ "scopeId": "",
+ "created": "",
+ "createdBy": "",
+ "updated": "",
+ "updatedBy": "",
+ "configuration": {
+ "key": "value"
+ },
+ "bundleConfiguration": {
+ "key": "value"
+ },
+ "displayName": "",
+ "uri": ""
+ }
+ ]
+
+Notes:
+* id must be unique
+* uri should point to a bundle file
Once the process is running, you should be able to manually give the plugin's API a whirl...
- curl -i localhost:9000/deployments/current
- curl -i -X POST localhost:9000/deployments/entityId -d '{ "status": "SUCCESS" }'
+ curl -i localhost:9000/deployments
+ curl -i -X POST localhost:9000/deployments -d '{ ... }'
The following may be interesting env vars for configuration:
diff --git a/api.go b/api.go
index 09a4d17..8a54a31 100644
--- a/api.go
+++ b/api.go
@@ -1,26 +1,26 @@
package apiGatewayDeploy
import (
- "database/sql"
+ "bytes"
"encoding/json"
- "github.com/30x/apid"
+ "fmt"
"io/ioutil"
"net/http"
"strconv"
"time"
- "fmt"
)
const (
RESPONSE_STATUS_SUCCESS = "SUCCESS"
RESPONSE_STATUS_FAIL = "FAIL"
- ERROR_CODE_TODO = 0 // todo: add error codes where this is used
+ // todo: add error codes where this is used
+ ERROR_CODE_TODO = 0
)
var (
- incoming = make(chan string)
- addSubscriber = make(chan chan string)
+ deploymentsChanged = make(chan string)
+ addSubscriber = make(chan chan string)
)
type errorResponse struct {
@@ -28,9 +28,37 @@
Reason string `json:"reason"`
}
-func initAPI() {
- services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET")
- services.API().HandleFunc("/deployments/{deploymentID}", handleDeploymentResult).Methods("POST")
+type ApiDeployment struct {
+ ID string `json:"id"`
+ ScopeId string `json:"scopeId"`
+ Created string `json:"created"`
+ CreatedBy string `json:"createdBy"`
+ Updated string `json:"updated"`
+ UpdatedBy string `json:"updatedBy"`
+ ConfigJson json.RawMessage `json:"configuration"`
+ BundleConfigJson json.RawMessage `json:"bundleConfiguration"`
+ DisplayName string `json:"displayName"`
+ URI string `json:"uri"`
+}
+
+// sent to client
+type ApiDeploymentResponse []ApiDeployment
+
+type apiDeploymentResult struct {
+ ID string `json:"id"`
+ Status string `json:"status"`
+ ErrorCode int `json:"errorCode"`
+ Message string `json:"message"`
+}
+
+// received from client
+type apiDeploymentResults []apiDeploymentResult
+
+const deploymentsEndpoint = "/deployments"
+
+func InitAPI() {
+ services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
+ services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("POST")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -56,10 +84,13 @@
subscribers := make(map[chan string]struct{})
for {
select {
- case msg := <-incoming:
- log.Debugf("Delivering new deployment %s to %d subscribers", msg, len(subscribers))
- for subscriber := range subscribers {
- delete(subscribers, subscriber) // all subscriptions are one-time notify
+ case msg := <-deploymentsChanged:
+ // todo: add a debounce w/ timeout to avoid sending on every single deployment?
+ subs := subscribers
+ incrementETag() // todo: do this elsewhere? check error?
+ subscribers = make(map[chan string]struct{})
+ log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs))
+ for subscriber := range subs {
select {
case subscriber <- msg:
log.Debugf("Handling deploy response for: %s", msg)
@@ -74,7 +105,7 @@
}
}
-func handleCurrentDeployment(w http.ResponseWriter, r *http.Request) {
+func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
// If returning without a bundle (immediately or after timeout), status = 404
// If returning If-None-Match value is equal to current deployment, status = 304
@@ -97,54 +128,54 @@
// If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
// query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
// most recent bundle list.
- priorDepID := r.Header.Get("If-None-Match")
- log.Debugf("if-none-match: %s", priorDepID)
+ ifNoneMatch := r.Header.Get("If-None-Match")
+ log.Debugf("if-none-match: %s", ifNoneMatch)
- // subscribe to new deployments in case we need it
- var newReq chan string
- if timeout > 0 && priorDepID != "" {
- newReq = make(chan string)
- addSubscriber <- newReq
+ // send unmodified if matches prior eTag and no timeout
+ eTag, err := getETag()
+ if err != nil {
+ writeDatabaseError(w)
+ return
+ }
+ if eTag == ifNoneMatch && timeout == 0 {
+ w.WriteHeader(http.StatusNotModified)
+ return
}
- depID, err := getCurrentDeploymentID()
- if err != nil && err != sql.ErrNoRows{
+ // subscribe to new deployments in case we need it
+ var gotNewDeployment chan string
+ if timeout > 0 && ifNoneMatch != "" {
+ gotNewDeployment = make(chan string)
+ addSubscriber <- gotNewDeployment
+ }
+
+ deployments, err := getReadyDeployments()
+ if err != nil {
writeDatabaseError(w)
return
}
- // not found, no timeout, send immediately
- if depID == "" && timeout == 0 {
+ // send not found if no timeout
+ if len(deployments) == 0 && timeout == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
- // found, send immediately - if doesn't match prior ID
- if depID != "" {
- if depID == priorDepID {
- if timeout == 0 {
- w.WriteHeader(http.StatusNotModified)
- return
- } else {
- // continue
- }
- } else {
- sendDeployment(w, depID)
- return
- }
+ // send results if different eTag
+ if eTag != ifNoneMatch {
+ sendDeployments(w, deployments, eTag)
+ return
}
- // can't send immediately, wait for the subscription
log.Debug("Blocking request... Waiting for new Deployments.")
- // block until new deployment or timeout
select {
- case depID := <-newReq:
- sendDeployment(w, depID)
+ case <-gotNewDeployment:
+ apiGetCurrentDeployments(w, r) // recurse
case <-time.After(time.Duration(timeout) * time.Second):
log.Debug("Blocking deployment request timed out.")
- if priorDepID != "" {
+ if ifNoneMatch != "" {
w.WriteHeader(http.StatusNotModified)
} else {
w.WriteHeader(http.StatusNotFound)
@@ -152,62 +183,80 @@
}
}
-func sendDeployment(w http.ResponseWriter, depID string) {
- deployment, err := getDeployment(depID)
- if err != nil {
- log.Errorf("unable to retrieve deployment [%s]: %s", depID, err)
- w.WriteHeader(http.StatusInternalServerError)
+func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
+
+ var apiDeps ApiDeploymentResponse
+
+ for _, d := range dataDeps {
+ apiDeps = append(apiDeps, ApiDeployment{
+ ID: d.ID,
+ ScopeId: d.DataScopeID,
+ Created: d.Created,
+ CreatedBy: d.CreatedBy,
+ Updated: d.Updated,
+ UpdatedBy: d.UpdatedBy,
+ BundleConfigJson: []byte(d.BundleConfigJSON),
+ ConfigJson: []byte(d.ConfigJSON),
+ DisplayName: d.BundleName,
+ URI: d.LocalBundleURI,
+ })
}
- b, err := json.Marshal(deployment)
+
+ b, err := json.Marshal(apiDeps)
if err != nil {
- log.Errorf("unable to marshal deployment: %s", err)
+ log.Errorf("unable to marshal deployments: %s", err)
w.WriteHeader(http.StatusInternalServerError)
- } else {
- log.Debugf("sending deployment %s: %s", depID, b)
- w.Header().Set("ETag", depID)
- w.Write(b)
- }
-}
-
-// todo: we'll need to transmit results back to Edge somehow - TBD
-func handleDeploymentResult(w http.ResponseWriter, r *http.Request) {
-
- depID := apid.API().Vars(r)["deploymentID"]
-
- if depID == "" {
- log.Error("No deployment ID")
- writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Missing deployment ID")
return
}
- var rsp deploymentResponse
+ log.Debugf("sending deployment %s: %s", eTag, b)
+ w.Header().Set("ETag", eTag)
+ w.Write(b)
+}
+
+func apiSetDeploymentResults(w http.ResponseWriter, r *http.Request) {
+
+ var results apiDeploymentResults
buf, _ := ioutil.ReadAll(r.Body)
- err := json.Unmarshal(buf, &rsp)
+ err := json.Unmarshal(buf, &results)
if err != nil {
- log.Error("Resp Handler Json Unmarshal err: ", err)
+ log.Errorf("Resp Handler Json Unmarshal err: ", err)
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Malformed JSON")
return
}
- if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL {
- writeError(w, http.StatusBadRequest, ERROR_CODE_TODO,
- fmt.Sprintf("status must be '%s' or '%s'", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL))
- return
- }
+ // validate the results
+ // todo: these errors to the client should be standardized
+ var errs bytes.Buffer
+ for i, rsp := range results {
+ if rsp.ID == "" {
+ errs.WriteString(fmt.Sprintf("Missing id at %d\n", i))
+ }
- if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.Error.ErrorCode == 0 || rsp.Error.Reason == "") {
- writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "errorCode and reason are required")
- return
- }
+ if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL {
+ errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
+ }
- err = updateDeploymentAndBundles(depID, rsp)
- if err != nil {
- if err == sql.ErrNoRows {
- writeError(w, http.StatusNotFound, ERROR_CODE_TODO, "not found")
- } else {
- writeDatabaseError(w)
+ if rsp.Status == RESPONSE_STATUS_FAIL {
+ if rsp.ErrorCode == 0 {
+ errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i))
+ }
+ if rsp.Message == "" {
+ errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i))
+ }
}
}
+ if errs.Len() > 0 {
+ writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, errs.String())
+ }
+
+ err = setDeploymentResults(results)
+ if err != nil {
+ writeDatabaseError(w)
+ }
+
+ // todo: transmit to server (API TBD)
+ //err = transmitDeploymentResultsToServer()
return
}
diff --git a/api_test.go b/api_test.go
index 669f3e0..cb8ec38 100644
--- a/api_test.go
+++ b/api_test.go
@@ -1,30 +1,30 @@
package apiGatewayDeploy
import (
- "bytes"
- "encoding/json"
- "fmt"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
+ "encoding/json"
+ "io/ioutil"
"time"
+ "bytes"
)
var _ = Describe("api", func() {
- Context("GET /deployments/current", func() {
+ BeforeEach(func() {
+ _, err := getDB().Exec("DELETE FROM deployments")
+ Expect(err).ShouldNot(HaveOccurred())
+ })
- It("should get 404 if no deployments", func() {
+ Context("GET /deployments", func() {
- db := getDB()
- _, err := db.Exec("DELETE FROM gateway_deploy_deployment")
- Expect(err).ShouldNot(HaveOccurred())
+ It("should get an empty array if no deployments", func() {
uri, err := url.Parse(testServer.URL)
- uri.Path = "/deployments/current"
+ uri.Path = deploymentsEndpoint
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
@@ -32,27 +32,40 @@
Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
})
- It("should get current deployment", func() {
+ It("should get current deployments", func() {
deploymentID := "api_get_current"
insertTestDeployment(testServer, deploymentID)
uri, err := url.Parse(testServer.URL)
- uri.Path = "/deployments/current"
+ uri.Path = deploymentsEndpoint
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
- var depRes deployment
+ var depRes ApiDeploymentResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).ShouldNot(HaveOccurred())
json.Unmarshal(body, &depRes)
- Expect(depRes.DeploymentID).Should(Equal(deploymentID))
- Expect(depRes.Bundles[0].Scope).Should(Equal("some-scope"))
+ Expect(len(depRes)).To(Equal(1))
- Expect(res.Header.Get("etag")).Should(Equal(deploymentID))
+ dep := depRes[0]
+
+ Expect(dep.ID).To(Equal(deploymentID))
+ Expect(dep.ScopeId).To(Equal(deploymentID))
+ Expect(dep.DisplayName).To(Equal(deploymentID))
+
+ var config bundleConfigJson
+
+ err = json.Unmarshal(dep.ConfigJson, &config)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(config.Name).To(Equal("/bundles/1"))
+
+ err = json.Unmarshal(dep.BundleConfigJson, &config)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(config.Name).To(Equal("/bundles/1"))
})
It("should get 304 for no change", func() {
@@ -61,7 +74,7 @@
insertTestDeployment(testServer, deploymentID)
uri, err := url.Parse(testServer.URL)
- uri.Path = "/deployments/current"
+ uri.Path = deploymentsEndpoint
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
@@ -76,14 +89,10 @@
Expect(res.StatusCode).To(Equal(http.StatusNotModified))
})
- It("should get 404 after blocking if no deployment", func() {
-
- db := getDB()
- _, err := db.Exec("DELETE FROM gateway_deploy_deployment")
- Expect(err).ShouldNot(HaveOccurred())
+ It("should get empty set after blocking if no deployments", func() {
uri, err := url.Parse(testServer.URL)
- uri.Path = "/deployments/current"
+ uri.Path = deploymentsEndpoint
query := uri.Query()
query.Add("block", "1")
@@ -91,7 +100,9 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
- Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
+
+ Expect(res.StatusCode).Should(Equal(http.StatusOK))
+
})
It("should get new deployment after blocking", func(done Done) {
@@ -99,7 +110,7 @@
deploymentID := "api_get_current_blocking"
insertTestDeployment(testServer, deploymentID)
uri, err := url.Parse(testServer.URL)
- uri.Path = "/deployments/current"
+ uri.Path = deploymentsEndpoint
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
@@ -120,27 +131,25 @@
defer res.Body.Close()
Expect(res.StatusCode).To(Equal(http.StatusOK))
- var depRes deployment
+ var depRes ApiDeploymentResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).ShouldNot(HaveOccurred())
json.Unmarshal(body, &depRes)
- Expect(depRes.DeploymentID).Should(Equal(deploymentID))
+ Expect(len(depRes)).To(Equal(2))
- for _, bundle := range depRes.Bundles {
- uri, err := url.Parse(bundle.URI)
- Expect(err).ShouldNot(HaveOccurred())
- bContent, err := ioutil.ReadFile(uri.Path)
- Expect(err).ShouldNot(HaveOccurred())
- content := string(bContent)
- Expect(content).Should(HavePrefix("/bundle/"))
- }
+ dep := depRes[1]
+
+ Expect(dep.ID).To(Equal(deploymentID))
+ Expect(dep.ScopeId).To(Equal(deploymentID))
+ Expect(dep.DisplayName).To(Equal(deploymentID))
close(done)
}()
- time.Sleep(25 * time.Millisecond) // give api call above time to block
- triggerDeploymentEvent(deploymentID)
+ time.Sleep(250 * time.Millisecond) // give api call above time to block
+ insertTestDeployment(testServer, deploymentID)
+ deploymentsChanged<- deploymentID
})
It("should get 304 after blocking if no new deployment", func() {
@@ -148,7 +157,7 @@
deploymentID := "api_no_change_blocking"
insertTestDeployment(testServer, deploymentID)
uri, err := url.Parse(testServer.URL)
- uri.Path = "/deployments/current"
+ uri.Path = deploymentsEndpoint
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
@@ -167,19 +176,17 @@
})
})
- Context("POST /deployments/{ID}", func() {
+ Context("POST /deployments", func() {
- It("should return a 404 for missing deployment", func() {
-
- deploymentID := "api_missing_deployment"
+ It("should return BadRequest for invalid request", func() {
uri, err := url.Parse(testServer.URL)
- uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
+ uri.Path = deploymentsEndpoint
- deploymentResult := deploymentResponse{
- Status: RESPONSE_STATUS_SUCCESS,
+ deploymentResult := apiDeploymentResults{
+ apiDeploymentResult{
+ },
}
-
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
@@ -189,22 +196,49 @@
resp, err := http.DefaultClient.Do(req)
defer resp.Body.Close()
Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusNotFound))
+ Expect(resp.StatusCode).Should(Equal(http.StatusBadRequest))
})
- It("should mark a deployment as deployed", func() {
+ It("should ignore deployments that can't be found", func() {
+
+ deploymentID := "api_missing_deployment"
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = deploymentsEndpoint
+
+ deploymentResult := apiDeploymentResults{
+ apiDeploymentResult{
+ ID: deploymentID,
+ Status: RESPONSE_STATUS_SUCCESS,
+ },
+ }
+ payload, err := json.Marshal(deploymentResult)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req.Header.Add("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ defer resp.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp.StatusCode).Should(Equal(http.StatusOK))
+ })
+
+ It("should mark a deployment as successful", func() {
db := getDB()
deploymentID := "api_mark_deployed"
insertTestDeployment(testServer, deploymentID)
uri, err := url.Parse(testServer.URL)
- uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
+ uri.Path = deploymentsEndpoint
- deploymentResult := deploymentResponse{
- Status: RESPONSE_STATUS_SUCCESS,
+ deploymentResult := apiDeploymentResults{
+ apiDeploymentResult{
+ ID: deploymentID,
+ Status: RESPONSE_STATUS_SUCCESS,
+ },
}
-
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
@@ -216,44 +250,29 @@
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
- var deployStatus int
- err = db.QueryRow("SELECT status FROM gateway_deploy_deployment WHERE id=?", deploymentID).
+ var deployStatus string
+ err = db.QueryRow("SELECT deploy_status FROM deployments WHERE id=?", deploymentID).
Scan(&deployStatus)
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
-
- rows, err := db.Query("SELECT status from gateway_deploy_bundle WHERE id = ?;", deploymentID)
- Expect(err).ShouldNot(HaveOccurred())
- defer rows.Close()
- for rows.Next() {
- rows.Scan(&deployStatus)
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
- }
+ Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS))
})
It("should mark a deployment as failed", func() {
db := getDB()
- deploymentID := "api_test_3"
+ deploymentID := "api_mark_failed"
insertTestDeployment(testServer, deploymentID)
uri, err := url.Parse(testServer.URL)
- uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
+ uri.Path = deploymentsEndpoint
- deploymentResult := deploymentResponse{
- Status: RESPONSE_STATUS_FAIL,
- Error: deploymentErrorResponse{
+ deploymentResult := apiDeploymentResults{
+ apiDeploymentResult{
+ ID: deploymentID,
+ Status: RESPONSE_STATUS_FAIL,
ErrorCode: 100,
- Reason: "bad juju",
- //ErrorDetails: []deploymentErrorDetail{ // todo: add tests for bundle errors
- // {
- // BundleId: "",
- // ErrorCode: 100,
- // Reason: "Zombies",
- // },
- //},
+ Message: "Some error message",
},
}
-
payload, err := json.Marshal(deploymentResult)
Expect(err).ShouldNot(HaveOccurred())
@@ -265,41 +284,60 @@
Expect(err).ShouldNot(HaveOccurred())
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
- var deployStatus int
- err = db.QueryRow("SELECT status from gateway_deploy_deployment WHERE id = ?;",
- deploymentID).Scan(&deployStatus)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_ERR_GWY))
+ var deployStatus, deploy_error_message string
+ var deploy_error_code int
+ err = db.QueryRow(`
+ SELECT deploy_status, deploy_error_code, deploy_error_message
+ FROM deployments
+ WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message)
+ Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL))
+ Expect(deploy_error_code).Should(Equal(100))
+ Expect(deploy_error_message).Should(Equal("Some error message"))
})
})
})
-func insertTestDeployment(server *httptest.Server, depID string) {
+func insertTestDeployment(testServer *httptest.Server, deploymentID string) {
- db := getDB()
- uri, err := url.Parse(server.URL)
+ uri, err := url.Parse(testServer.URL)
Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundle"
- bundleUri := uri.String()
- dep := deployment{
- System: bundle{
- URI: bundleUri,
- },
- Bundles: []bundle{
- {
- BundleID: "bun",
- URI: bundleUri,
- Scope: "some-scope",
- Org: "org",
- Env: "env",
- },
- },
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundleJson, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ BundleConfigID: deploymentID,
+ ApidClusterID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleConfigJSON: string(bundleJson),
+ ConfigJSON: string(bundleJson),
+ Status: "",
+ Created: "",
+ CreatedBy: "",
+ Updated: "",
+ UpdatedBy: "",
+ BundleName: deploymentID,
+ BundleURI: "",
+ BundleChecksum: "",
+ BundleChecksumType: "",
+ LocalBundleURI: "x",
}
- err = insertDeployment(db, depID, dep)
+ err = InsertDeployment(tx, dep)
Expect(err).ShouldNot(HaveOccurred())
- err = updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
+ err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
}
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml
index c48ca40..916c081 100644
--- a/apidGatewayDeploy-api.yaml
+++ b/apidGatewayDeploy-api.yaml
@@ -17,7 +17,7 @@
produces:
- application/json
paths:
- /current:
+ /:
get:
description: Retrieve current deployment system and bundles to install.
parameters:
@@ -28,7 +28,7 @@
- name: block
in: query
type: integer
- description: 'If block > 0 AND if there is no new bundle list available, then block for up to the specified number of seconds until a new bundle list becomes available. If no new deployment becomes available, then return 304 Not Modified if If-None-Match is specified, 404 otherwise.'
+ description: 'If block > 0 AND if there is no new bundle list available, then block for up to the specified number of seconds until a new bundle list becomes available. If no new deployment becomes available, then return 304 Not Modified if If-None-Match is specified.'
responses:
'200':
headers:
@@ -37,50 +37,90 @@
type: string
description: The deployment system and bundles to install.
examples:
- application/json: {
- "deploymentId": "abc123",
- "system" : {
- "bundleId": "system-bundle-rev-3",
- "uri": "file:///apid/bundles/system-bundle-rev-3.zip"
- },
- "bundles": [
- {
- "bundleId": "system-bundle-rev-3",
- "uri": "file:///apid/bundles/system-bundle-release-1-1233.zip",
- "scope": "@#$nike#$#$stage&#$(^#",
- "org": "nike",
- "env": "stage",
- },{
- "bundleId": "bundleA-rev-9",
- "uri": "file:///apid/bundles/bundleA-rev-9-26372.zip",
- "scope": "@#$nike#$#$prod&#$(^#",
- "org": "nike",
- "env": "prod",
- },{
- "bundleId": "bundleB-rev-1",
- "uri": "file:///somewhere/bundles/bundleB-rev-1-72351.zip",
- "scope": "@#$nike#$#$test&#$(^#",
- "org": "nike",
- "env": "test",
- }
- ]
- }
+ application/json: [
+ {
+ "id": "1234567890",
+ "displayName": "abc123",
+ "created":"1481917061",
+ "updated":"1481917061",
+ "createdBy":"mdobs",
+ "updatedBy":"mdobs",
+ "scopeId": "ABCDEF",
+ "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
+ "configurationJson": {
+ "PropA": "scope1bundle1propA",
+ "PropSCOPE_LEVEL": "aaa",
+ "PropCLUSTER_LEVEL": "xxx"
+ }
+ },
+ {
+ "id": "1234567891",
+ "displayName": "abc123_2",
+ "created":"1481917061",
+ "updated":"1481917061",
+ "createdBy":"mdobs",
+ "updatedBy":"mdobs",
+ "scopeId": "ABCDEF",
+ "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
+ "configurationJson": {
+ "PropA": "scope1bundle2propA",
+ "PropSCOPE_LEVEL": "aaa",
+ "PropCLUSTER_LEVEL": "xxx"
+ }
+ },
+ {
+ "id": "1234567892",
+ "displayName": "abc1234",
+ "created":"1481917061",
+ "updated":"1481917061",
+ "createdBy":"mdobs",
+ "updatedBy":"mdobs",
+ "scopeId": "ABCDEF",
+ "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
+ "configurationJson": {
+ "PropA": "scope1bundlepropA",
+ "PropSCOPE_LEVEL": "aaa",
+ "PropCLUSTER_LEVEL": "xxx"
+ }
+ },
+ {
+ "id": "1234567893",
+ "displayName": "abc123",
+ "created":"1481917061",
+ "updated":"1481917061",
+ "createdBy":"mdobs",
+ "updatedBy":"mdobs",
+ "scopeId": "EFGHIJK",
+ "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
+ "configurationJson": {
+ "PropA": "scope2bundle1propA",
+ "PropSCOPE_LEVEL": "bbb",
+ "PropCLUSTER_LEVEL": "xxx"
+ }
+ },
+ {
+ "id": "1234567894",
+ "displayName": "abc123_2",
+ "created":"1481917061",
+ "updated":"1481917061",
+ "createdBy":"fierrom",
+ "updatedBy":"fierrom",
+ "scopeId": "EFGHIJK",
+ "uri": "file:///tmp/F1ERRO/0c9853d1ad9b7ec9f7d16ed16ada1be4/archive/369a01f320f926cd8ffac5dfda83b1d8a2129ab3.zip",
+ "configurationJson": {
+ "PropA": "scope2bundle2propA",
+ "PropSCOPE_LEVEL": "bbb",
+ "PropCLUSTER_LEVEL": "xxx"
+ }
+ }
+]
schema:
$ref: '#/definitions/DeploymentResponse'
'304':
description: Deployment not modified.
- '404':
- description: No current Deployment.
-
- /{deploymentId}:
post:
description: Save results of deployment
parameters:
- - name: deploymentId
- in: path
- required: true
- type: string
- description: deployment ID
- name: _
in: body
required: true
@@ -112,109 +152,80 @@
}
DeploymentResponse:
- type: object
- required:
- - deploymentId
- - system
- - bundles
- properties:
- deploymentId:
- type: string
- system:
- type: object
- $ref: '#/definitions/SystemBundle'
- bundles:
- type: array
- items:
- $ref: '#/definitions/UserBundle'
+ type: array
+ items:
+ $ref: '#/definitions/DeploymentBundle'
- SystemBundle:
+ DeploymentBundle:
type: object
required:
- - bundleId
+ - id
+ - scopeId
+ - createdBy
+ - created
+ - updatedBy
+ - updated
+ - displayName
- uri
+ - configurationJson
properties:
- bundleId:
+ id:
+ type: string
+ scopeId:
+ type: string
+ createdBy:
+ type: string
+ created:
+ type: number
+ updatedBy:
+ type: string
+ updated:
+ type: number
+ displayName:
type: string
uri:
type: string
-
- UserBundle:
- allOf:
- - $ref: '#/definitions/SystemBundle'
- required:
- - scope
- properties:
- scope:
- type: string
- description: Used to convey request scope information to APID APIs
- org:
- type: string
- description: Available for legacy purposes
- env:
- type: string
- description: Available for legacy purposes
+ configurationJson:
+ type: object
DeploymentResult:
+ type: array
+ items:
+ $ref: '#/definitions/DeploymentBundleResult'
+ example: [
+ {
+ "id": "1234567890",
+ "status": "SUCCESS"
+ },
+ {
+ "id": "1234567890",
+ "status": "SUCCESS"
+ },
+ {
+ "id": "1234567890",
+ "status": "SUCCESS"
+ }
+ ]
+
+ DeploymentBundleResult:
type: object
required:
+ - id
- status
properties:
+ id:
+ type: string
+ message:
+ type: string
+ errorCode:
+ type: number
status:
type: string
enum:
- "SUCCESS"
- "FAIL"
- error:
- $ref: '#/definitions/DeploymentResultError'
description: Status of SUCCESS or FAIL plus error
example: {
+ "id": "1234567890",
"status": "SUCCESS"
- }
-
- DeploymentResultError:
- type: object
- required:
- - errorCode
- - reason
- properties:
- errorCode:
- type: number
- reason:
- type: string
- bundleErrors:
- type: array
- items:
- $ref: '#/definitions/DeploymentBundleError'
- example: {
- "error": {
- "errorCode": 5,
- "reason": "Failed restart",
- "bundleErrors": [
- {
- "bundleId": "system-bundle-rev-3",
- "errorCode": 5,
- "reason": "Invalid template parameter"
- },
- {
- "bundleId": "system-bundle-rev-9",
- "errorCode": 1,
- "reason": "Missing Virtual Host"
- }
- ]
- }
- }
-
- DeploymentBundleError:
- type: object
- required:
- - bundleId
- - errorCode
- - reason
- properties:
- bundleId:
- type: string
- errorCode:
- type: number
- reason:
- type: string
+ }
\ No newline at end of file
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 7287e3e..3ce174c 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -9,9 +9,11 @@
"io/ioutil"
"net/http"
"net/http/httptest"
- "os"
"testing"
"time"
+ "net/url"
+ "encoding/hex"
+ "os"
)
var (
@@ -34,21 +36,22 @@
db, err := data.DB()
Expect(err).NotTo(HaveOccurred())
- initDB(db)
- setDB(db)
+ err = InitDB(db)
+ Expect(err).NotTo(HaveOccurred())
+ SetDB(db)
router := apid.API().Router()
// fake an unreliable bundle repo
- downloadMultiplier = 10 * time.Millisecond
+ backOffMultiplier = 10 * time.Millisecond
count := 0
- router.HandleFunc("/bundle/{id}", func(w http.ResponseWriter, req *http.Request) {
+ router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
count++
if count % 2 == 0 {
w.WriteHeader(500)
return
}
vars := apid.API().Vars(req)
- w.Write([]byte("/bundle/" + vars["id"]))
+ w.Write([]byte("/bundles/" + vars["id"]))
})
testServer = httptest.NewServer(router)
})
@@ -65,3 +68,14 @@
RegisterFailHandler(Fail)
RunSpecs(t, "ApidGatewayDeploy Suite")
}
+
+func testGetChecksum(hashType, uri string) string {
+ url, err := url.Parse(uri)
+ Expect(err).NotTo(HaveOccurred())
+
+ hashWriter, err := getHashWriter(hashType)
+ Expect(err).NotTo(HaveOccurred())
+
+ hashWriter.Write([]byte(url.Path))
+ return hex.EncodeToString(hashWriter.Sum(nil))
+}
diff --git a/bundle.go b/bundle.go
new file mode 100644
index 0000000..4762165
--- /dev/null
+++ b/bundle.go
@@ -0,0 +1,203 @@
+package apiGatewayDeploy
+
+import (
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "os"
+ "path"
+ "time"
+ "encoding/base64"
+ "io/ioutil"
+ "hash/crc32"
+ "errors"
+ "crypto/md5"
+ "hash"
+ "encoding/hex"
+)
+
+const (
+ DOWNLOAD_ATTEMPTS = 3
+)
+
+var (
+ backOffMultiplier = 10 * time.Second
+)
+
+// downloadBundle downloads dep's bundle, verifies its checksum, moves it to its final path,
+// records that path with updateLocalURI and finally sends dep.ID on deploymentsChanged.
+// The download is tried up to DOWNLOAD_ATTEMPTS times with a linearly growing pause between tries.
+func downloadBundle(dep DataDeployment) error {
+	log.Debugf("starting bundle download process: %s", dep.BundleURI)
+	hashWriter, err := getHashWriter(dep.BundleChecksumType)
+	if err != nil {
+		log.Errorf("invalid checksum type: %v", err)
+		return err
+	}
+
+	var tempFile string
+	for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
+		hashWriter.Reset() // never carry a failed attempt's partial digest into the next one
+		tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+		if err == nil {
+			break
+		}
+		if tempFile != "" {
+			os.Remove(tempFile)
+		}
+		if i == DOWNLOAD_ATTEMPTS {
+			break // out of attempts: another pause would only delay the failure report
+		}
+		// simple back-off, we could potentially be more sophisticated
+		retryIn := time.Duration(i) * backOffMultiplier
+		log.Debugf("will retry failed download in %s: %v", retryIn, err)
+		time.Sleep(retryIn)
+	}
+	if err != nil {
+		log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS)
+		return err
+	}
+
+	bundleFile := getBundleFile(dep)
+	err = os.Rename(tempFile, bundleFile)
+	if err != nil {
+		log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
+		os.Remove(tempFile)
+		return err
+	}
+
+	err = updateLocalURI(dep.ID, bundleFile)
+	if err != nil {
+		return err
+	}
+
+	// send deployments to client
+	deploymentsChanged <- dep.ID
+	return nil
+}
+
+// getBundleFile returns the path under bundlePath at which dep's downloaded bundle is stored.
+func getBundleFile(dep DataDeployment) string {
+	// Named from the deployment identity, not dep.BundleURI: the URI's content may change over time.
+	// NOTE(review): dep.ID is appended twice — looks unintentional but is harmless; confirm.
+	fileName := dep.DataScopeID + dep.ID + dep.ID
+	// URL-safe alphabet: StdEncoding can emit '/', which path.Join would treat as a separator.
+	return path.Join(bundlePath, base64.URLEncoding.EncodeToString([]byte(fileName)))
+}
+
+// downloadFromURI streams the bundle at uri into a new temp file created under bundlePath and
+// feeds the same bytes to hashWriter. When the copy is complete the hex digest is compared with
+// expectedHash and a mismatch is reported as an error.
+// tempFileName is set as soon as the temp file exists, so it may be non-empty even when err is
+// non-nil; this function does not remove the file itself.
+func downloadFromURI(uri string, hashWriter hash.Hash, expectedHash string) (tempFileName string, err error) {
+	log.Debugf("Downloading bundle: %s", uri)
+
+	var out *os.File
+	if out, err = ioutil.TempFile(bundlePath, "download"); err != nil {
+		log.Errorf("Unable to create temp file: %v", err)
+		return
+	}
+	defer out.Close()
+	tempFileName = out.Name()
+
+	var in io.ReadCloser
+	if in, err = getURIFileReader(uri); err != nil {
+		log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
+		return
+	}
+	defer in.Close()
+
+	// every byte copied to the file is also written to hashWriter
+	teed := io.TeeReader(in, hashWriter)
+	if _, err = io.Copy(out, teed); err != nil {
+		log.Errorf("Unable to write bundle %s: %v", tempFileName, err)
+		return
+	}
+
+	// reject the download when the digest does not match the expected one
+	actual := hex.EncodeToString(hashWriter.Sum(nil))
+	if actual != expectedHash {
+		err = fmt.Errorf("Bad checksum on %s. calculated: %s, given: %s", tempFileName, actual, expectedHash)
+		return
+	}
+
+	log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName)
+	return
+}
+
+// getURIFileReader opens uriString for reading: no scheme or the "file" scheme means a local file,
+// anything else is fetched with an HTTP GET. The caller must Close the returned reader.
+func getURIFileReader(uriString string) (io.ReadCloser, error) {
+	uri, err := url.Parse(uriString)
+	if err != nil {
+		return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString)
+	}
+
+	// todo: add authentication - TBD?
+	// assume it's a file if no scheme
+	if uri.Scheme == "" || uri.Scheme == "file" {
+		f, err := os.Open(uri.Path)
+		if err != nil {
+			return nil, err
+		}
+		return f, nil
+	}
+
+	// GET the contents at uriString
+	res, err := http.Get(uriString)
+	if err != nil {
+		return nil, err
+	}
+	if res.StatusCode != http.StatusOK {
+		res.Body.Close() // the body is not handed to the caller, so it has to be released here
+		return nil, fmt.Errorf("Bundle uri %s failed with status %d", uriString, res.StatusCode)
+	}
+	return res.Body, nil
+}
+
+// getHashWriter returns a new hash.Hash for the named checksum algorithm.
+// Accepted names are "md5" and "crc-32"; any other name yields an error.
+// Each call constructs a fresh hash value.
+func getHashWriter(hashType string) (hash.Hash, error) {
+	switch hashType {
+	case "md5":
+		return md5.New(), nil
+	case "crc-32":
+		// CRC-32 with the IEEE polynomial
+		return crc32.NewIEEE(), nil
+	}
+
+	// unknown algorithm name
+	return nil, errors.New("checksumType must be md5 or crc-32")
+}
+
+//func checksumFile(hashType, checksum string, fileName string) error {
+//
+// hashWriter, err := getHashWriter(hashType)
+// if err != nil {
+// return err
+// }
+//
+// file, err := os.Open(fileName)
+// if err != nil {
+// return err
+// }
+// defer file.Close()
+//
+// if _, err := io.Copy(hashWriter, file); err != nil {
+// return err
+// }
+//
+// hashBytes := hashWriter.Sum(nil)
+// //hashBytes := hashWriter.Sum(nil)[:hasher.Size()]
+// //hashBytes := hashWriter.Sum(nil)[:]
+//
+// //hex.EncodeToString(hashBytes)
+// if checksum != hex.EncodeToString(hashBytes) {
+// return errors.New(fmt.Sprintf("bad checksum for %s", fileName))
+// }
+//
+// return nil
+//}
diff --git a/cmd/apidGatewayDeploy/deployments.json b/cmd/apidGatewayDeploy/deployments.json
new file mode 100644
index 0000000..72efd9f
--- /dev/null
+++ b/cmd/apidGatewayDeploy/deployments.json
@@ -0,0 +1,24 @@
+[
+ {
+ "id": "api_no_change_blocking",
+ "scopeId": "api_no_change_blocking",
+ "created": "",
+ "createdBy": "",
+ "updated": "",
+ "updatedBy": "",
+ "configuration": {
+ "name": "/bundles/1",
+ "uri": "http://127.0.0.1:53589/bundles/1",
+ "checksumType": "crc-32",
+ "checksum": "c65a0f2a"
+ },
+ "bundleConfiguration": {
+ "name": "/bundles/1",
+ "uri": "http://127.0.0.1:53589/bundles/1",
+ "checksumType": "crc-32",
+ "checksum": "c65a0f2a"
+ },
+ "displayName": "api_no_change_blocking",
+ "uri": "x"
+ }
+]
\ No newline at end of file
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go
index 5b162d4..5381c51 100644
--- a/cmd/apidGatewayDeploy/main.go
+++ b/cmd/apidGatewayDeploy/main.go
@@ -6,54 +6,58 @@
"github.com/30x/apid/factory"
_ "github.com/30x/apidGatewayDeploy"
"io/ioutil"
- "github.com/apigee-labs/transicator/common"
"github.com/30x/apidGatewayDeploy"
"os"
+ "encoding/json"
)
func main() {
- manifestFlag := flag.String("manifest", "", "file path to a manifest yaml file")
+ deploymentsFlag := flag.String("deployments", "", "file path to a deployments file (for testing)")
flag.Parse()
- manifestFile := *manifestFlag
+ deploymentsFile := *deploymentsFlag
- // initialize apid using default services
apid.Initialize(factory.DefaultServicesFactory())
log := apid.Log()
log.Debug("initializing...")
- config := apid.Config()
+ configService := apid.Config()
- // if manifest is specified, start with only the manifest using a temp dir
- var manifest []byte
- if manifestFile != "" {
- var err error
- manifest, err = ioutil.ReadFile(manifestFile)
- if err != nil {
- log.Errorf("ERROR: Unable to read manifest at %s", manifestFile)
- return
- }
-
- log.Printf("Running in temp dir with manifest: %s", manifestFile)
+ var deployments apiGatewayDeploy.ApiDeploymentResponse
+ if deploymentsFile != "" {
+ log.Printf("Running in temp dir using deployments file: %s", deploymentsFile)
tmpDir, err := ioutil.TempDir("", "apidGatewayDeploy")
if err != nil {
log.Panicf("ERROR: Unable to create temp dir", err)
}
defer os.RemoveAll(tmpDir)
- config.Set("data_path", tmpDir)
- config.Set("gatewaydeploy_bundle_dir", tmpDir)
+
+ configService.Set("data_path", tmpDir)
+ configService.Set("gatewaydeploy_bundle_dir", tmpDir) // todo: legacy?
+
+ if deploymentsFile != "" {
+ bytes, err := ioutil.ReadFile(deploymentsFile)
+ if err != nil {
+ log.Errorf("ERROR: Unable to read bundle config file at %s", deploymentsFile)
+ return
+
+ }
+
+ err = json.Unmarshal(bytes, &deployments)
+ if err != nil {
+ log.Errorf("ERROR: Unable to parse deployments %v", err)
+ return
+ }
+ }
}
- // this will call all initialization functions on all registered plugins
apid.InitializePlugins()
- if manifest != nil {
- insertTestRecord(manifest)
- }
+ insertTestDeployments(deployments)
// print the base url to the console
basePath := "/deployments"
- port := config.GetString("api_port")
+ port := configService.GetString("api_port")
log.Print()
log.Printf("API is at: http://localhost:%s%s", port, basePath)
log.Print()
@@ -64,19 +68,66 @@
log.Fatalf("Error. Is something already running on port %d? %s", port, err)
}
-func insertTestRecord(manifest []byte) {
+func insertTestDeployments(deployments apiGatewayDeploy.ApiDeploymentResponse) error {
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: "deploymentID"}
- row["manifest_body"] = &common.ColumnVal{Value: string(manifest)}
-
- var event = common.Snapshot{}
- event.Tables = []common.Table{
- {
- Name: apiGatewayDeploy.MANIFEST_TABLE,
- Rows: []common.Row{row},
- },
+ if len(deployments) == 0 {
+ return nil
}
- apid.Events().Emit(apiGatewayDeploy.APIGEE_SYNC_EVENT, &event)
-}
+	log := apid.Log()
+
+	db, err := apid.Data().DB()
+	if err != nil {
+		return err
+	}
+	apiGatewayDeploy.SetDB(db)
+
+	err = apiGatewayDeploy.InitDB(db)
+	if err != nil {
+		return err
+	}
+
+	tx, err := db.Begin()
+	if err != nil {
+		return err
+	}
+	// release the transaction on every early return; Rollback after a successful Commit does nothing
+	defer tx.Rollback()
+
+	for _, ad := range deployments {
+		// status, audit and checksum fields are left empty for deployments loaded from the file
+		dep := apiGatewayDeploy.DataDeployment{
+			ID:                 ad.ID,
+			BundleConfigID:     ad.ID,
+			ApidClusterID:      ad.ID,
+			DataScopeID:        ad.ScopeId,
+			BundleConfigJSON:   string(ad.BundleConfigJson),
+			ConfigJSON:         string(ad.ConfigJson),
+			Status:             "",
+			Created:            "",
+			CreatedBy:          "",
+			Updated:            "",
+			UpdatedBy:          "",
+			BundleName:         ad.DisplayName,
+			BundleURI:          ad.URI,
+			BundleChecksum:     "",
+			BundleChecksumType: "",
+			LocalBundleURI:     ad.URI,
+		}
+
+		err = apiGatewayDeploy.InsertDeployment(tx, dep)
+		if err != nil {
+			log.Error("Unable to insert deployment")
+			return err
+		}
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		return err
+	}
+
+	apiGatewayDeploy.InitAPI()
+
+	return nil
+}
\ No newline at end of file
diff --git a/data.go b/data.go
index 0fd46b9..45ad641 100644
--- a/data.go
+++ b/data.go
@@ -2,54 +2,72 @@
import (
"database/sql"
- "time"
+ "fmt"
"github.com/30x/apid"
"sync"
)
-const (
- DEPLOYMENT_STATE_INPROG = 1
- DEPLOYMENT_STATE_ERR_APID = 2
- DEPLOYMENT_STATE_ERR_GWY = 3
- DEPLOYMENT_STATE_READY = 4
- DEPLOYMENT_STATE_SUCCESS = 5
-
- BUNDLE_TYPE_SYS = 1
- BUNDLE_TYPE_DEP = 2
-)
-
var (
unsafeDB apid.DB
dbMux sync.RWMutex
)
+// DataDeployment is the in-memory form of a deployment as stored in and read from the deployments table.
+type DataDeployment struct {
+ ID string
+ BundleConfigID string
+ ApidClusterID string
+ DataScopeID string
+ BundleConfigJSON string
+ ConfigJSON string
+ Status string
+ Created string
+ CreatedBy string
+ Updated string
+ UpdatedBy string
+ BundleName string
+ BundleURI string
+ BundleChecksum string // not among the columns written by InsertDeployment
+ BundleChecksumType string // not among the columns written by InsertDeployment
+ LocalBundleURI string
+}
type SQLExec interface {
Exec(query string, args ...interface{}) (sql.Result, error)
}
-func initDB(db apid.DB) {
-
+func InitDB(db apid.DB) error {
_, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS gateway_deploy_deployment (
- id varchar(255), status integer, created_at integer,
- modified_at integer, error_code varchar(255),
- PRIMARY KEY (id));
-
- CREATE TABLE IF NOT EXISTS gateway_deploy_bundle (
- deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer,
- created_at integer, modified_at integer, status integer, error_code integer, error_reason text,
- PRIMARY KEY (deployment_id, id),
- FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE);
+ CREATE TABLE IF NOT EXISTS etag (
+ value integer
+ );
+ INSERT INTO etag SELECT 1 WHERE NOT EXISTS (SELECT 1 FROM etag);
+ CREATE TABLE IF NOT EXISTS deployments (
+ id character varying(36) NOT NULL,
+ bundle_config_id varchar(36) NOT NULL,
+ apid_cluster_id varchar(36) NOT NULL,
+ data_scope_id varchar(36) NOT NULL,
+ bundle_config_json text NOT NULL,
+ config_json text NOT NULL,
+ status text NOT NULL,
+ created timestamp without time zone,
+ created_by text,
+ updated timestamp without time zone,
+ updated_by text,
+ bundle_name text,
+ bundle_uri text,
+ local_bundle_uri text,
+ deploy_status string,
+ deploy_error_code int,
+ deploy_error_message text,
+ PRIMARY KEY (id)
+ );
`)
if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
+ return err
}
log.Debug("Database tables created.")
-}
-
-func dbTimeNow() int64 {
- return int64(time.Now().UnixNano())
+ return nil
}
func getDB() apid.DB {
@@ -59,231 +77,210 @@
return db
}
-func setDB(db apid.DB) {
+func SetDB(db apid.DB) {
dbMux.Lock()
if unsafeDB == nil { // init API when DB is initialized
- go initAPI()
+ go InitAPI()
}
unsafeDB = db
dbMux.Unlock()
}
-func insertDeployment(db apid.DB, depID string, dep deployment) error {
+// call whenever the list of deployments changes
+func incrementETag() error {
- log.Debugf("insertDeployment: %s", depID)
+ stmt, err := getDB().Prepare("UPDATE etag SET value = value+1;")
+ if err != nil {
+ log.Errorf("prepare update etag failed: %v", err)
+ return err
+ }
+ defer stmt.Close()
- tx, err := db.Begin()
+ _, err = stmt.Exec()
+ if err != nil {
+ log.Errorf("update etag failed: %v", err)
+ return err
+ }
+
+ log.Debugf("etag incremented")
+ return err
+}
+
+// getETag reads the value column of the etag table and returns it as a string.
+// On failure the error is logged and returned together with an empty tag.
+func getETag() (string, error) {
+	var current string
+
+	// QueryRow defers any query error to Scan, so a single check covers both steps
+	err := getDB().QueryRow("SELECT value FROM etag").Scan(&current)
+	if err != nil {
+		log.Errorf("select etag failed: %v", err)
+		return "", err
+	}
+
+	log.Debugf("etag queried: %v", current)
+	return current, nil
+}
+
+// InsertDeployment adds dep as a new row of the deployments table using the transaction tx.
+// BundleChecksum and BundleChecksumType are not part of the statement and are not stored.
+func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
+	log.Debugf("insertDeployment: %s", dep.ID)
+
+	insert, err := tx.Prepare(`
+ INSERT INTO deployments
+ (id, bundle_config_id, apid_cluster_id, data_scope_id,
+ bundle_config_json, config_json, status, created,
+ created_by, updated, updated_by, bundle_name,
+ bundle_uri, local_bundle_uri)
+ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14);
+ `)
+	if err != nil {
+		log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
+		return err
+	}
+	defer insert.Close()
+
+	if _, err = insert.Exec(
+		dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
+		dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created,
+		dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName,
+		dep.BundleURI, dep.LocalBundleURI); err != nil {
+		log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
+		return err
+	}
+
+	log.Debugf("insert into deployments %s succeeded", dep.ID)
+	return nil
+}
+
+// deleteDeployment removes the deployments row whose id is depID using tx, then sends depID on
+// deploymentsChanged. NOTE(review): the send happens before the caller commits tx — confirm that
+// receivers tolerate seeing the notification ahead of the commit.
+func deleteDeployment(tx *sql.Tx, depID string) error {
+	log.Debugf("deleteDeployment: %s", depID)
+
+	del, err := tx.Prepare("DELETE FROM deployments where id = $1;")
+	if err != nil {
+		log.Errorf("prepare delete from deployments %s failed: %v", depID, err)
+		return err
+	}
+	defer del.Close()
+
+	if _, err = del.Exec(depID); err != nil {
+		log.Errorf("delete from deployments %s failed: %v", depID, err)
+		return err
+	}
+
+	deploymentsChanged <- depID
+	log.Debugf("deleteDeployment %s succeeded", depID)
+	return nil
+}
+
+// getReadyDeployments returns the deployments whose local_bundle_uri is set, i.e. those that are
+// ready to deploy. A query, scan or iteration error is returned instead of partial row data.
+func getReadyDeployments() (deployments []DataDeployment, err error) {
+	rows, err := getDB().Query(`
+ SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
+ bundle_config_json, config_json, status, created,
+ created_by, updated, updated_by, bundle_name,
+ bundle_uri, local_bundle_uri
+ FROM deployments
+ WHERE local_bundle_uri != ''
+ `)
+	if err != nil {
+		log.Errorf("Error querying deployments: %v", err)
+		return nil, err
+	}
+	defer rows.Close()
+
+	for rows.Next() {
+		dep := DataDeployment{}
+		err = rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
+			&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created,
+			&dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName,
+			&dep.BundleURI, &dep.LocalBundleURI,
+		)
+		if err != nil {
+			log.Errorf("Error scanning deployment row: %v", err)
+			return nil, err
+		}
+		deployments = append(deployments, dep)
+	}
+	// rows.Next also returns false on failure, so report any error that ended the iteration
+	return deployments, rows.Err()
+}
+
+func setDeploymentResults(results apiDeploymentResults) error {
+
+ log.Debugf("setDeploymentResults: %v", results)
+
+ tx, err := getDB().Begin()
+ if err != nil {
+ log.Errorf("Unable to begin transaction: %v", err)
+ return err
+ }
defer tx.Rollback()
+
+ stmt, err := tx.Prepare(`
+ UPDATE deployments
+ SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
+ WHERE id=$4;
+ `)
if err != nil {
- log.Errorf("insertDeployment begin transaction failed: %v", depID, err)
+ log.Errorf("prepare updateDeploymentStatus failed: %v", err)
return err
}
+ defer stmt.Close()
- timeNow := dbTimeNow()
-
- _, err = tx.Exec("INSERT INTO gateway_deploy_deployment " +
- "(id, status, created_at) VALUES(?,?,?);",
- depID, DEPLOYMENT_STATE_READY, timeNow)
- if err != nil {
- log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err)
- return err
- }
-
- // system bundle
- // todo: extra data? TBD
- _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
- "(id, deployment_id, scope, type, uri, status, created_at) " +
- "VALUES(?,?,?,?,?,?,?);",
- dep.System.BundleID, depID, dep.System.Scope, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_READY, timeNow)
- if err != nil {
- log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err)
- return err
- }
-
- // todo: extra data? TBD
- for _, bun := range dep.Bundles {
- _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
- "(id, deployment_id, scope, type, uri, status, created_at) " +
- "VALUES(?,?,?,?,?,?,?);",
- bun.BundleID, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow)
+ for _, result := range results {
+ res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
if err != nil {
- log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleID, err)
+ log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err)
return err
}
+ n, err := res.RowsAffected()
+ if n == 0 || err != nil {
+ log.Error(fmt.Sprintf("no deployment matching '%s' to update. skipping.", result.ID))
+ }
}
err = tx.Commit()
if err != nil {
- log.Errorf("commit insert to gateway_deploy_bundle %s failed: %v", depID, err)
- }
-
- log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID)
- return err
-}
-
-func updateDeploymentAndBundles(depID string, rsp deploymentResponse) error {
-
- log.Debugf("updateDeploymentAndBundles: %s", depID)
-
- db := getDB()
-
- /*
- * If the state of deployment was success, update state of bundles and
- * its deployments as success as well
- */
- txn, err := db.Begin()
- if err != nil {
- log.Errorf("Unable to begin transaction: %s", err)
- return err
- }
- defer txn.Rollback()
-
- if rsp.Status == RESPONSE_STATUS_SUCCESS {
- err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0)
- if err != nil {
- return err
- }
- err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS)
- if err != nil {
- return err
- }
- } else {
- err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.Error.ErrorCode)
- if err != nil {
- return err
- }
-
- // Iterate over Bundles, and update the errors
- for _, a := range rsp.Error.ErrorDetails {
- updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason)
- if err != nil {
- return err
- }
- }
- }
- if err != nil {
- return err
- }
-
- err = txn.Commit()
- if err != nil {
- log.Errorf("Unable to commit updateDeploymentStatus transaction: %s", err)
+ log.Errorf("Unable to commit setDeploymentResults transaction: %v", err)
}
return err
}
-func updateDeploymentStatus(txn SQLExec, depID string, status int, errCode int) error {
+func updateLocalURI(depID, localBundleUri string) error {
- var nRows int64
- res, err := txn.Exec("UPDATE gateway_deploy_deployment " +
- "SET status=?, modified_at=?, error_code = ? WHERE id=?;", status, dbTimeNow(), errCode, depID)
- if err == nil {
- nRows, err = res.RowsAffected()
- if nRows == 0 {
- err = sql.ErrNoRows
- }
- }
+ tx, err := getDB().Begin()
if err != nil {
- log.Errorf("UPDATE gateway_deploy_deployment %s failed: %v", depID, err)
+ log.Errorf("begin updateLocalURI failed: %v", err)
+ return err
+ }
+ defer tx.Rollback()
+
+ stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
+ if err != nil {
+ log.Errorf("prepare updateLocalURI failed: %v", err)
+ return err
+ }
+ defer stmt.Close()
+
+ _, err = stmt.Exec(localBundleUri, depID)
+ if err != nil {
+ log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
return err
}
- log.Debugf("UPDATE gateway_deploy_deployment %s succeeded", depID)
-
- if status == DEPLOYMENT_STATE_READY {
- incoming<- depID
- }
-
- return nil
-}
-
-func updateAllBundleStatus(txn SQLExec, depID string, status int) error {
- var nRows int64
- res, err := txn.Exec("UPDATE gateway_deploy_bundle SET status = ? WHERE deployment_id = ?;", status, depID)
- if err == nil {
- nRows, err = res.RowsAffected()
- if nRows == 0 {
- err = sql.ErrNoRows
- }
- }
+ err = tx.Commit()
if err != nil {
- log.Errorf("UPDATE all gateway_deploy_bundle %s failed: %v", depID, err)
+ log.Errorf("commit updateLocalURI failed: %v", err)
return err
}
+ log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
+
return nil
}
-
-func updateBundleStatus(txn SQLExec, depID string, bundleID string, status int, errCode int, errReason string) error {
- var nRows int64
- res, err := txn.Exec("UPDATE gateway_deploy_bundle " +
- "SET status=?, error_code=?, error_reason=?, modified_at=? WHERE id=?;",
- status, errCode, errReason, dbTimeNow(), depID)
- if err == nil {
- nRows, err = res.RowsAffected()
- if nRows == 0 {
- err = sql.ErrNoRows
- }
- }
- if err != nil {
- log.Errorf("UPDATE gateway_deploy_bundle %s:%s failed: %v", depID, bundleID, err)
- return err
- }
-
- log.Debugf("UPDATE gateway_deploy_bundle success: %s:%s", depID, bundleID)
- return nil
-}
-
-// getCurrentDeploymentID returns the ID of what should be the "current" deployment
-func getCurrentDeploymentID() (string, error) {
-
- db := getDB()
- var depID string
- err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " +
- "WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID)
- log.Debugf("current deployment id: %s", depID)
- return depID, err
-}
-
-// getDeployment returns a fully populated deploymentResponse
-func getDeployment(depID string) (*deployment, error) {
-
- db := getDB()
- rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope " +
- "FROM gateway_deploy_bundle WHERE deployment_id=?;", depID)
- if err != nil {
- log.Errorf("Unable to query gateway_deploy_bundle. Err: %s", err)
- return nil, err
- }
- defer rows.Close()
-
- depRes := deployment{
- Bundles: []bundle{},
- DeploymentID: depID,
- }
-
- for rows.Next() {
- var bundleType int
- var bundleID, uri, scope string
- err = rows.Scan(&bundleID, &bundleType, &uri, &scope)
- if err != nil {
- log.Errorf("gateway_deploy_bundle fetch failed. Err: %s", err)
- return nil, err
- }
- if bundleType == BUNDLE_TYPE_SYS {
- depRes.System = bundle{
- BundleID: bundleID,
- URI: uri,
- }
- } else {
- fileUrl := getBundleFilePath(depID, uri)
- bd := bundle{
- BundleID: bundleID,
- URI: fileUrl,
- Scope: scope,
- }
- depRes.Bundles = append(depRes.Bundles, bd)
- }
- }
- return &depRes, nil
-}
diff --git a/deployments.go b/deployments.go
deleted file mode 100644
index 8fe22f4..0000000
--- a/deployments.go
+++ /dev/null
@@ -1,261 +0,0 @@
-package apiGatewayDeploy
-
-import (
- "encoding/json"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "encoding/base64"
- "path"
- "errors"
- "io/ioutil"
- "time"
- "github.com/30x/apid"
-)
-
-// todo: remove downloaded bundle files from old deployments
-
-const (
- DOWNLOAD_ATTEMPTS = 3
-)
-
-var (
- downloadMultiplier = 10 * time.Second
-)
-
-type systemBundle struct {
- URI string `json:"uri"`
-}
-
-type dependantBundle struct {
- URI string `json:"uri"`
- Scope string `json:"scope"`
- Org string `json:"org"`
- Env string `json:"env"`
-}
-
-type bundleManifest struct {
- SysBun systemBundle `json:"system"`
- DepBun []dependantBundle `json:"bundles"`
-}
-
-// event bundle
-type bundle struct {
- BundleID string `json:"bundleId"`
- URI string `json:"uri"`
- Scope string `json:"scope"`
- Org string `json:"org"`
- Env string `json:"env"`
-}
-
-// event deployment
-type deployment struct {
- DeploymentID string `json:"deploymentId"`
- System bundle `json:"system"`
- Bundles []bundle `json:"bundles"`
-}
-
-type deploymentErrorDetail struct {
- ErrorCode int `json:"errorCode"`
- Reason string `json:"reason"`
- BundleID string `json:"bundleId"`
-}
-
-type deploymentErrorResponse struct {
- ErrorCode int `json:"errorCode"`
- Reason string `json:"reason"`
- ErrorDetails []deploymentErrorDetail `json:"bundleErrors"`
-}
-
-type deploymentResponse struct {
- Status string `json:"status"`
- Error deploymentErrorResponse `json:"error"`
-}
-
-// retrieveBundle retrieves bundle data from a URI
-func getBundleReader(uriString string) (io.ReadCloser, error) {
-
- uri, err := url.Parse(uriString)
- if err != nil {
- return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString)
- }
-
- // todo: Temporary blind bundle download, Bundle storage TBD
-
- // assume it's a file if no scheme
- if uri.Scheme == "" || uri.Scheme == "file" {
- f, err := os.Open(uri.Path)
- if err != nil {
- return nil, err
- }
- return f, nil
- }
-
- // some random uri, try to GET it
- res, err := http.Get(uriString)
- if err != nil {
- return nil, err
- }
- if res.StatusCode != 200 {
- return nil, fmt.Errorf("Bundle uri %s failed with status %d", uriString, res.StatusCode)
- }
- return res.Body, nil
-}
-
-// check if already exists and skip
-func prepareBundle(depID string, bun bundle) error {
-
- bundleFile := getBundleFilePath(depID, bun.URI)
- bundleDir := path.Dir(bundleFile)
-
- downloadBundle := func() (fileName string, err error) {
-
- log.Debugf("Downloading bundle: %s", bun.URI)
-
- var tempFile *os.File
- tempFile, err = ioutil.TempFile(bundleDir, "download")
- if err != nil {
- log.Errorf("Unable to create temp file: %v", err)
- return
- }
- defer tempFile.Close()
- fileName = tempFile.Name()
-
- var bundleReader io.ReadCloser
- bundleReader, err = getBundleReader(bun.URI)
- if err != nil {
- log.Errorf("Unable to retrieve bundle %s: %v", bun.URI, err)
- return
- }
- defer bundleReader.Close()
-
- _, err = io.Copy(tempFile, bundleReader)
- if err != nil {
- log.Errorf("Unable to write bundle %s: %v", tempFile, err)
- return
- }
-
- log.Debugf("Bundle downloaded: %s", bun.URI)
- return
- }
-
- // retry
- var tempFile string
- var err error
- for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
- tempFile, err = downloadBundle()
- if err == nil {
- break
- }
- if tempFile != "" {
- os.Remove(tempFile)
- }
-
- // simple back-off, we could potentially be more sophisticated
- retryIn := time.Duration(i) * downloadMultiplier
- log.Debugf("will retry download in %s", retryIn)
- time.Sleep(retryIn)
- }
-
- if err != nil {
- log.Errorf("failed %s download attempts. aborting.", DOWNLOAD_ATTEMPTS)
- return err
- }
-
- err = os.Rename(tempFile, bundleFile)
- if err != nil {
- log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
- os.Remove(tempFile)
- return err
- }
-
- return nil
-}
-
-func getDeploymentFilesPath(depID string) string {
- return path.Join(bundlePath, depID)
-}
-
-func getBundleFilePath(depID string, bundleURI string) string {
- return path.Join(getDeploymentFilesPath(depID), base64.StdEncoding.EncodeToString([]byte(bundleURI)))
-}
-
-// returns first bundle download error
-// all bundles will be attempted regardless of errors, in the future we could retry
-func prepareDeployment(db apid.DB, depID string, dep deployment) error {
-
- log.Debugf("preparing deployment: %s", depID)
-
- err := insertDeployment(db, depID, dep)
- if err != nil {
- log.Errorf("insert deployment failed: %v", err)
- return err
- }
-
- deploymentPath := getDeploymentFilesPath(depID)
- err = os.MkdirAll(deploymentPath, 0700)
- if err != nil {
- log.Errorf("Deployment dir creation failed: %v", err)
- return err
- }
-
- // download bundles and store them locally
- errorsChan := make(chan error, len(dep.Bundles))
- for i := range dep.Bundles {
- bun := dep.Bundles[i]
- go func() {
- err := prepareBundle(depID, bun)
- errorsChan<- err
- if err != nil {
- id := string(i)
- err = updateBundleStatus(db, depID, id, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO, err.Error())
- if err != nil {
- log.Errorf("Update bundle %s:%s status failed: %v", depID, id, err)
- }
- }
- }()
- }
-
- // fail fast on first error, otherwise wait for completion
- for range dep.Bundles {
- err := <-errorsChan
- if err != nil {
- updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO)
- return err
- }
- }
-
- return updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
-}
-
-func parseManifest(manifestString string) (dep deployment, err error) {
- err = json.Unmarshal([]byte(manifestString), &dep)
- if err != nil {
- log.Errorf("JSON decoding Manifest failed: %v", err)
- return
- }
-
- // validate manifest
- if dep.System.URI == "" {
- err = errors.New("system bundle 'uri' is required")
- return
- }
- for _, bun := range dep.Bundles {
- if bun.BundleID == "" {
- err = errors.New("bundle 'bundleId' is required")
- return
- }
- if bun.URI == "" {
- err = errors.New("bundle 'uri' is required")
- return
- }
- if bun.Scope == "" {
- err = errors.New("bundle 'scope' is required")
- return
- }
- }
-
- return
-}
diff --git a/glide.lock b/glide.lock
deleted file mode 100644
index 04b6022..0000000
--- a/glide.lock
+++ /dev/null
@@ -1,115 +0,0 @@
-hash: fe03e1a208d3f729a210725f77887d0a77f06129d46309140d2e5e9b2513b77e
-updated: 2016-10-24T14:39:39.290139587-07:00
-imports:
-- name: github.com/30x/apid
- version: 56f87c3a7fb9909be0311b964bad009197e5cd64
- subpackages:
- - api
- - config
- - data
- - events
- - factory
- - logger
-- name: github.com/30x/apidApigeeSync
- version: e62f875f6b6c014791d93a816b7413a0cf2ec98a
-- name: github.com/fsnotify/fsnotify
- version: bd2828f9f176e52d7222e565abb2d338d3f3c103
-- name: github.com/gorilla/context
- version: 08b5f424b9271eedf6f9f0ce86cb9396ed337a42
-- name: github.com/gorilla/mux
- version: 0eeaf8392f5b04950925b8a69fe70f110fa7cbfc
-- name: github.com/hashicorp/hcl
- version: 99ce73d4fe576449f7a689d4fc2b2ad09a86bdaa
- subpackages:
- - hcl/ast
- - hcl/parser
- - hcl/scanner
- - hcl/strconv
- - hcl/token
- - json/parser
- - json/scanner
- - json/token
-- name: github.com/kr/fs
- version: 2788f0dbd16903de03cb8186e5c7d97b69ad387b
-- name: github.com/magiconair/properties
- version: 0723e352fa358f9322c938cc2dadda874e9151a9
-- name: github.com/mattn/go-sqlite3
- version: e5a3c16c5c1d80b24f633e68aecd6b0702786d3d
-- name: github.com/mitchellh/mapstructure
- version: f3009df150dadf309fdee4a54ed65c124afad715
-- name: github.com/pelletier/go-buffruneio
- version: df1e16fde7fc330a0ca68167c23bf7ed6ac31d6d
-- name: github.com/pelletier/go-toml
- version: 45932ad32dfdd20826f5671da37a5f3ce9f26a8d
-- name: github.com/pkg/errors
- version: 839d9e913e063e28dfd0e6c7b7512793e0a48be9
-- name: github.com/pkg/sftp
- version: 4d0e916071f68db74f8a73926335f809396d6b42
-- name: github.com/Sirupsen/logrus
- version: 3ec0642a7fb6488f65b06f9040adc67e3990296a
-- name: github.com/spf13/afero
- version: 52e4a6cfac46163658bd4f123c49b6ee7dc75f78
- subpackages:
- - mem
- - sftp
-- name: github.com/spf13/cast
- version: 2580bc98dc0e62908119e4737030cc2fdfc45e4c
-- name: github.com/spf13/jwalterweatherman
- version: 33c24e77fb80341fe7130ee7c594256ff08ccc46
-- name: github.com/spf13/pflag
- version: 5ccb023bc27df288a957c5e994cd44fd19619465
-- name: github.com/spf13/viper
- version: 80ab6657f9ec7e5761f6603320d3d58dfe6970f6
-- name: golang.org/x/crypto
- version: 8291fff38ab6f2928eca153913c3b5773aa0de98
- subpackages:
- - curve25519
- - ed25519
- - ed25519/internal/edwards25519
- - ssh
-- name: golang.org/x/sys
- version: c200b10b5d5e122be351b67af224adc6128af5bf
- subpackages:
- - unix
-- name: golang.org/x/text
- version: 2556e8494a202e0b4008f70eda8cdb089b03fa50
- subpackages:
- - transform
- - unicode/norm
-- name: gopkg.in/yaml.v2
- version: a5b47d31c556af34a302ce5d659e6fea44d90de0
-testImports:
-- name: github.com/30x/keymaster
- version: 9dacc49a25917fe0ed621235950a523170da9a2b
- subpackages:
- - client
-- name: github.com/onsi/ginkgo
- version: 462326b1628e124b23f42e87a8f2750e3c4e2d24
- subpackages:
- - config
- - internal/codelocation
- - internal/containernode
- - internal/failer
- - internal/leafnodes
- - internal/remote
- - internal/spec
- - internal/specrunner
- - internal/suite
- - internal/testingtproxy
- - internal/writer
- - reporters
- - reporters/stenographer
- - types
-- name: github.com/onsi/gomega
- version: a78ae492d53aad5a7a232d0d0462c14c400e3ee7
- subpackages:
- - format
- - internal/assertion
- - internal/asyncassertion
- - internal/testingtsupport
- - matchers
- - matchers/support/goraph/bipartitegraph
- - matchers/support/goraph/edge
- - matchers/support/goraph/node
- - matchers/support/goraph/util
- - types
diff --git a/glide.yaml b/glide.yaml
index 4c3f865..6255ffa 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -1,8 +1,11 @@
package: github.com/30x/apidGatewayDeploy
import:
- package: github.com/30x/apid
-- package: github.com/30x/apidApigeeSync
-- package: github.com/gorilla/mux
+ version: master
+- package: github.com/apigee-labs/transicator/common
+ version: master
testImport:
- package: github.com/onsi/ginkgo
+ version: master
- package: github.com/onsi/gomega
+ version: master
diff --git a/init.go b/init.go
index c56642f..12f1339 100644
--- a/init.go
+++ b/init.go
@@ -32,11 +32,11 @@
data = services.Data()
relativeBundlePath := config.GetString(configBundleDirKey)
- if err := os.MkdirAll(relativeBundlePath, 0700); err != nil {
- log.Panicf("Failed bundle directory creation: %v", err)
- }
storagePath := config.GetString("local_storage_path")
bundlePath = path.Join(storagePath, relativeBundlePath)
+ if err := os.MkdirAll(bundlePath, 0700); err != nil {
+ log.Panicf("Failed bundle directory creation: %v", err)
+ }
log.Infof("Bundle directory path is %s", bundlePath)
go distributeEvents()
diff --git a/listener.go b/listener.go
index 39e35c2..3b57542 100644
--- a/listener.go
+++ b/listener.go
@@ -1,19 +1,28 @@
package apiGatewayDeploy
import (
+ "database/sql"
+ "encoding/json"
"github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
)
const (
+ // APIGEE_SYNC_EVENT is the event selector this plugin registers its listener on.
APIGEE_SYNC_EVENT = "ApigeeSync"
- MANIFEST_TABLE = "edgex.apid_config_manifest_deployment"
+ // DEPLOYMENT_TABLE is the table name matched against snapshot tables and change-list entries.
+ DEPLOYMENT_TABLE = "edgex.deployment"
)
+// initListener registers an apigeeSyncHandler for APIGEE_SYNC_EVENT on the
+// event service obtained from services.
func initListener(services apid.Services) {
services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{})
}
+// bundleConfigJson is the decoded form of the JSON held in a deployment row's
+// bundle_config_json column. addDeployment unmarshals into it and copies the
+// fields onto the DataDeployment it inserts.
+type bundleConfigJson struct {
+ // Name is stored as the deployment's BundleName.
+ Name string `json:"name"`
+ // URI is stored as the deployment's BundleURI.
+ URI string `json:"uri"`
+ // ChecksumType is stored as the deployment's BundleChecksumType.
+ ChecksumType string `json:"checksumType"`
+ // Checksum is stored as the deployment's BundleChecksum.
+ Checksum string `json:"checksum"`
+}
+
+// apigeeSyncHandler is the stateless listener that initListener registers for
+// APIGEE_SYNC_EVENT.
type apigeeSyncHandler struct {
}
@@ -41,72 +50,144 @@
log.Panicf("Unable to access database: %v", err)
}
- initDB(db)
+ err = InitDB(db)
+ if err != nil {
+ log.Panicf("Unable to initialize database: %v", err)
+ }
+ tx, err := db.Begin()
+ if err != nil {
+ log.Panicf("Error starting transaction: %v", err)
+ }
+
+ defer tx.Rollback()
for _, table := range snapshot.Tables {
var err error
switch table.Name {
- case MANIFEST_TABLE:
+ case DEPLOYMENT_TABLE:
log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
if len(table.Rows) == 0 {
return
}
- // todo: should be 0 or 1 *per system*!! - TBD
- row := table.Rows[len(table.Rows)-1]
- err = processNewManifest(db, row)
+ // Stop at the first row that fails so its error reaches the check
+ // below instead of being dropped or overwritten by a later row; the
+ // transaction must not be committed with a partial snapshot.
+ for _, row := range table.Rows {
+ err = addDeployment(tx, row)
+ if err != nil {
+ break
+ }
+ }
}
if err != nil {
log.Panicf("Error processing Snapshot: %v", err)
}
}
- setDB(db)
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Error committing Snapshot change: %v", err)
+ }
+
+ SetDB(db)
log.Debug("Snapshot processed")
}
+// processChangeList applies every change in changes inside one transaction.
+//
+// Only changes on DEPLOYMENT_TABLE are acted on: an Insert is passed to
+// addDeployment with the new row, a Delete reads "id" from the old row and
+// calls deleteDeployment, and any other operation is logged as unexpected.
+// Changes on other tables are ignored. A failure to begin, apply or commit is
+// reported through log.Panicf.
func processChangeList(changes *common.ChangeList) {
- db := getDB()
+ tx, err := getDB().Begin()
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+ // Discards the work on any early exit; its result is ignored.
+ defer tx.Rollback()
for _, change := range changes.Changes {
var err error
switch change.Table {
- case MANIFEST_TABLE:
+ case DEPLOYMENT_TABLE:
switch change.Operation {
case common.Insert:
- err = processNewManifest(db, change.NewRow)
+ err = addDeployment(tx, change.NewRow)
+ case common.Delete:
+ var id string
+ err = change.OldRow.Get("id", &id)
+ if err == nil {
+ err = deleteDeployment(tx, id)
+ // todo: delete downloaded bundle file
+ }
default:
- log.Error("unexpected operation: %s", change.Operation)
+ log.Errorf("unexpected operation: %s", change.Operation)
}
}
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
}
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
}
-func processNewManifest(db apid.DB, row common.Row) error {
+// addDeployment builds a DataDeployment from one deployment-table row, inserts
+// it through tx and then starts downloadBundle for it in a new goroutine.
+//
+// The row's bundle_config_json column is decoded to fill the deployment's
+// BundleName, BundleURI, BundleChecksumType and BundleChecksum fields.
+//
+// It returns the first error from reading a column, decoding the bundle
+// configuration or inserting; in that case no download is started.
+//
+// NOTE(review): the download goroutine is launched before the caller commits
+// tx - confirm downloadBundle copes with a row that is not yet committed or
+// that is later rolled back.
+func addDeployment(tx *sql.Tx, row common.Row) (err error) {
- var deploymentID, manifestString string
- err := row.Get("id", &deploymentID)
+ d := DataDeployment{}
+ err = row.Get("id", &d.ID)
if err != nil {
- return err
+ return
}
- err = row.Get("manifest_body", &manifestString)
+ err = row.Get("bundle_config_id", &d.BundleConfigID)
if err != nil {
- return err
+ return
+ }
+ err = row.Get("apid_cluster_id", &d.ApidClusterID)
+ if err != nil {
+ return
+ }
+ err = row.Get("data_scope_id", &d.DataScopeID)
+ if err != nil {
+ return
+ }
+ err = row.Get("bundle_config_json", &d.BundleConfigJSON)
+ if err != nil {
+ return
+ }
+ err = row.Get("config_json", &d.ConfigJSON)
+ if err != nil {
+ return
+ }
+ err = row.Get("status", &d.Status)
+ if err != nil {
+ return
+ }
+ err = row.Get("created", &d.Created)
+ if err != nil {
+ return
+ }
+ err = row.Get("created_by", &d.CreatedBy)
+ if err != nil {
+ return
+ }
+ err = row.Get("updated", &d.Updated)
+ if err != nil {
+ return
+ }
+ err = row.Get("updated_by", &d.UpdatedBy)
+ if err != nil {
+ return
}
- manifest, err := parseManifest(manifestString)
+ // Flatten the bundle configuration JSON into the deployment's own fields.
+ var bc bundleConfigJson
+ err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
if err != nil {
- log.Errorf("error parsing manifest: %v", err)
- return err
+ log.Errorf("JSON decoding bundle_config_json for deployment %s failed: %v", d.ID, err)
+ return
}
- err = prepareDeployment(db, deploymentID, manifest)
+ d.BundleName = bc.Name
+ d.BundleURI = bc.URI
+ d.BundleChecksumType = bc.ChecksumType
+ d.BundleChecksum = bc.Checksum
+
+ err = InsertDeployment(tx, d)
if err != nil {
- log.Errorf("serviceDeploymentQueue prepare deployment failed: %s", deploymentID)
- return err
+ return
}
- return nil
+ // todo: limit # concurrent downloads?
+ go downloadBundle(d)
+ return
}
diff --git a/listener_test.go b/listener_test.go
index 7160f35..ec4b7f2 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -7,182 +7,169 @@
. "github.com/onsi/gomega"
"net/url"
"github.com/apigee-labs/transicator/common"
- "io/ioutil"
)
var _ = Describe("listener", func() {
- It("should process ApigeeSync snapshot event", func(done Done) {
-
- deploymentID := "listener_test_1"
-
- uri, err := url.Parse(testServer.URL)
+ BeforeEach(func() {
+ _, err := getDB().Exec("DELETE FROM deployments")
Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundle/1"
- bundleUri1 := uri.String()
- uri.Path = "/bundle/2"
- bundleUri2 := uri.String()
+ })
- dep := deployment{
- DeploymentID: deploymentID,
- System: bundle{
- URI: "whatever",
- },
- Bundles: []bundle{
- {
- BundleID: "/bundle/1",
- URI: bundleUri1,
- Scope: "some-scope",
+ Context("ApigeeSync snapshot event", func() {
+
+ It("should set DB and process", func(done Done) {
+
+ deploymentID := "listener_test_1"
+
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle1 := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri)
+ bundle1Json, err := json.Marshal(bundle1)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+ row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+ var event = common.Snapshot{
+ SnapshotInfo: "test",
+ Tables: []common.Table{
+ {
+ Name: DEPLOYMENT_TABLE,
+ Rows: []common.Row{row},
+ },
},
- {
- BundleID: "/bundle/2",
- URI: bundleUri2,
- Scope: "some-scope",
+ }
+
+ var listener = make(chan string)
+ addSubscriber <- listener
+
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
+
+ id := <-listener
+ Expect(id).To(Equal(deploymentID))
+
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d := deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.BundleName).To(Equal(bundle1.Name))
+ Expect(d.BundleURI).To(Equal(bundle1.URI))
+
+ close(done)
+ })
+ })
+
+ Context("ApigeeSync change event", func() {
+
+ It("add event should add a deployment", func(done Done) {
+
+ deploymentID := "add_test_1"
+
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc-32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundle1Json, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+ row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+ var event = common.ChangeList{
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: DEPLOYMENT_TABLE,
+ NewRow: row,
+ },
},
- },
- }
+ }
- depBytes, err := json.Marshal(dep)
- Expect(err).ShouldNot(HaveOccurred())
+ var listener = make(chan string)
+ addSubscriber <- listener
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["manifest_body"] = &common.ColumnVal{Value: string(depBytes)}
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- var event = common.Snapshot{
- SnapshotInfo: "test",
- Tables: []common.Table{
- {
- Name: MANIFEST_TABLE,
- Rows: []common.Row{row},
+ // wait for event to propagate
+ id := <-listener
+ Expect(id).To(Equal(deploymentID))
+
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d := deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.BundleName).To(Equal(bundle.Name))
+ Expect(d.BundleURI).To(Equal(bundle.URI))
+
+ close(done)
+ })
+
+ It("delete event should delete a deployment", func(done Done) {
+
+ deploymentID := "delete_test_1"
+
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ dep := DataDeployment{
+ ID: deploymentID,
+ LocalBundleURI: "whatever",
+ }
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+
+ var event = common.ChangeList{
+ Changes: []common.Change{
+ {
+ Operation: common.Delete,
+ Table: DEPLOYMENT_TABLE,
+ OldRow: row,
+ },
},
- },
- }
+ }
- h := &test_handler{
- deploymentID,
- func(e apid.Event) {
- defer GinkgoRecover()
+ var listener = make(chan string)
+ addSubscriber <- listener
- // ignore the first event, let standard listener process it
- changeSet, ok := e.(*common.Snapshot)
- if !ok || len(changeSet.Tables) > 0 {
- return
- }
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- testDeployment(dep)
+ id := <-listener
+ Expect(id).To(Equal(deploymentID))
- close(done)
- },
- }
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
- apid.Events().Listen(APIGEE_SYNC_EVENT, h)
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event) // for standard listener
- apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{SnapshotInfo: "test"}) // for test listener
- })
+ Expect(len(deployments)).To(Equal(0))
- It("should process ApigeeSync change event", func(done Done) {
-
- deploymentID := "listener_test_2"
-
- var dep deployment
-
- h := &test_handler{
- deploymentID,
- func(e apid.Event) {
- defer GinkgoRecover()
-
- // ignore the first event, let standard listener process it
- changeSet, ok := e.(*common.ChangeList)
- if !ok || len(changeSet.Changes) > 0 {
- return
- }
-
- testDeployment(dep)
-
- close(done)
- },
- }
-
- apid.Events().Listen(APIGEE_SYNC_EVENT, h)
-
- dep = triggerDeploymentEvent(deploymentID)
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &common.ChangeList{}) // for test listener
+ close(done)
+ })
})
})
-
-type test_handler struct {
- description string
- f func(event apid.Event)
-}
-
-func (t *test_handler) String() string {
- return t.description
-}
-
-func (t *test_handler) Handle(event apid.Event) {
- t.f(event)
-}
-
-func testDeployment(dep deployment) {
-
- depID, err := getCurrentDeploymentID()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(depID).Should(Equal(dep.DeploymentID))
-
- deployment, err := getDeployment(depID)
- Expect(deployment.Bundles).To(HaveLen(len(dep.Bundles)))
-
- for _, b := range dep.Bundles {
- bundleFile := getBundleFilePath(depID, b.URI)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(bundleFile).To(BeARegularFile())
-
- bytes, err := ioutil.ReadFile(bundleFile)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(string(bytes)).Should(Equal(b.BundleID))
- }
-}
-
-func triggerDeploymentEvent(deploymentID string) deployment {
-
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundle/1"
- bundleUri := uri.String()
-
- dep := deployment{
- DeploymentID: deploymentID,
- System: bundle{
- URI: bundleUri,
- },
- Bundles: []bundle{
- {
- BundleID: "/bundle/1",
- URI: bundleUri,
- Scope: "some-scope",
- },
- },
- }
-
- depBytes, err := json.Marshal(dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["manifest_body"] = &common.ColumnVal{Value: string(depBytes)}
-
- var event = common.ChangeList{}
- event.Changes = []common.Change{
- {
- Operation: common.Insert,
- Table: MANIFEST_TABLE,
- NewRow: row,
- },
- }
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
-
- return dep
-}