Squashed bugs, simplified code, more tests!
diff --git a/api.go b/api.go
index 7f34e74..67a98ee 100644
--- a/api.go
+++ b/api.go
@@ -8,10 +8,20 @@
"net/http"
"strconv"
"time"
+ "fmt"
)
-// todo: add error codes where this is used
-const ERROR_CODE_TODO = 0
+const (
+ RESPONSE_STATUS_SUCCESS = "SUCCESS"
+ RESPONSE_STATUS_FAIL = "FAIL"
+
+ ERROR_CODE_TODO = 0 // todo: add error codes where this is used
+)
+
+var (
+ incoming = make(chan string)
+ addSubscriber = make(chan chan string)
+)
type errorResponse struct {
ErrorCode int `json:"errorCode"`
@@ -20,10 +30,11 @@
func initAPI(services apid.Services) {
services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET")
- services.API().HandleFunc("/deployments/{deploymentID}", respHandler).Methods("POST")
+ services.API().HandleFunc("/deployments/{deploymentID}", handleDeploymentResult).Methods("POST")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
+ w.WriteHeader(status)
e := errorResponse{
ErrorCode: code,
Reason: reason,
@@ -34,21 +45,19 @@
} else {
w.Write(bytes)
}
- log.Debugf("sending (%d) error to client: %s", status, reason)
- w.WriteHeader(status)
+ log.Debugf("sending %d error to client: %s", status, reason)
}
func writeDatabaseError(w http.ResponseWriter) {
writeError(w, http.StatusInternalServerError, ERROR_CODE_TODO, "database error")
}
-// todo: The following was basically just copied from old APID - needs review.
-
func distributeEvents() {
subscribers := make(map[chan string]struct{})
for {
select {
case msg := <-incoming:
+ log.Debugf("Delivering new deployment %s to %d subscribers", msg, len(subscribers))
for subscriber := range subscribers {
select {
case subscriber <- msg:
@@ -82,11 +91,13 @@
return
}
}
+ log.Debugf("api timeout: %d", timeout)
// If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
// query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
// most recent bundle list.
priorDepID := r.Header.Get("If-None-Match")
+ log.Debugf("if-none-match: %s", priorDepID)
depID, err := getCurrentDeploymentID()
if err != nil && err != sql.ErrNoRows{
@@ -132,7 +143,11 @@
case <-time.After(time.Duration(timeout) * time.Second):
log.Debug("Blocking deployment request timed out.")
- w.WriteHeader(http.StatusNotFound)
+ if priorDepID != "" {
+ w.WriteHeader(http.StatusNotModified)
+ } else {
+ w.WriteHeader(http.StatusNotFound)
+ }
return
}
}
@@ -148,32 +163,42 @@
log.Errorf("unable to marshal deployment: %s", err)
w.WriteHeader(http.StatusInternalServerError)
} else {
+ log.Debugf("sending deployment %s: %s", depID, b)
w.Header().Set("ETag", depID)
w.Write(b)
}
}
-func respHandler(w http.ResponseWriter, r *http.Request) {
+// todo: we'll need to transmit results back to Edge somehow...
+func handleDeploymentResult(w http.ResponseWriter, r *http.Request) {
depID := apid.API().Vars(r)["deploymentID"]
if depID == "" {
log.Error("No deployment ID")
- // todo: add error code
- writeError(w, http.StatusBadRequest, 0, "Missing deployment ID")
+ writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Missing deployment ID")
return
}
- var rsp gwBundleResponse
+ var rsp deploymentResponse
buf, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(buf, &rsp)
if err != nil {
log.Error("Resp Handler Json Unmarshal err: ", err)
- // todo: add error code
- writeError(w, http.StatusBadRequest, 0, "Malformed body")
+ writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Malformed JSON")
return
}
- // todo: validate request body
+
+ if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL {
+ writeError(w, http.StatusBadRequest, ERROR_CODE_TODO,
+ fmt.Sprintf("status must be '%s' or '%s'", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL))
+ return
+ }
+
+ if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.GWbunRsp.ErrorCode == 0 || rsp.GWbunRsp.Reason == "") {
+ writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "errorCode and reason are required")
+ return
+ }
/*
* If the state of deployment was success, update state of bundles and
@@ -186,15 +211,19 @@
return
}
- var updated bool
- if rsp.Status == "SUCCESS" {
- updated = updateDeploymentSuccess(depID, txn)
+ var updateErr error
+ if rsp.Status == RESPONSE_STATUS_SUCCESS {
+ updateErr = updateDeploymentSuccess(depID, txn)
} else {
- updated = updateDeploymentFailure(depID, rsp.GWbunRsp, txn)
+ updateErr = updateDeploymentFailure(depID, rsp.GWbunRsp, txn)
}
- if !updated {
- writeDatabaseError(w)
+ if updateErr != nil {
+ if updateErr == sql.ErrNoRows {
+ writeError(w, http.StatusNotFound, ERROR_CODE_TODO, "not found")
+ } else {
+ writeDatabaseError(w)
+ }
err = txn.Rollback()
if err != nil {
log.Errorf("Unable to rollback transaction: %s", err)
@@ -211,68 +240,39 @@
return
}
-func updateDeploymentSuccess(depID string, txn *sql.Tx) bool {
+func updateDeploymentSuccess(depID string, txn *sql.Tx) error {
- log.Debugf("Marking deployment (%s) as SUCCEEDED", depID)
+ log.Debugf("Marking deployment %s as succeeded", depID)
- var rows int64
- res, err := txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ? WHERE deployment_id = ?;",
- DEPLOYMENT_STATE_SUCCESS, depID)
- if err == nil {
- rows, err = res.RowsAffected()
- }
- if err != nil || rows == 0 {
- log.Errorf("UPDATE BUNDLE_INFO Failed: Dep Id (%s): %v", depID, err)
- return false
- }
-
- log.Infof("UPDATE BUNDLE_INFO Success: Dep Id (%s)", depID)
-
- res, err = txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ? WHERE id = ?;",
- DEPLOYMENT_STATE_SUCCESS, depID)
+ err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0)
if err != nil {
- rows, err = res.RowsAffected()
- }
- if err != nil || rows == 0 {
- log.Errorf("UPDATE BUNDLE_DEPLOYMENT Failed: Dep Id (%s): %v", depID, err)
- return false
+ return err
}
- log.Infof("UPDATE BUNDLE_DEPLOYMENT Success: Dep Id (%s)", depID)
+ err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS)
+ if err != nil {
+ return err
+ }
- return true
-
+ return nil
}
-func updateDeploymentFailure(depID string, rsp gwBundleErrorResponse, txn *sql.Tx) bool {
+func updateDeploymentFailure(depID string, rsp deploymentErrorResponse, txn *sql.Tx) error {
- log.Infof("marking deployment (%s) as FAILED", depID)
+ log.Infof("marking deployment %s as FAILED", depID)
- var rows int64
- /* Update the Deployment state errors */
- res, err := txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ?, error_code = ? WHERE id = ?;",
- DEPLOYMENT_STATE_ERR_GWY, rsp.ErrorCode, depID)
- if err == nil {
- rows, err = res.RowsAffected()
+ err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.ErrorCode)
+ if err != nil {
+ return err
}
- if err != nil || rows == 0 {
- log.Errorf("UPDATE BUNDLE_DEPLOYMENT Failed: Dep Id (%s): %v", depID, err)
- return false
- }
- log.Infof("UPDATE BUNDLE_DEPLOYMENT Success: Dep Id (%s)", depID)
- /* Iterate over Bundles, and update the errors */
+ // Iterate over Bundles, and update the errors
for _, a := range rsp.ErrorDetails {
- res, err = txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ?, errorcode = ?, error_reason = ? "+
- "WHERE id = ?;", DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason, a.BundleId)
+ updateBundleStatus(txn, depID, a.BundleId, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason)
if err != nil {
- rows, err = res.RowsAffected()
+ return err
}
- if err != nil || rows == 0 {
- log.Errorf("UPDATE BUNDLE_INFO Failed: Bund Id (%s): %v", a.BundleId, err)
- return false
- }
- log.Infof("UPDATE BUNDLE_INFO Success: Bund Id (%s)", a.BundleId)
}
- return true
+
+ return err
}
diff --git a/api_test.go b/api_test.go
index afbbac9..142cd05 100644
--- a/api_test.go
+++ b/api_test.go
@@ -4,8 +4,6 @@
"bytes"
"encoding/json"
"fmt"
- . "github.com/30x/apidApigeeSync" // for direct access to Payload types
- "github.com/30x/keymaster/client"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io/ioutil"
@@ -15,155 +13,272 @@
"time"
)
-const currentDeploymentPath = "/deployments/current"
-
var _ = Describe("api", func() {
- PIt("should deliver deployment events to long-poll waiters")
+ Context("GET /deployments/current", func() {
- It("should get current deployment", func() {
+ It("should get 404 if no deployments", func() {
- deploymentID := "api_test_1"
- insertTestDeployment(testServer, deploymentID)
+ _, err := db.Exec("DELETE FROM gateway_deploy_deployment")
+ Expect(err).ShouldNot(HaveOccurred())
- var deployStatus int
- err := db.QueryRow("SELECT deploy_status from BUNDLE_INFO WHERE deployment_id = ?;",
- deploymentID).Scan(&deployStatus)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_READY))
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = "/deployments/current"
- err = db.QueryRow("SELECT deploy_status from BUNDLE_DEPLOYMENT WHERE id = ?;",
- deploymentID).Scan(&deployStatus)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_READY))
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
+ })
- uri, err := url.Parse(testServer.URL)
- uri.Path = currentDeploymentPath
+ It("should get current deployment", func() {
- res, err := http.Get(uri.String())
- defer res.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
+ deploymentID := "api_get_current"
+ insertTestDeployment(testServer, deploymentID)
- var depRes deploymentResponse
- body, err := ioutil.ReadAll(res.Body)
- Expect(err).ShouldNot(HaveOccurred())
- json.Unmarshal(body, &depRes)
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = "/deployments/current"
- Expect(depRes.DeploymentId).Should(Equal(deploymentID))
- Expect(res.Header.Get("etag")).Should(Equal(deploymentID))
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+
+ var depRes deployment
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).ShouldNot(HaveOccurred())
+ json.Unmarshal(body, &depRes)
+
+ Expect(depRes.DeploymentId).Should(Equal(deploymentID))
+ Expect(res.Header.Get("etag")).Should(Equal(deploymentID))
+ })
+
+ It("should get 304 for no change", func() {
+
+ deploymentID := "api_no_change"
+ insertTestDeployment(testServer, deploymentID)
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = "/deployments/current"
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+
+ req, err := http.NewRequest("GET", uri.String(), nil)
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Add("If-None-Match", res.Header.Get("etag"))
+
+ res, err = http.DefaultClient.Do(req)
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).To(Equal(http.StatusNotModified))
+ })
+
+ It("should get 404 after blocking if no deployment", func() {
+
+ _, err := db.Exec("DELETE FROM gateway_deploy_deployment")
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = "/deployments/current"
+
+ query := uri.Query()
+ query.Add("block", "1")
+ uri.RawQuery = query.Encode()
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
+ })
+
+ It("should get new deployment after blocking", func(done Done) {
+
+ deploymentID := "api_get_current_blocking"
+ insertTestDeployment(testServer, deploymentID)
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = "/deployments/current"
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+
+ deploymentID = "api_get_current_blocking2"
+ go func() {
+ query := uri.Query()
+ query.Add("block", "1")
+ uri.RawQuery = query.Encode()
+ req, err := http.NewRequest("GET", uri.String(), nil)
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Add("If-None-Match", res.Header.Get("etag"))
+
+ res, err := http.DefaultClient.Do(req)
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).To(Equal(http.StatusOK))
+
+ var depRes deployment
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).ShouldNot(HaveOccurred())
+ json.Unmarshal(body, &depRes)
+
+ Expect(depRes.DeploymentId).Should(Equal(deploymentID))
+
+ close(done)
+ }()
+
+ time.Sleep(50 * time.Millisecond) // make sure API call is made and blocks
+ insertTestDeployment(testServer, deploymentID)
+ incoming <- deploymentID
+ })
+
+ It("should get 304 after blocking if no new deployment", func() {
+
+ deploymentID := "api_no_change_blocking"
+ insertTestDeployment(testServer, deploymentID)
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = "/deployments/current"
+ res, err := http.Get(uri.String())
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+
+ query := uri.Query()
+ query.Add("block", "1")
+ uri.RawQuery = query.Encode()
+ req, err := http.NewRequest("GET", uri.String(), nil)
+ req.Header.Add("Content-Type", "application/json")
+ req.Header.Add("If-None-Match", res.Header.Get("etag"))
+
+ res, err = http.DefaultClient.Do(req)
+ Expect(err).ShouldNot(HaveOccurred())
+ defer res.Body.Close()
+ Expect(res.StatusCode).To(Equal(http.StatusNotModified))
+ })
})
- It("should mark a deployment as deployed", func() {
+ Context("POST /deployments/{ID}", func() {
- deploymentID := "api_test_2"
- insertTestDeployment(testServer, deploymentID)
+ It("should return a 404 for missing deployment", func() {
- uri, err := url.Parse(testServer.URL)
- uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
+ deploymentID := "api_missing_deployment"
- deploymentResult := &client.DeploymentResult{
- ID: deploymentID,
- Status: client.StatusSuccess,
- }
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
- payload, err := json.Marshal(deploymentResult)
- Expect(err).ShouldNot(HaveOccurred())
+ deploymentResult := deploymentResponse{
+ Status: RESPONSE_STATUS_SUCCESS,
+ }
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
- req.Header.Add("Content-Type", "application/json")
+ payload, err := json.Marshal(deploymentResult)
+ Expect(err).ShouldNot(HaveOccurred())
- resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusOK))
+ req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req.Header.Add("Content-Type", "application/json")
- var deployStatus int
- err = db.QueryRow("SELECT deploy_status from BUNDLE_INFO WHERE deployment_id = ?;",
- deploymentID).Scan(&deployStatus)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
+ resp, err := http.DefaultClient.Do(req)
+ defer resp.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp.StatusCode).Should(Equal(http.StatusNotFound))
+ })
- err = db.QueryRow("SELECT deploy_status from BUNDLE_DEPLOYMENT WHERE id = ?;",
- deploymentID).Scan(&deployStatus)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
+ It("should mark a deployment as deployed", func() {
+
+ deploymentID := "api_mark_deployed"
+ insertTestDeployment(testServer, deploymentID)
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
+
+ deploymentResult := deploymentResponse{
+ Status: RESPONSE_STATUS_SUCCESS,
+ }
+
+ payload, err := json.Marshal(deploymentResult)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req.Header.Add("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ defer resp.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp.StatusCode).Should(Equal(http.StatusOK))
+
+ var deployStatus int
+ err = db.QueryRow("SELECT status FROM gateway_deploy_deployment WHERE id=?", deploymentID).
+ Scan(&deployStatus)
+ Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
+
+ rows, err := db.Query("SELECT status from gateway_deploy_bundle WHERE id = ?;", deploymentID)
+ Expect(err).ShouldNot(HaveOccurred())
+ for rows.Next() {
+ rows.Scan(&deployStatus)
+ Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_SUCCESS))
+ }
+ })
+
+ It("should mark a deployment as failed", func() {
+
+ deploymentID := "api_test_3"
+ insertTestDeployment(testServer, deploymentID)
+
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
+
+ deploymentResult := deploymentResponse{
+ Status: RESPONSE_STATUS_FAIL,
+ GWbunRsp: deploymentErrorResponse{
+ ErrorCode: 100,
+ Reason: "bad juju",
+ //ErrorDetails: []deploymentErrorDetail{ // todo: add tests for bundle errors
+ // {
+ // BundleId: "",
+ // ErrorCode: 100,
+ // Reason: "Zombies",
+ // },
+ //},
+ },
+ }
+
+ payload, err := json.Marshal(deploymentResult)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+ req.Header.Add("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ defer resp.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(resp.StatusCode).Should(Equal(http.StatusOK))
+
+ var deployStatus int
+ err = db.QueryRow("SELECT status from gateway_deploy_deployment WHERE id = ?;",
+ deploymentID).Scan(&deployStatus)
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_ERR_GWY))
+ })
})
-
- It("should mark a deployment as failed", func() {
-
- deploymentID := "api_test_3"
- insertTestDeployment(testServer, deploymentID)
-
- uri, err := url.Parse(testServer.URL)
- uri.Path = fmt.Sprintf("/deployments/%s", deploymentID)
-
- deploymentResult := &client.DeploymentResult{
- ID: deploymentID,
- Status: client.StatusFail,
- Error: &client.DeploymentError{
- ErrorCode: 100,
- Reason: "bad juju",
- //BundleErrors: []client.BundleError{ // todo: add tests for bundle errors
- // {
- // BundleID: "",
- // ErrorCode: 100,
- // Reason: "zombies",
- // },
- //},
- },
- }
-
- payload, err := json.Marshal(deploymentResult)
- Expect(err).ShouldNot(HaveOccurred())
-
- req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
- req.Header.Add("Content-Type", "application/json")
-
- resp, err := http.DefaultClient.Do(req)
- defer resp.Body.Close()
- Expect(err).ShouldNot(HaveOccurred())
- Expect(resp.StatusCode).Should(Equal(http.StatusOK))
-
- var deployStatus int
- err = db.QueryRow("SELECT deploy_status from BUNDLE_DEPLOYMENT WHERE id = ?;",
- deploymentID).Scan(&deployStatus)
- Expect(err).ShouldNot(HaveOccurred())
- Expect(deployStatus).Should(Equal(DEPLOYMENT_STATE_ERR_GWY))
- })
-
})
-func insertTestDeployment(server *httptest.Server, entityID string) {
+func insertTestDeployment(server *httptest.Server, depID string) {
uri, err := url.Parse(server.URL)
Expect(err).ShouldNot(HaveOccurred())
uri.Path = "/bundle"
bundleUri := uri.String()
- bundle := bundleManifest{
+ manifest := bundleManifest{
systemBundle{
bundleUri,
},
[]dependantBundle{
{
bundleUri,
- "someorg",
- "someenv",
+ "some-scope",
},
},
}
- bundleBytes, err := json.Marshal(bundle)
+
+ err = insertDeployment(depID, manifest)
Expect(err).ShouldNot(HaveOccurred())
- payload := DataPayload{
- EntityType: "deployment",
- Operation: "create",
- EntityIdentifier: entityID,
- PldCont: Payload{
- CreatedAt: time.Now().Unix(),
- Manifest: string(bundleBytes),
- },
- }
-
- insertDeployment(payload)
+ err = updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
+ Expect(err).ShouldNot(HaveOccurred())
}
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 48a6d69..34d8c40 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -28,6 +28,7 @@
config.SetDefault("apigeesync_organization", "X")
config.SetDefault("apigeesync_consumer_key", "X")
config.SetDefault("apigeesync_consumer_secret", "X")
+ config.SetDefault("apigeesync_log_level", "panic")
var err error
tmpDir, err = ioutil.TempDir("", "api_test")
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go
index f972fad..a8b11ef 100644
--- a/cmd/apidGatewayDeploy/main.go
+++ b/cmd/apidGatewayDeploy/main.go
@@ -28,6 +28,7 @@
config.SetDefault("apigeesync_organization", "X")
config.SetDefault("apigeesync_consumer_key", "X")
config.SetDefault("apigeesync_consumer_secret", "X")
+ config.SetDefault("apigeesync_log_level", "panic")
// if manifest is specified, start with only the manifest using a temp dir
var manifest []byte
diff --git a/data.go b/data.go
new file mode 100644
index 0000000..c3fb74b
--- /dev/null
+++ b/data.go
@@ -0,0 +1,314 @@
+package apiGatewayDeploy
+
+import (
+ "database/sql"
+ "github.com/30x/apidApigeeSync"
+ "time"
+)
+
+const (
+ DEPLOYMENT_STATE_INPROG = 1
+ DEPLOYMENT_STATE_ERR_APID = 2
+ DEPLOYMENT_STATE_ERR_GWY = 3
+ DEPLOYMENT_STATE_READY = 4
+ DEPLOYMENT_STATE_SUCCESS = 5
+
+ BUNDLE_TYPE_SYS = 1
+ BUNDLE_TYPE_DEP = 2
+)
+
+/*
+Startup flow:
+ Check deployment queue
+ If anything in queue, initiate deployment retrieval
+Listener flow:
+ Receive deployment event
+ Store deployment event in deployment queue
+ Initiate deployment retrieval
+Deployment Retrieval:
+ Load deployment from deployment queue
+ Retrieve and store each bundle
+ Mark deployment as ready to deploy
+ Trigger deployment
+Deployment:
+
+Tables:
+ gateway_deploy_queue
+ Deployment(s) received and not yet processed (potentially a Queue - one for now)
+ gateway_deploy_deployment
+ gateway_deploy_bundle
+ */
+
+
+type SQLExec interface {
+ Exec(query string, args ...interface{}) (sql.Result, error)
+}
+
+// todo: should we have some kind of deployment log instead of just a status?
+
+func initDB() {
+
+ var count int
+ row := db.QueryRow("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='gateway_deploy_queue';")
+ if err := row.Scan(&count); err != nil {
+ log.Panicf("Unable to check for tables: %v", err)
+ }
+ if count > 0 {
+ return
+ }
+
+ tx, err := db.Begin()
+ if err != nil {
+ log.Panicf("Unable to start transaction: %v", err)
+ }
+ defer tx.Rollback()
+ _, err = tx.Exec("CREATE TABLE gateway_deploy_queue (" +
+ "id varchar(255), manifest text, created_at integer, " +
+ "PRIMARY KEY (id));")
+ if err != nil {
+ log.Panicf("Unable to initialize gateway_deploy_queue: %v", err)
+ }
+
+ _, err = tx.Exec("CREATE TABLE gateway_deploy_deployment (" +
+ "id varchar(255), status integer, created_at integer, " +
+ "modified_at integer, error_code varchar(255), " +
+ "PRIMARY KEY (id));")
+ if err != nil {
+ log.Panicf("Unable to initialize gateway_deploy_deployment: %v", err)
+ }
+
+ _, err = tx.Exec("CREATE TABLE gateway_deploy_bundle (" +
+ "deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer, " +
+ "created_at integer, modified_at integer, status integer, error_code integer, error_reason text, " +
+ "PRIMARY KEY (deployment_id, id), " +
+ "FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE);")
+ if err != nil {
+ log.Panicf("Unable to initialize gateway_deploy_bundle: %v", err)
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Unable to commit transaction: %v", err)
+ }
+}
+
+// currently only maintains 1 in the queue
+func queueDeployment(payload apidApigeeSync.DataPayload) error {
+
+ // todo: validate payload manifest
+
+ // maintains queue at 1
+ tx, err := db.Begin()
+ if err != nil {
+ log.Debugf("INSERT gateway_deploy_queue failed: (%s)", payload.EntityIdentifier)
+ return err
+ }
+ defer tx.Rollback()
+
+ _, err = tx.Exec("DELETE FROM gateway_deploy_queue");
+ if err != nil {
+ log.Errorf("DELETE FROM gateway_deploy_queue failed: %v", err)
+ return err
+ }
+
+ _, err = tx.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);",
+ payload.EntityIdentifier,
+ payload.PldCont.Manifest,
+ payload.PldCont.CreatedAt,
+ )
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_queue %s failed: %v", payload.EntityIdentifier, err)
+ return err
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_queue %s failed: %v", payload.EntityIdentifier, err)
+ return err
+ }
+
+ log.Debugf("INSERT gateway_deploy_queue success: (%s)", payload.EntityIdentifier)
+
+ return nil
+}
+
+func getQueuedDeployment() (depID, manifestString string) {
+ err := db.QueryRow("SELECT id, manifest FROM gateway_deploy_queue ORDER BY created_at ASC LIMIT 1;").
+ Scan(&depID, &manifestString)
+ if err != nil {
+ if err == sql.ErrNoRows {
+ log.Info("No Deployments available to be processed")
+ } else {
+ // todo: panic?
+ log.Errorf("SELECT on BUNDLE_DEPLOYMENT failed with Err: %s", err)
+ }
+ }
+ return
+}
+
+func dequeueDeployment(depID string) error {
+ _, err := db.Exec("DELETE FROM gateway_deploy_queue WHERE id=?;", depID)
+ return err
+}
+
+func dbTimeNow() int64 {
+ return int64(time.Now().UnixNano())
+}
+
+func insertDeployment(depID string, manifest bundleManifest) error {
+
+ tx, err := db.Begin()
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err)
+ return err
+ }
+ defer tx.Rollback()
+
+ timeNow := dbTimeNow()
+
+ _, err = tx.Exec("INSERT INTO gateway_deploy_deployment " +
+ "(id, status, created_at) VALUES(?,?,?);",
+ depID, DEPLOYMENT_STATE_INPROG, timeNow)
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err)
+ return err
+ }
+
+ // system bundle
+ // todo: extra data?
+ _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
+ "(id, deployment_id, type, uri, status, created_at) " +
+ "VALUES(?,?,?,?,?,?);",
+ "sys", depID, BUNDLE_TYPE_SYS, manifest.SysBun.URI, DEPLOYMENT_STATE_INPROG, timeNow)
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, "sys", err)
+ return err
+ }
+
+ // todo: extra data?
+ for i, bun := range manifest.DepBun {
+ id := string(i)
+ _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
+ "(id, deployment_id, scope, type, uri, status, created_at) " +
+ "VALUES(?,?,?,?,?,?,?);",
+ id, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow)
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, id, err)
+ return err
+ }
+ }
+
+ log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID)
+
+ err = tx.Commit()
+ if err != nil {
+ log.Errorf("INSERT gateway_deploy_bundle %s failed: %v", depID)
+ }
+
+ return err
+}
+
+func updateDeploymentStatus(txn SQLExec, depID string, status int, errCode int) error {
+ var nRows int64
+ res, err := txn.Exec("UPDATE gateway_deploy_deployment " +
+ "SET status=?, modified_at=?, error_code = ? WHERE id=?;", status, dbTimeNow(), errCode, depID)
+ if err == nil {
+ nRows, err = res.RowsAffected()
+ if nRows == 0 {
+ err = sql.ErrNoRows
+ }
+ }
+ if err != nil {
+ log.Errorf("UPDATE gateway_deploy_deployment %s failed: %v", depID, err)
+ return err
+ }
+
+ log.Debugf("UPDATE gateway_deploy_deployment %s succeeded", depID)
+ return nil
+}
+
+func updateAllBundleStatus(txn SQLExec, depID string, status int) error {
+ var nRows int64
+ res, err := txn.Exec("UPDATE gateway_deploy_bundle SET status = ? WHERE deployment_id = ?;", status, depID)
+ if err == nil {
+ nRows, err = res.RowsAffected()
+ if nRows == 0 {
+ err = sql.ErrNoRows
+ }
+ }
+ if err != nil {
+ log.Errorf("UPDATE all gateway_deploy_bundle %s failed: %v", depID, err)
+ return err
+ }
+
+ return nil
+}
+
+func updateBundleStatus(txn SQLExec, depID string, bundleID string, status int, errCode int, errReason string) error {
+ var nRows int64
+ res, err := txn.Exec("UPDATE gateway_deploy_bundle " +
+ "SET status=?, error_code=?, error_reason=?, modified_at=? WHERE id=?;",
+ status, errCode, errReason, dbTimeNow(), depID)
+ if err == nil {
+ nRows, err = res.RowsAffected()
+ if nRows == 0 {
+ err = sql.ErrNoRows
+ }
+ }
+ if err != nil {
+ log.Error("UPDATE gateway_deploy_bundle %s:%s failed: %v", depID, bundleID, err)
+ return err
+ }
+
+ log.Debugf("UPDATE gateway_deploy_bundle success: %s:%s", depID, bundleID)
+ return nil
+}
+
+// getCurrentDeploymentID returns the ID of what should be the "current" deployment
+func getCurrentDeploymentID() (string, error) {
+ var depID string
+ err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " +
+ "WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID)
+ log.Debugf("current deployment id: %s", depID)
+ return depID, err
+}
+
+// getDeployment returns a fully populated deploymentResponse
+func getDeployment(depID string) (*deployment, error) {
+
+ rows, err := db.Query("SELECT id, type, uri FROM gateway_deploy_bundle WHERE deployment_id=?;", depID)
+ if err != nil {
+ log.Errorf("Unable to query gateway_deploy_bundle. Err: %s", err)
+ return nil, err
+ }
+
+ depRes := deployment{
+ Bundles: []bundle{},
+ DeploymentId: depID,
+ }
+
+ for rows.Next() {
+ var bundleType int
+ var bundleID, uri string
+ err = rows.Scan(&bundleID, &bundleType, &uri)
+ if err != nil {
+ log.Errorf("gateway_deploy_bundle fetch failed. Err: %s", err)
+ return nil, err
+ }
+ if bundleType == BUNDLE_TYPE_SYS {
+ depRes.System = bundle{
+ BundleId: bundleID,
+ URI: uri,
+ }
+ } else {
+ fileUrl := getBundleFilePath(depID, uri)
+ bd := bundle{
+ AuthCode: bundleID, // todo: authCode?
+ BundleId: bundleID,
+ URI: fileUrl,
+ }
+ depRes.Bundles = append(depRes.Bundles, bd)
+ }
+ }
+ return &depRes, nil
+}
diff --git a/deployments.go b/deployments.go
index daf104d..90c2068 100644
--- a/deployments.go
+++ b/deployments.go
@@ -1,7 +1,6 @@
package apiGatewayDeploy
import (
- "database/sql"
"encoding/json"
"fmt"
"github.com/30x/apidGatewayDeploy/github"
@@ -9,37 +8,22 @@
"net/http"
"net/url"
"os"
- "strconv"
- "time"
+ "encoding/base64"
+ "path"
)
-// todo: The following was basically just copied from old APID - needs review.
-
-// todo: /current should return latest (regardless of status) if no ETag
-
-const DEPLOYMENT_STATE_UNUSED = 0
-const DEPLOYMENT_STATE_INPROG = 1
-const DEPLOYMENT_STATE_READY = 2
-const DEPLOYMENT_STATE_SUCCESS = 3
-const DEPLOYMENT_STATE_ERR_APID = 4
-const DEPLOYMENT_STATE_ERR_GWY = 5
-
var (
- bundlePathAbs string
- gitHubAccessToken string
-
- incoming = make(chan string)
- addSubscriber = make(chan chan string)
+ bundlePath string
+ gitHubAccessToken string // todo: temporary - should come from Manifest
)
type systemBundle struct {
- Uri string `json:"uri"`
+ URI string `json:"uri"`
}
type dependantBundle struct {
- Uri string `json:"uri"`
- Org string `json:"org"`
- Env string `json:"env"`
+ URI string `json:"uri"`
+ Scope string `json:"scope"`
}
type bundleManifest struct {
@@ -49,35 +33,35 @@
type bundle struct {
BundleId string `json:"bundleId"`
- URL string `json:"url"`
+ URI string `json:"uri"`
AuthCode string `json:"authCode,omitempty"`
}
-type deploymentResponse struct {
+type deployment struct {
DeploymentId string `json:"deploymentId"`
Bundles []bundle `json:"bundles"`
System bundle `json:"system"`
}
-type gwBundleErrorDetail struct {
+type deploymentErrorDetail struct {
ErrorCode int `json:"errorCode"`
Reason string `json:"reason"`
BundleId string `json:"bundleId"`
}
-type gwBundleErrorResponse struct {
- ErrorCode int `json:"errorCode"`
- Reason string `json:"reason"`
- ErrorDetails []gwBundleErrorDetail `json:"bundleErrors"`
+type deploymentErrorResponse struct {
+ ErrorCode int `json:"errorCode"`
+ Reason string `json:"reason"`
+ ErrorDetails []deploymentErrorDetail `json:"bundleErrors"`
}
-type gwBundleResponse struct {
- Status string `json:"status"`
- GWbunRsp gwBundleErrorResponse `json:"error"`
+type deploymentResponse struct {
+ Status string `json:"status"`
+ GWbunRsp deploymentErrorResponse `json:"error"`
}
-// getBundleResourceData retrieves bundle data from a bundle repo and returns a ReadCloser.
-func getBundleResourceData(uriString string) (io.ReadCloser, error) {
+// retrieveBundle retrieves bundle data from a URI
+func retrieveBundle(uriString string) (io.ReadCloser, error) {
uri, err := url.Parse(uriString)
if err != nil {
@@ -108,241 +92,116 @@
return github.GetUrlData(uri, gitHubAccessToken)
}
-func createBundle(depPath string, uri string, depid string, org string, env string, typ string, txn *sql.Tx) int {
+// todo: retry on error?
+// check if already exists and skip
+func prepareBundle(depID string, bun dependantBundle) error {
- status := DEPLOYMENT_STATE_INPROG
- ts := int64(time.Now().UnixNano())
- timeString := strconv.FormatInt(ts, 10)
-
- var bundleID string
- if typ == "sys" {
- bundleID = typ + "_" + timeString
- } else {
- // todo: stop using org and env
- bundleID = typ + "_" + org + "_" + env + "_" + timeString
- }
- locFile := depPath + "/" + bundleID + ".zip"
-
- var bundleData io.ReadCloser
- out, err := os.Create(locFile)
+ bundleFile := getBundleFilePath(depID, bun.URI)
+ out, err := os.Create(bundleFile)
if err != nil {
- log.Error("Unable to create Bundle file ", locFile, " Err: ", err)
- status = DEPLOYMENT_STATE_ERR_APID
- goto FA
+ log.Errorf("Unable to create bundle file %s, Err: %s", bundleFile, err)
+ return err
}
+ defer out.Close()
- bundleData, err = getBundleResourceData(uri)
+ bundleData, err := retrieveBundle(bun.URI)
if err != nil {
- log.Error("Unable to read Bundle URI ", uri, " Err: ", err)
- status = DEPLOYMENT_STATE_ERR_APID
- goto FA
+ log.Errorf("Unable to retrieve bundle %s, Err: %s", bun.URI, err)
+ return err
}
defer bundleData.Close()
- io.Copy(out, bundleData)
- out.Close()
-FA:
- locFile = "file://" + locFile
- success := createInitBundleDB(locFile, bundleID, ts, env, org, depid,
- typ, locFile, status, txn)
-
- if !success {
- return -1
- } else if status == DEPLOYMENT_STATE_ERR_APID {
- return 1
- } else {
- return 0
+ _, err = io.Copy(out, bundleData)
+ if err != nil {
+ log.Errorf("Unable to write bundle %s, Err: %s", bundleFile, err)
+ return err
}
+
+ return nil
}
-func orchestrateDeploymentAndTrigger() {
-
- depId := orchestrateDeployment()
- if depId != "" {
- incoming <- depId
- }
+func getDeploymentFilesPath(depID string) string {
+ return bundlePath + "/" + depID
}
-/*
- * The series of actions to be taken here are :-
- * (1) Find the latest Deployment id that is in Init state
- * (2) Parse the Manifest URL
- * (3) Download the system bundle and store locally, update DB
- * (4) Download the dependent bundle and store locally, update DB
- * (5) Update deployment state based on the status of deployment
- // returns deploymentID
-*/
-func orchestrateDeployment() string {
-
- /* (1) Find the latest deployment, if none - get out */
- status := DEPLOYMENT_STATE_READY
- txn, _ := db.Begin()
-
- var manifestString, deploymentID string
- err := db.QueryRow("SELECT id, manifest FROM BUNDLE_DEPLOYMENT WHERE deploy_status = ? "+
- "ORDER BY created_at ASC LIMIT 1;", DEPLOYMENT_STATE_UNUSED).
- Scan(&deploymentID, &manifestString)
-
- switch {
- case err == sql.ErrNoRows:
- log.Error("No Deployments available to be processed")
- return ""
- case err != nil:
- log.Error("SELECT on BUNDLE_DEPLOYMENT failed with Err: ", err)
- return ""
- }
-
- /* (2) Parse Manifest */
- var bf bundleManifest
- var fileInfo os.FileInfo
- var deploymentPath string
- var res int
- var result bool
- err = json.Unmarshal([]byte(manifestString), &bf)
- if err != nil {
- log.Error("JSON decoding Manifest failed Err: ", err)
- status = DEPLOYMENT_STATE_ERR_APID
- goto EB
- }
-
- // todo: validate bundle!
- //for bun := range bf.DepBun {
- // if bun.uri
- //}
-
- fileInfo, err = os.Stat(bundlePathAbs)
- if err != nil || !fileInfo.IsDir() {
- log.Error("Path ", bundlePathAbs, " is not a directory")
- status = DEPLOYMENT_STATE_ERR_APID
- goto EB
- }
-
- deploymentPath = bundlePathAbs + "/" + deploymentID
- err = os.Mkdir(deploymentPath, 0700)
- if err != nil {
- log.Errorf("Deployment Dir creation error: %v", err)
- status = DEPLOYMENT_STATE_ERR_APID
- goto EB
- }
-
- /* (3) Download system bundle and store locally */
- res = createBundle(deploymentPath, bf.SysBun.Uri, deploymentID, "", "", "sys", txn)
- if res == -1 {
- log.Error("Abort Txn: Unable to update DB with system bundle info")
- goto EC
- } else if res == 1 {
- status = DEPLOYMENT_STATE_ERR_APID
- }
-
- /* (4) Loop through the Dependent bundles and store them locally as well */
- for _, ele := range bf.DepBun {
- res = createBundle(deploymentPath, ele.Uri, deploymentID, ele.Org, ele.Env, "dep", txn)
- if res == -1 {
- log.Error("Abort Txn: Unable to update DB with dependent bundle info")
- goto EC
- } else if res == 1 {
- status = DEPLOYMENT_STATE_ERR_APID
- }
- }
-EB:
- if status == DEPLOYMENT_STATE_ERR_APID && deploymentPath != "" {
- os.RemoveAll(deploymentPath)
- }
- /* (5) Update Deployment state accordingly */
- result = updateDeployStatusDB(deploymentID, status, txn)
- if result == false {
- log.Error("Abort Txn: Unable to update DB with Deployment status")
- goto EC
- }
- txn.Commit()
- return deploymentID
-EC:
- os.RemoveAll(deploymentPath)
- txn.Rollback()
- return ""
+func getBundleFilePath(depID string, bundleURI string) string {
+ return path.Join(getDeploymentFilesPath(depID), base64.StdEncoding.EncodeToString([]byte(bundleURI)))
}
-/*
- * Create Init Bundle (FIXME : Put this in a struct and pass - too many i/p args)
- */
-func createInitBundleDB(fileurl string, id string, cts int64, env string, org string, depid string, typ string, loc string, status int, txn *sql.Tx) bool {
+// returns first bundle download error
+// all bundles will be attempted regardless of errors, in the future we could retry
+func prepareDeployment(depID string, manifest bundleManifest) error {
- _, err := txn.Exec("INSERT INTO BUNDLE_INFO (id, deployment_id, org, env, url, type, deploy_status, "+
- "created_at, file_url)VALUES(?,?,?,?,?,?,?,?,?);", id, depid, org, env, loc, typ, status, cts, fileurl)
-
+ deploymentPath := getDeploymentFilesPath(depID)
+ err := os.Mkdir(deploymentPath, 0700)
if err != nil {
- log.Error("INSERT BUNDLE_INFO Failed (id, dep id) : (", id, ", ", depid, ")", err)
- return false
- } else {
- log.Info("INSERT BUNDLE_INFO Success: (", id, ")")
- return true
+ log.Errorf("Deployment dir creation failed: %v", err)
+ return err
}
-}
+ // todo: any reason to put all this in a single transaction?
-func updateDeployStatusDB(id string, status int, txn *sql.Tx) bool {
-
- _, err := txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ? WHERE deployment_id = ?;", status, id)
+ err = insertDeployment(depID, manifest)
if err != nil {
- log.Error("UPDATE BUNDLE_INFO Failed: (", id, ") : ", err)
- return false
- } else {
- log.Info("UPDATE BUNDLE_INFO Success: (", id, ")")
+ log.Errorf("Prepare deployment failed: %v", err)
+ return err
}
- _, err = txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ? WHERE id = ?;", status, id)
- if err != nil {
- log.Error("UPDATE BUNDLE_DEPLOYMENT Failed: (", id, ") : ", err)
- return false
- } else {
- log.Info("UPDATE BUNDLE_DEPLOYMENT Success: (", id, ")")
- }
- return true
-
-}
-
-// getCurrentDeploymentID returns the ID of what should be the "current" deployment
-func getCurrentDeploymentID() (string, error) {
- var depID string
- err := db.QueryRow("SELECT id FROM BUNDLE_DEPLOYMENT ORDER BY created_at ASC LIMIT 1;").Scan(&depID)
- return depID, err
-}
-
-
-// getDeployment returns a fully populated deploymentResponse
-func getDeployment(depID string) (*deploymentResponse, error) {
-
- rows, err := db.Query("SELECT file_url, id, type FROM BUNDLE_INFO WHERE deployment_id = ?;", depID)
- if err != nil {
- log.Errorf("Unable to query BUNDLE_INFO. Err: %s", err)
- return nil, err
+ // download bundles and store them locally
+ errors := make(chan error, len(manifest.DepBun))
+ for i, bun := range manifest.DepBun {
+ go func() {
+ err := prepareBundle(depID, bun)
+ errors <- err
+ if err != nil {
+ id := string(i)
+ err = updateBundleStatus(db, depID, id, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO, err.Error())
+ if err != nil {
+ log.Errorf("Update bundle %s:%s status failed: %v", depID, id, err)
+ }
+ }
+ }()
}
- depRes := deploymentResponse{
- Bundles: []bundle{},
- DeploymentId: depID,
- }
-
- for rows.Next() {
- var bundleID, fileUrl, bundleType string
- err = rows.Scan(&fileUrl, &bundleID, &bundleType)
+ // fail fast on first error, otherwise wait for completion
+ for range manifest.DepBun {
+ err := <- errors
if err != nil {
- log.Errorf("BUNDLE_INFO fetch failed. Err: %s", err)
- return nil, err
- }
- if bundleType == "sys" {
- depRes.System = bundle{
- BundleId: bundleID,
- URL: fileUrl,
- }
- } else {
- bd := bundle{
- AuthCode: bundleID, // todo: authCode?
- BundleId: bundleID,
- URL: fileUrl,
- }
- depRes.Bundles = append(depRes.Bundles, bd)
+ updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO)
+ return err
}
}
- return &depRes, nil
+
+ return updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
+}
+
+
+func serviceDeploymentQueue() {
+ log.Debug("Checking for new deployments")
+
+ depID, manifestString := getQueuedDeployment()
+ if depID == "" {
+ return
+ }
+
+ var manifest bundleManifest
+ err := json.Unmarshal([]byte(manifestString), &manifest)
+ if err != nil {
+ log.Errorf("JSON decoding Manifest failed Err: %v", err)
+ return
+ }
+
+ err = prepareDeployment(depID, manifest)
+ if err != nil {
+ log.Errorf("Prepare deployment failed: %v", depID)
+ return
+ }
+
+ err = dequeueDeployment(depID)
+ if err != nil {
+ log.Warnf("Dequeue deployment failed: %v", depID)
+ }
+
+ log.Debugf("Signaling new deployment ready: %s", depID)
+ incoming <- depID
}
diff --git a/init.go b/init.go
index 36ffea1..11d79a7 100644
--- a/init.go
+++ b/init.go
@@ -32,15 +32,15 @@
config.SetDefault(configBundleDir, "/var/tmp")
var err error
- bundleDir := config.GetString(configBundleDir)
- if err := os.MkdirAll(bundleDir, 0700); err != nil {
+ dir := config.GetString(configBundleDir)
+ if err := os.MkdirAll(dir, 0700); err != nil {
log.Panicf("Failed bundle directory creation: %v", err)
}
- bundlePathAbs, err = filepath.Abs(bundleDir)
+ bundlePath, err = filepath.Abs(dir)
if err != nil {
log.Panicf("Cant find Abs Path : %v", err)
}
- log.Infof("Bundle directory path is %s", bundlePathAbs)
+ log.Infof("Bundle directory path is %s", bundlePath)
gitHubAccessToken = config.GetString(configGithubAccessToken)
@@ -55,23 +55,9 @@
initAPI(services)
initListener(services)
- orchestrateDeployment()
+ serviceDeploymentQueue()
log.Debug("end init")
return nil
}
-
-func initDB() {
- var count int
- row := db.QueryRow("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='bundle_deployment';")
- if err := row.Scan(&count); err != nil {
- log.Panic("Unable to setup database", err)
- }
- if count == 0 {
- _, err := db.Exec("CREATE TABLE bundle_deployment (org varchar(255), id varchar(255), uri varchar(255), env varchar(255), etag varchar(255), manifest text, created_at integer, modified_at integer, deploy_status integer, error_code varchar(255), PRIMARY KEY (id)); CREATE TABLE bundle_info (type integer, env varchar(255), org varchar(255), id varchar(255), url varchar(255), file_url varchar(255), created_at integer, modified_at integer, deployment_id varchar(255), etag varchar(255), custom_tag varchar(255), deploy_status integer, error_code integer, error_reason text, PRIMARY KEY (id), FOREIGN KEY (deployment_id) references BUNDLE_DEPLOYMENT(id) ON DELETE CASCADE);")
- if err != nil {
- log.Panic("Unable to initialize DB", err)
- }
- }
-}
diff --git a/listener.go b/listener.go
index eb9ab16..c06709c 100644
--- a/listener.go
+++ b/listener.go
@@ -19,7 +19,7 @@
func (h *apigeeSyncHandler) Handle(e apid.Event) {
changeSet, ok := e.(*apidApigeeSync.ChangeSet)
if !ok {
- log.Errorf("Received non-ChangeSet event. This shouldn't happen!")
+ log.Errorf("Received non-ChangeSet event.")
return
}
@@ -33,26 +33,13 @@
switch payload.Data.Operation {
case "create":
- insertDeployment(payload.Data)
+ err := queueDeployment(payload.Data)
+ if err == nil {
+ serviceDeploymentQueue()
+ } else {
+ log.Errorf("unable to queue deployment")
+ }
}
}
}
-
-func insertDeployment(payload apidApigeeSync.DataPayload) {
-
- _, err := db.Exec("INSERT INTO BUNDLE_DEPLOYMENT (id, manifest, created_at, deploy_status) VALUES (?,?,?,?);",
- payload.EntityIdentifier,
- payload.PldCont.Manifest,
- payload.PldCont.CreatedAt,
- DEPLOYMENT_STATE_UNUSED,
- )
-
- if err != nil {
- log.Errorf("INSERT BUNDLE_DEPLOYMENT Failed: (%s, %v)", payload.EntityIdentifier, err)
- return
- }
-
- log.Infof("INSERT BUNDLE_DEPLOYMENT Success: (%s)", payload.EntityIdentifier)
- orchestrateDeploymentAndTrigger()
-}
diff --git a/listener_test.go b/listener_test.go
index 8858d4b..2e2ae47 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -14,7 +14,7 @@
It("should store data from ApigeeSync in the database", func(done Done) {
- entityID := "listener_test_1"
+ deploymentID := "listener_test_1"
uri, err := url.Parse(testServer.URL)
Expect(err).ShouldNot(HaveOccurred())
@@ -23,11 +23,11 @@
man := bundleManifest{
SysBun: systemBundle{
- Uri: bundleUri,
+ URI: bundleUri,
},
DepBun: []dependantBundle{
{
- Uri: bundleUri,
+ URI: bundleUri,
},
},
}
@@ -42,7 +42,7 @@
Data: DataPayload{
EntityType: "deployment",
Operation: "create",
- EntityIdentifier: entityID,
+ EntityIdentifier: deploymentID,
PldCont: Payload{
CreatedAt: now,
Manifest: manifest,
@@ -61,19 +61,30 @@
return
}
- // todo: should do a lot more checking here... maybe call another api instead?
- var selectedManifest string
- var createdAt int64
- err = db.QueryRow("SELECT manifest, created_at from bundle_deployment where id = ?", entityID).
- Scan(&selectedManifest, &createdAt)
+ depID, err := getCurrentDeploymentID()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(depID).Should(Equal(deploymentID))
+
+ dep, err := getDeployment(depID)
Expect(err).ShouldNot(HaveOccurred())
- Expect(manifest).Should(Equal(selectedManifest))
- Expect(createdAt).Should(Equal(now))
+ Expect(dep.System.URI).To(Equal(man.SysBun.URI))
+ Expect(len(dep.Bundles)).To(Equal(len(man.DepBun)))
+ Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri)))
+
+ // todo: should do a lot more checking here... maybe call another api instead?
+ //var selectedManifest string
+ //var createdAt int64
+ //err = db.QueryRow("SELECT manifest, created_at from bundle_deployment where id = ?", deploymentID).
+ // Scan(&selectedManifest, &createdAt)
+ //Expect(err).ShouldNot(HaveOccurred())
+ //
+ //Expect(manifest).Should(Equal(selectedManifest))
+ //Expect(createdAt).Should(Equal(now))
// clean up
- _, err := db.Exec("DELETE from bundle_deployment where id = ?", entityID)
- Expect(err).ShouldNot(HaveOccurred())
+ //_, err = db.Exec("DELETE from bundle_deployment where id = ?", deploymentID)
+ //Expect(err).ShouldNot(HaveOccurred())
close(done)
},