Use mutex to avoid conflicts, various transactional changes, other fixes
diff --git a/api.go b/api.go index 67a98ee..ef9b958 100644 --- a/api.go +++ b/api.go
@@ -195,7 +195,7 @@ return } - if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.GWbunRsp.ErrorCode == 0 || rsp.GWbunRsp.Reason == "") { + if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.Error.ErrorCode == 0 || rsp.Error.Reason == "") { writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "errorCode and reason are required") return } @@ -215,7 +215,7 @@ if rsp.Status == RESPONSE_STATUS_SUCCESS { updateErr = updateDeploymentSuccess(depID, txn) } else { - updateErr = updateDeploymentFailure(depID, rsp.GWbunRsp, txn) + updateErr = updateDeploymentFailure(depID, rsp.Error, txn) } if updateErr != nil { @@ -268,7 +268,7 @@ // Iterate over Bundles, and update the errors for _, a := range rsp.ErrorDetails { - updateBundleStatus(txn, depID, a.BundleId, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason) + updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason) if err != nil { return err }
diff --git a/api_test.go b/api_test.go index 9e31afa..25dccd4 100644 --- a/api_test.go +++ b/api_test.go
@@ -48,7 +48,7 @@ Expect(err).ShouldNot(HaveOccurred()) json.Unmarshal(body, &depRes) - Expect(depRes.DeploymentId).Should(Equal(deploymentID)) + Expect(depRes.DeploymentID).Should(Equal(deploymentID)) Expect(res.Header.Get("etag")).Should(Equal(deploymentID)) }) @@ -121,7 +121,7 @@ Expect(err).ShouldNot(HaveOccurred()) json.Unmarshal(body, &depRes) - Expect(depRes.DeploymentId).Should(Equal(deploymentID)) + Expect(depRes.DeploymentID).Should(Equal(deploymentID)) close(done) }() @@ -226,7 +226,7 @@ deploymentResult := deploymentResponse{ Status: RESPONSE_STATUS_FAIL, - GWbunRsp: deploymentErrorResponse{ + Error: deploymentErrorResponse{ ErrorCode: 100, Reason: "bad juju", //ErrorDetails: []deploymentErrorDetail{ // todo: add tests for bundle errors @@ -267,13 +267,13 @@ bundleUri := uri.String() dep := deployment{ - DeploymentId: depID, + DeploymentID: depID, System: bundle{ URI: bundleUri, }, Bundles: []bundle{ { - BundleId: "bun", + BundleID: "bun", URI: bundleUri, Scope: "some-scope", Org: "org",
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go index ee93279..4c80cf6 100644 --- a/cmd/apidGatewayDeploy/main.go +++ b/cmd/apidGatewayDeploy/main.go
@@ -8,6 +8,7 @@ "io/ioutil" "github.com/30x/transicator/common" "github.com/30x/apidGatewayDeploy" + "os" ) func main() { @@ -38,6 +39,7 @@ if err != nil { log.Panicf("ERROR: Unable to create temp dir", err) } + defer os.RemoveAll(tmpDir) config.Set("data_path", tmpDir) config.Set("gatewaydeploy_bundle_dir", tmpDir) }
diff --git a/data.go b/data.go index 69dda23..3b81a52 100644 --- a/data.go +++ b/data.go
@@ -33,7 +33,7 @@ Tables: gateway_deploy_queue - Deployment(s) received and not yet processed (potentially a Queue - one for now) + Deployment(s) received and not yet processed (one for now) gateway_deploy_deployment gateway_deploy_bundle */ @@ -61,6 +61,7 @@ log.Panicf("Unable to start transaction: %v", err) } defer tx.Rollback() + _, err = tx.Exec("CREATE TABLE gateway_deploy_queue (" + "id varchar(255), manifest text, created_at integer, " + "PRIMARY KEY (id));") @@ -94,6 +95,9 @@ // currently only maintains 1 in the queue func queueDeployment(deploymentID, manifestString string) error { + queueMutex.Lock() + defer queueMutex.Unlock() + log.Debugf("queuing deployment %s: %s", deploymentID, manifestString) // validate manifest @@ -103,20 +107,14 @@ } // maintains queue at 1 - tx, err := db.Begin() - if err != nil { - log.Debugf("INSERT gateway_deploy_queue failed: (%s)", deploymentID) - return err - } - defer tx.Rollback() - - _, err = tx.Exec("DELETE FROM gateway_deploy_queue"); + // todo: this should be transactional + _, err = db.Exec("DELETE FROM gateway_deploy_queue"); if err != nil { log.Errorf("DELETE FROM gateway_deploy_queue failed: %v", err) return err } - _, err = tx.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);", + _, err = db.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);", deploymentID, manifestString, dbTimeNow(), @@ -126,34 +124,43 @@ return err } - err = tx.Commit() - if err != nil { - log.Errorf("INSERT gateway_deploy_queue %s failed: %v", deploymentID, err) - return err - } - - log.Debugf("INSERT gateway_deploy_queue success: (%s)", deploymentID) + log.Debugf("deployment %s queued", deploymentID) return nil } +// committing passed transaction will delete deployment from queue +// Call using the following guard: func getQueuedDeployment() (depID, manifestString string) { + + log.Debug("getting queued deployment") + err := db.QueryRow("SELECT id, manifest FROM gateway_deploy_queue ORDER BY created_at ASC LIMIT 1;"). Scan(&depID, &manifestString) if err != nil { if err == sql.ErrNoRows { log.Info("No Deployments available to be processed") } else { - // todo: panic? - log.Errorf("SELECT on BUNDLE_DEPLOYMENT failed with Err: %s", err) + log.Errorf("SELECT on gateway_deploy_queue failed: %s", err) } } + + log.Debugf("got queued deployment: %s", depID) + return } -func dequeueDeployment(depID string) error { +func deleteDeploymentFromQueue(depID string) { + + log.Debug("deleting deployment %s from queue", depID) + _, err := db.Exec("DELETE FROM gateway_deploy_queue WHERE id=?;", depID) - return err + if err != nil { + log.Errorf("DELETE from gateway_deploy_queue failed: %s", err) + return + } + + log.Debug("deleted deployment %s from queue", depID) } func dbTimeNow() int64 { @@ -164,7 +171,7 @@ tx, err := db.Begin() if err != nil { - log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err) + log.Errorf("insertDeployment begin transaction failed: %v", depID, err) return err } defer tx.Rollback() @@ -184,9 +191,9 @@ _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + "(id, deployment_id, type, uri, status, created_at) " + "VALUES(?,?,?,?,?,?);", - "sys", depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow) + dep.System.BundleID, depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow) if err != nil { - log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, "sys", err) + log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err) return err } @@ -195,20 +202,20 @@ _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + "(id, deployment_id, scope, type, uri, status, created_at) " + "VALUES(?,?,?,?,?,?,?);", - bun.BundleId, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow) + bun.BundleID, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow) if err != nil { - log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleId, err) + log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleID, err) return err } } - log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID) - err = tx.Commit() if err != nil { - log.Errorf("INSERT gateway_deploy_bundle %s failed: %v", depID) + log.Errorf("commit insert to gateway_deploy_bundle %s failed: %v", depID, err) } + log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID) + return err } @@ -288,7 +295,7 @@ depRes := deployment{ Bundles: []bundle{}, - DeploymentId: depID, + DeploymentID: depID, } for rows.Next() { @@ -301,13 +308,13 @@ } if bundleType == BUNDLE_TYPE_SYS { depRes.System = bundle{ - BundleId: bundleID, + BundleID: bundleID, URI: uri, } } else { fileUrl := getBundleFilePath(depID, uri) bd := bundle{ - BundleId: bundleID, + BundleID: bundleID, URI: fileUrl, } depRes.Bundles = append(depRes.Bundles, bd)
diff --git a/deployments.go b/deployments.go index f4a9f60..235efc6 100644 --- a/deployments.go +++ b/deployments.go
@@ -11,6 +11,7 @@ "encoding/base64" "path" "errors" + "sync" ) var ( @@ -36,7 +37,7 @@ // event bundle type bundle struct { - BundleId string `json:"bundleId"` + BundleID string `json:"bundleId"` URI string `json:"uri"` Scope string `json:"scope"` Org string `json:"org"` @@ -45,7 +46,7 @@ // event deployment type deployment struct { - DeploymentId string `json:"deploymentId"` + DeploymentID string `json:"deploymentId"` System bundle `json:"system"` Bundles []bundle `json:"bundles"` } @@ -53,7 +54,7 @@ type deploymentErrorDetail struct { ErrorCode int `json:"errorCode"` Reason string `json:"reason"` - BundleId string `json:"bundleId"` + BundleID string `json:"bundleId"` } type deploymentErrorResponse struct { @@ -63,8 +64,8 @@ } type deploymentResponse struct { - Status string `json:"status"` - GWbunRsp deploymentErrorResponse `json:"error"` + Status string `json:"status"` + Error deploymentErrorResponse `json:"error"` } // retrieveBundle retrieves bundle data from a URI @@ -139,27 +140,27 @@ // all bundles will be attempted regardless of errors, in the future we could retry func prepareDeployment(depID string, dep deployment) error { + log.Debugf("preparing deployment: %s", depID) + + err := insertDeployment(depID, dep) + if err != nil { + log.Errorf("insert deployment failed: %v", err) + return err + } + deploymentPath := getDeploymentFilesPath(depID) - err := os.Mkdir(deploymentPath, 0700) + err = os.Mkdir(deploymentPath, 0700) if err != nil { log.Errorf("Deployment dir creation failed: %v", err) return err } - // todo: any reason to put all this in a single transaction? - - err = insertDeployment(depID, dep) - if err != nil { - log.Errorf("Prepare deployment failed: %v", err) - return err - } - // download bundles and store them locally - errors := make(chan error, len(dep.Bundles)) + errorsChan := make(chan error, len(dep.Bundles)) for i, bun := range dep.Bundles { go func() { err := prepareBundle(depID, bun) - errors <- err + errorsChan <- err if err != nil { id := string(i) err = updateBundleStatus(db, depID, id, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO, err.Error()) @@ -172,7 +173,7 @@ // fail fast on first error, otherwise wait for completion for range dep.Bundles { - err := <- errors + err := <-errorsChan if err != nil { updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO) return err @@ -182,10 +183,16 @@ return updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0) } +var queueMutex sync.Mutex func serviceDeploymentQueue() { + + queueMutex.Lock() + defer queueMutex.Unlock() + log.Debug("Checking for new deployments") + // todo: this does a get+delete - could lead to a missing deployment if there's a failure depID, manifestString := getQueuedDeployment() if depID == "" { return @@ -198,14 +205,11 @@ err = prepareDeployment(depID, manifest) if err != nil { - log.Errorf("Prepare deployment failed: %v", depID) + log.Errorf("serviceDeploymentQueue prepare deployment failed: %v", depID) return } - err = dequeueDeployment(depID) - if err != nil { - log.Warnf("Dequeue deployment failed: %v", depID) - } + deleteDeploymentFromQueue(depID) log.Debugf("Signaling new deployment ready: %s", depID) incoming <- depID @@ -218,22 +222,22 @@ return } - // todo: validate manifest... + // validate manifest if dep.System.URI == "" { - err = errors.New("system bundle uri is required") + err = errors.New("system bundle 'uri' is required") return } for _, bun := range dep.Bundles { - if bun.BundleId == "" { - err = errors.New("bundle bundleID is required") + if bun.BundleID == "" { + err = errors.New("bundle 'bundleId' is required") return } if bun.URI == "" { - err = errors.New("bundle uri is required") + err = errors.New("bundle 'uri' is required") return } if bun.Scope == "" { - err = errors.New("bundle scope is required") + err = errors.New("bundle 'scope' is required") return } }
diff --git a/init.go b/init.go index 4b07bad..066590d 100644 --- a/init.go +++ b/init.go
@@ -1,11 +1,11 @@ package apiGatewayDeploy import ( - "database/sql" "github.com/30x/apid" "github.com/30x/apidGatewayDeploy/github" "os" "path/filepath" + "database/sql" ) const ( @@ -55,8 +55,7 @@ initAPI(services) initListener(services) - // todo: in goroutine? - serviceDeploymentQueue() + go serviceDeploymentQueue() log.Debug("end init")
diff --git a/listener.go b/listener.go index 7ae60af..358c320 100644 --- a/listener.go +++ b/listener.go
@@ -38,7 +38,7 @@ var err error switch table.Name { case MANIFEST_TABLE: - log.Debugf("Snapshot of %s with %d rows", table.Name, table.Rows) + log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows)) // todo: should be 0 or 1 per system!! row := table.Rows[len(table.Rows)-1] err = processNewManifest(row) @@ -52,10 +52,9 @@ } func processChangeList(changes *common.ChangeList) { - log.Debugf("Process %d changes", len(changes.Changes)) for _, change := range changes.Changes { - log.Debugf("payload table: %s operation: %s", change.Table, change.Operation) + log.Debugf("change table: %s operation: %s", change.Table, change.Operation) var err error switch change.Table { @@ -63,6 +62,8 @@ switch change.Operation { case common.Insert: err = processNewManifest(change.NewRow) + default: + log.Error("unexpected operation: %s", change.Operation) } } if err != nil {
diff --git a/listener_test.go b/listener_test.go index d89e43b..b4abd9f 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -9,6 +9,9 @@ "github.com/30x/transicator/common" ) +// todo: test servicing the deployment queue +// todo: ensure "database table is locked" doesn't happen (check the test logs) + var _ = Describe("listener", func() { It("should process ApigeeSync snapshot event", func(done Done) { @@ -21,13 +24,13 @@ bundleUri := uri.String() dep := deployment{ - DeploymentId: deploymentID, + DeploymentID: deploymentID, System: bundle{ URI: bundleUri, }, Bundles: []bundle{ { - BundleId: "bun", + BundleID: "bun", URI: bundleUri, Scope: "some-scope", }, @@ -50,29 +53,20 @@ } h := &test_handler{ - "checkDatabase", + deploymentID, func(e apid.Event) { defer GinkgoRecover() // ignore the first event, let standard listener process it - changeSet := e.(*common.Snapshot) - if len(changeSet.Tables) > 0 { + changeSet, ok := e.(*common.Snapshot) + if !ok || len(changeSet.Tables) > 0 { return } - // force queue to be emptied - serviceDeploymentQueue() - - depID, err := getCurrentDeploymentID() Expect(err).ShouldNot(HaveOccurred()) + depID, manString := getQueuedDeployment() Expect(depID).Should(Equal(deploymentID)) - - dep, err := getDeployment(depID) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(dep.System.URI).To(Equal(dep.System.URI)) - Expect(len(dep.Bundles)).To(Equal(len(dep.Bundles))) - Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri))) + Expect(manString).Should(Equal(string(depBytes))) close(done) }, @@ -92,24 +86,26 @@ uri.Path = "/bundle" bundleUri := uri.String() - man := bundleManifest{ - SysBun: systemBundle{ + dep := deployment{ + DeploymentID: deploymentID, + System: bundle{ URI: bundleUri, }, - DepBun: []dependantBundle{ + Bundles: []bundle{ { + BundleID: "bun", URI: bundleUri, Scope: "some-scope", }, }, } - manBytes, err := json.Marshal(man) + + depBytes, err := json.Marshal(dep) Expect(err).ShouldNot(HaveOccurred()) - manifest := string(manBytes) row := common.Row{} row["id"] = &common.ColumnVal{Value: deploymentID} - row["body"] = &common.ColumnVal{Value: manifest} + row["body"] = &common.ColumnVal{Value: string(depBytes)} var event = common.ChangeList{} event.Changes = []common.Change{ @@ -121,29 +117,19 @@ } h := &test_handler{ - "checkDatabase", + deploymentID, func(e apid.Event) { defer GinkgoRecover() // ignore the first event, let standard listener process it - changeSet := e.(*common.ChangeList) - if len(changeSet.Changes) > 0 { + changeSet, ok := e.(*common.ChangeList) + if !ok || len(changeSet.Changes) > 0 { return } - // force queue to be emptied - serviceDeploymentQueue() - - depID, err := getCurrentDeploymentID() - Expect(err).ShouldNot(HaveOccurred()) + depID, manString := getQueuedDeployment() Expect(depID).Should(Equal(deploymentID)) - - dep, err := getDeployment(depID) - Expect(err).ShouldNot(HaveOccurred()) - - Expect(dep.System.URI).To(Equal(man.SysBun.URI)) - Expect(len(dep.Bundles)).To(Equal(len(man.DepBun))) - Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri))) + Expect(manString).Should(Equal(string(depBytes))) close(done) },