Use locks around table writes to avoid table lock errors, ensure scope is passed
diff --git a/api.go b/api.go index ef9b958..ff963d7 100644 --- a/api.go +++ b/api.go
@@ -200,79 +200,14 @@ return } - /* - * If the state of deployment was success, update state of bundles and - * its deployments as success as well - */ - txn, err := db.Begin() + err = updateDeploymentAndBundles(depID, rsp) if err != nil { - log.Errorf("Unable to begin transaction: %s", err) - writeDatabaseError(w) - return - } - - var updateErr error - if rsp.Status == RESPONSE_STATUS_SUCCESS { - updateErr = updateDeploymentSuccess(depID, txn) - } else { - updateErr = updateDeploymentFailure(depID, rsp.Error, txn) - } - - if updateErr != nil { - if updateErr == sql.ErrNoRows { + if err == sql.ErrNoRows { writeError(w, http.StatusNotFound, ERROR_CODE_TODO, "not found") } else { writeDatabaseError(w) } - err = txn.Rollback() - if err != nil { - log.Errorf("Unable to rollback transaction: %s", err) - } - return - } - - err = txn.Commit() - if err != nil { - log.Errorf("Unable to commit transaction: %s", err) - writeDatabaseError(w) } return } - -func updateDeploymentSuccess(depID string, txn *sql.Tx) error { - - log.Debugf("Marking deployment %s as succeeded", depID) - - err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0) - if err != nil { - return err - } - - err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS) - if err != nil { - return err - } - - return nil -} - -func updateDeploymentFailure(depID string, rsp deploymentErrorResponse, txn *sql.Tx) error { - - log.Infof("marking deployment %s as FAILED", depID) - - err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.ErrorCode) - if err != nil { - return err - } - - // Iterate over Bundles, and update the errors - for _, a := range rsp.ErrorDetails { - updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason) - if err != nil { - return err - } - } - - return err -}
diff --git a/api_test.go b/api_test.go index 25dccd4..7c0e810 100644 --- a/api_test.go +++ b/api_test.go
@@ -49,6 +49,11 @@ json.Unmarshal(body, &depRes) Expect(depRes.DeploymentID).Should(Equal(deploymentID)) + Expect(depRes.Bundles[0].Scope).Should(Equal("some-scope")) + // todo: scope, org, env, etc... need to be dealt with abstractly + //Expect(depRes.Bundles[0].Org).Should(Equal("org")) + //Expect(depRes.Bundles[0].Env).Should(Equal("env")) + Expect(res.Header.Get("etag")).Should(Equal(deploymentID)) }) @@ -267,7 +272,6 @@ bundleUri := uri.String() dep := deployment{ - DeploymentID: depID, System: bundle{ URI: bundleUri, },
diff --git a/data.go b/data.go index 389968f..dd18cdb 100644 --- a/data.go +++ b/data.go
@@ -3,6 +3,7 @@ import ( "database/sql" "time" + "sync" ) const ( @@ -56,6 +57,8 @@ return } + log.Debug("Creating database tables...") + tx, err := db.Begin() if err != nil { log.Panicf("Unable to start transaction: %v", err) @@ -89,17 +92,35 @@ err = tx.Commit() if err != nil { log.Panicf("Unable to commit transaction: %v", err) + } else { + log.Debug("Database tables created.") } } +var tableLocksLock sync.Mutex +var tableLocks map[string]*sync.Mutex = map[string]*sync.Mutex{} + +func getTableLocker(table string) sync.Locker { + tableLocksLock.Lock() + defer tableLocksLock.Unlock() + + lock := tableLocks[table] + if lock == nil { + lock = &sync.Mutex{} + tableLocks[table] = lock + } + + return lock +} + // currently only maintains 1 in the queue func queueDeployment(deploymentID, manifestString string) error { - queueMutex.Lock() - defer queueMutex.Unlock() - log.Debugf("queuing deployment %s: %s", deploymentID, manifestString) + getTableLocker("gateway_deploy_queue").Lock() + defer getTableLocker("gateway_deploy_queue").Unlock() + // validate manifest _, err := parseManifest(manifestString) if err != nil { @@ -135,6 +156,9 @@ log.Debug("getting queued deployment") + getTableLocker("gateway_deploy_queue").Lock() + defer getTableLocker("gateway_deploy_queue").Unlock() + err := db.QueryRow("SELECT id, manifest FROM gateway_deploy_queue ORDER BY created_at ASC LIMIT 1;"). Scan(&depID, &manifestString) if err != nil { @@ -154,6 +178,9 @@ log.Debugf("deleting deployment %s from queue", depID) + getTableLocker("gateway_deploy_queue").Lock() + defer getTableLocker("gateway_deploy_queue").Unlock() + _, err := db.Exec("DELETE FROM gateway_deploy_queue WHERE id=?;", depID) if err != nil { log.Errorf("DELETE from gateway_deploy_queue failed: %s", err) @@ -169,6 +196,13 @@ func insertDeployment(depID string, dep deployment) error { + log.Debugf("insertDeployment: %s", depID) + + getTableLocker("gateway_deploy_deployment").Lock() + defer getTableLocker("gateway_deploy_deployment").Unlock() + getTableLocker("gateway_deploy_bundle").Lock() + defer getTableLocker("gateway_deploy_bundle").Unlock() + tx, err := db.Begin() if err != nil { log.Errorf("insertDeployment begin transaction failed: %v", depID, err) @@ -189,9 +223,9 @@ // system bundle // todo: extra data? _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + - "(id, deployment_id, type, uri, status, created_at) " + - "VALUES(?,?,?,?,?,?);", - dep.System.BundleID, depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow) + "(id, deployment_id, scope, type, uri, status, created_at) " + + "VALUES(?,?,?,?,?,?,?);", + dep.System.BundleID, depID, dep.System.Scope, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow) if err != nil { log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err) return err @@ -219,7 +253,62 @@ return err } +func updateDeploymentAndBundles(depID string, rsp deploymentResponse) error { + + log.Debugf("updateDeploymentAndBundles: %s", depID) + + getTableLocker("gateway_deploy_deployment").Lock() + defer getTableLocker("gateway_deploy_deployment").Unlock() + getTableLocker("gateway_deploy_bundle").Lock() + defer getTableLocker("gateway_deploy_bundle").Unlock() + + /* + * If the state of deployment was success, update state of bundles and + * its deployments as success as well + */ + txn, err := db.Begin() + if err != nil { + log.Errorf("Unable to begin transaction: %s", err) + return err + } + defer txn.Rollback() + + if rsp.Status == RESPONSE_STATUS_SUCCESS { + err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0) + if err != nil { + return err + } + err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS) + if err != nil { + return err + } + } else { + err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.Error.ErrorCode) + if err != nil { + return err + } + + // Iterate over Bundles, and update the errors + for _, a := range rsp.Error.ErrorDetails { + updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason) + if err != nil { + return err + } + } + } + if err != nil { + return err + } + + err = txn.Commit() + if err != nil { + log.Errorf("Unable to commit updateDeploymentStatus transaction: %s", err) + } + return err +} + func updateDeploymentStatus(txn SQLExec, depID string, status int, errCode int) error { + var nRows int64 res, err := txn.Exec("UPDATE gateway_deploy_deployment " + "SET status=?, modified_at=?, error_code = ? WHERE id=?;", status, dbTimeNow(), errCode, depID) @@ -277,6 +366,7 @@ // getCurrentDeploymentID returns the ID of what should be the "current" deployment func getCurrentDeploymentID() (string, error) { + var depID string err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " + "WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID) @@ -287,7 +377,7 @@ // getDeployment returns a fully populated deploymentResponse func getDeployment(depID string) (*deployment, error) { - rows, err := db.Query("SELECT id, type, uri FROM gateway_deploy_bundle WHERE deployment_id=?;", depID) + rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope FROM gateway_deploy_bundle WHERE deployment_id=?;", depID) if err != nil { log.Errorf("Unable to query gateway_deploy_bundle. Err: %s", err) return nil, err @@ -300,8 +390,8 @@ for rows.Next() { var bundleType int - var bundleID, uri string - err = rows.Scan(&bundleID, &bundleType, &uri) + var bundleID, uri, scope string + err = rows.Scan(&bundleID, &bundleType, &uri, &scope) if err != nil { log.Errorf("gateway_deploy_bundle fetch failed. Err: %s", err) return nil, err @@ -316,6 +406,7 @@ bd := bundle{ BundleID: bundleID, URI: fileUrl, + Scope: scope, } depRes.Bundles = append(depRes.Bundles, bd) }
diff --git a/deployments.go b/deployments.go index 235efc6..3a7a13a 100644 --- a/deployments.go +++ b/deployments.go
@@ -100,7 +100,7 @@ return github.GetUrlData(uri, gitHubAccessToken) } -// todo: retry on error? +// todo: retry on error // check if already exists and skip func prepareBundle(depID string, bun bundle) error { @@ -142,6 +142,11 @@ log.Debugf("preparing deployment: %s", depID) + getTableLocker("gateway_deploy_deployment").Lock() + defer getTableLocker("gateway_deploy_deployment").Unlock() + getTableLocker("gateway_deploy_bundle").Lock() + defer getTableLocker("gateway_deploy_bundle").Unlock() + err := insertDeployment(depID, dep) if err != nil { log.Errorf("insert deployment failed: %v", err) @@ -192,7 +197,6 @@ log.Debug("Checking for new deployments") - // todo: this does a get+delete - could lead to a missing deployment if there's a failure depID, manifestString := getQueuedDeployment() if depID == "" { return
diff --git a/listener_test.go b/listener_test.go index c45fd39..6872c55 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -10,7 +10,6 @@ ) // todo: test servicing the deployment queue -// todo: ensure "database table is locked" doesn't happen (check the test logs) var _ = Describe("listener", func() {