Use locks around table writes to avoid table lock errors, ensure scope is passed
diff --git a/api.go b/api.go
index ef9b958..ff963d7 100644
--- a/api.go
+++ b/api.go
@@ -200,79 +200,14 @@
 		return
 	}
 
-	/*
-	 * If the state of deployment was success, update state of bundles and
-	 * its deployments as success as well
-	 */
-	txn, err := db.Begin()
+	err = updateDeploymentAndBundles(depID, rsp)
 	if err != nil {
-		log.Errorf("Unable to begin transaction: %s", err)
-		writeDatabaseError(w)
-		return
-	}
-
-	var updateErr error
-	if rsp.Status == RESPONSE_STATUS_SUCCESS {
-		updateErr = updateDeploymentSuccess(depID, txn)
-	} else {
-		updateErr = updateDeploymentFailure(depID, rsp.Error, txn)
-	}
-
-	if updateErr != nil {
-		if updateErr == sql.ErrNoRows {
+		if err == sql.ErrNoRows {
 			writeError(w, http.StatusNotFound, ERROR_CODE_TODO, "not found")
 		} else {
 			writeDatabaseError(w)
 		}
-		err = txn.Rollback()
-		if err != nil {
-			log.Errorf("Unable to rollback transaction: %s", err)
-		}
-		return
-	}
-
-	err = txn.Commit()
-	if err != nil {
-		log.Errorf("Unable to commit transaction: %s", err)
-		writeDatabaseError(w)
 	}
 
 	return
 }
-
-func updateDeploymentSuccess(depID string, txn *sql.Tx) error {
-
-	log.Debugf("Marking deployment %s as succeeded", depID)
-
-	err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0)
-	if err != nil {
-		return err
-	}
-
-	err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS)
-	if err != nil {
-		return err
-	}
-
-	return nil
-}
-
-func updateDeploymentFailure(depID string, rsp deploymentErrorResponse, txn *sql.Tx) error {
-
-	log.Infof("marking deployment %s as FAILED", depID)
-
-	err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.ErrorCode)
-	if err != nil {
-		return err
-	}
-
-	// Iterate over Bundles, and update the errors
-	for _, a := range rsp.ErrorDetails {
-		updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason)
-		if err != nil {
-			return err
-		}
-	}
-
-	return err
-}
diff --git a/api_test.go b/api_test.go
index 25dccd4..7c0e810 100644
--- a/api_test.go
+++ b/api_test.go
@@ -49,6 +49,11 @@
 			json.Unmarshal(body, &depRes)
 
 			Expect(depRes.DeploymentID).Should(Equal(deploymentID))
+			Expect(depRes.Bundles[0].Scope).Should(Equal("some-scope"))
+			// todo: scope, org, env, etc... need to be dealt with abstractly
+			//Expect(depRes.Bundles[0].Org).Should(Equal("org"))
+			//Expect(depRes.Bundles[0].Env).Should(Equal("env"))
+
 			Expect(res.Header.Get("etag")).Should(Equal(deploymentID))
 		})
 
@@ -267,7 +272,6 @@
 	bundleUri := uri.String()
 
 	dep := deployment{
-		DeploymentID: depID,
 		System: bundle{
 			URI: bundleUri,
 		},
diff --git a/data.go b/data.go
index 389968f..dd18cdb 100644
--- a/data.go
+++ b/data.go
@@ -3,6 +3,7 @@
 import (
 	"database/sql"
 	"time"
+	"sync"
 )
 
 const (
@@ -56,6 +57,8 @@
 		return
 	}
 
+	log.Debug("Creating database tables...")
+
 	tx, err := db.Begin()
 	if err != nil {
 		log.Panicf("Unable to start transaction: %v", err)
@@ -89,17 +92,35 @@
 	err = tx.Commit()
 	if err != nil {
 		log.Panicf("Unable to commit transaction: %v", err)
+	} else {
+		log.Debug("Database tables created.")
 	}
 }
 
+var tableLocksLock sync.Mutex
+var tableLocks map[string]*sync.Mutex = map[string]*sync.Mutex{}
+
+func getTableLocker(table string) sync.Locker {
+	tableLocksLock.Lock()
+	defer tableLocksLock.Unlock()
+
+	lock := tableLocks[table]
+	if lock == nil {
+		lock = &sync.Mutex{}
+		tableLocks[table] = lock
+	}
+
+	return lock
+}
+
 // currently only maintains 1 in the queue
 func queueDeployment(deploymentID, manifestString string) error {
 
-	queueMutex.Lock()
-	defer queueMutex.Unlock()
-
 	log.Debugf("queuing deployment %s: %s", deploymentID, manifestString)
 
+	getTableLocker("gateway_deploy_queue").Lock()
+	defer getTableLocker("gateway_deploy_queue").Unlock()
+
 	// validate manifest
 	_, err := parseManifest(manifestString)
 	if err != nil {
@@ -135,6 +156,9 @@
 
 	log.Debug("getting queued deployment")
 
+	getTableLocker("gateway_deploy_queue").Lock()
+	defer getTableLocker("gateway_deploy_queue").Unlock()
+
 	err := db.QueryRow("SELECT id, manifest FROM gateway_deploy_queue ORDER BY created_at ASC LIMIT 1;").
 		Scan(&depID, &manifestString)
 	if err != nil {
@@ -154,6 +178,9 @@
 
 	log.Debugf("deleting deployment %s from queue", depID)
 
+	getTableLocker("gateway_deploy_queue").Lock()
+	defer getTableLocker("gateway_deploy_queue").Unlock()
+
 	_, err := db.Exec("DELETE FROM gateway_deploy_queue WHERE id=?;", depID)
 	if err != nil {
 		log.Errorf("DELETE from gateway_deploy_queue failed: %s", err)
@@ -169,6 +196,13 @@
 
 func insertDeployment(depID string, dep deployment) error {
 
+	log.Debugf("insertDeployment: %s", depID)
+
+	getTableLocker("gateway_deploy_deployment").Lock()
+	defer getTableLocker("gateway_deploy_deployment").Unlock()
+	getTableLocker("gateway_deploy_bundle").Lock()
+	defer getTableLocker("gateway_deploy_bundle").Unlock()
+
 	tx, err := db.Begin()
 	if err != nil {
 		log.Errorf("insertDeployment begin transaction failed: %v", depID, err)
@@ -189,9 +223,9 @@
 	// system bundle
 	// todo: extra data?
 	_, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
-		"(id, deployment_id, type, uri, status, created_at) " +
-		"VALUES(?,?,?,?,?,?);",
-		dep.System.BundleID, depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow)
+		"(id, deployment_id, scope, type, uri, status, created_at) " +
+		"VALUES(?,?,?,?,?,?,?);",
+		dep.System.BundleID, depID, dep.System.Scope, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow)
 	if err != nil {
 		log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err)
 		return err
@@ -219,7 +253,62 @@
 	return err
 }
 
+func updateDeploymentAndBundles(depID string, rsp deploymentResponse) error {
+
+	log.Debugf("updateDeploymentAndBundles: %s", depID)
+
+	getTableLocker("gateway_deploy_deployment").Lock()
+	defer getTableLocker("gateway_deploy_deployment").Unlock()
+	getTableLocker("gateway_deploy_bundle").Lock()
+	defer getTableLocker("gateway_deploy_bundle").Unlock()
+
+	/*
+	 * If the state of deployment was success, update state of bundles and
+	 * its deployments as success as well
+	 */
+	txn, err := db.Begin()
+	if err != nil {
+		log.Errorf("Unable to begin transaction: %s", err)
+		return err
+	}
+	defer txn.Rollback()
+
+	if rsp.Status == RESPONSE_STATUS_SUCCESS {
+		err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS, 0)
+		if err != nil {
+			return err
+		}
+		err = updateAllBundleStatus(txn, depID, DEPLOYMENT_STATE_SUCCESS)
+		if err != nil {
+			return err
+		}
+	} else {
+		err := updateDeploymentStatus(txn, depID, DEPLOYMENT_STATE_ERR_GWY, rsp.Error.ErrorCode)
+		if err != nil {
+			return err
+		}
+
+		// Iterate over Bundles, and update the errors
+		for _, a := range rsp.Error.ErrorDetails {
+			updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason)
+			if err != nil {
+				return err
+			}
+		}
+	}
+	if err != nil {
+		return err
+	}
+
+	err = txn.Commit()
+	if err != nil {
+		log.Errorf("Unable to commit updateDeploymentStatus transaction: %s", err)
+	}
+	return err
+}
+
 func updateDeploymentStatus(txn SQLExec, depID string, status int, errCode int) error {
+
 	var nRows int64
 	res, err := txn.Exec("UPDATE gateway_deploy_deployment " +
 		"SET status=?, modified_at=?, error_code = ? WHERE id=?;", status, dbTimeNow(), errCode, depID)
@@ -277,6 +366,7 @@
 
 // getCurrentDeploymentID returns the ID of what should be the "current" deployment
 func getCurrentDeploymentID() (string, error) {
+
 	var depID string
 	err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " +
 		"WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID)
@@ -287,7 +377,7 @@
 // getDeployment returns a fully populated deploymentResponse
 func getDeployment(depID string) (*deployment, error) {
 
-	rows, err := db.Query("SELECT id, type, uri FROM gateway_deploy_bundle WHERE deployment_id=?;", depID)
+	rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope FROM gateway_deploy_bundle WHERE deployment_id=?;", depID)
 	if err != nil {
 		log.Errorf("Unable to query gateway_deploy_bundle. Err: %s", err)
 		return nil, err
@@ -300,8 +390,8 @@
 
 	for rows.Next() {
 		var bundleType int
-		var bundleID, uri string
-		err = rows.Scan(&bundleID, &bundleType, &uri)
+		var bundleID, uri, scope string
+		err = rows.Scan(&bundleID, &bundleType, &uri, &scope)
 		if err != nil {
 			log.Errorf("gateway_deploy_bundle fetch failed. Err: %s", err)
 			return nil, err
@@ -316,6 +406,7 @@
 			bd := bundle{
 				BundleID: bundleID,
 				URI:      fileUrl,
+				Scope:    scope,
 			}
 			depRes.Bundles = append(depRes.Bundles, bd)
 		}
diff --git a/deployments.go b/deployments.go
index 235efc6..3a7a13a 100644
--- a/deployments.go
+++ b/deployments.go
@@ -100,7 +100,7 @@
 	return github.GetUrlData(uri, gitHubAccessToken)
 }
 
-// todo: retry on error?
+// todo: retry on error
 // check if already exists and skip
 func prepareBundle(depID string, bun bundle) error {
 
@@ -142,6 +142,11 @@
 
 	log.Debugf("preparing deployment: %s", depID)
 
+	getTableLocker("gateway_deploy_deployment").Lock()
+	defer getTableLocker("gateway_deploy_deployment").Unlock()
+	getTableLocker("gateway_deploy_bundle").Lock()
+	defer getTableLocker("gateway_deploy_bundle").Unlock()
+
 	err := insertDeployment(depID, dep)
 	if err != nil {
 		log.Errorf("insert deployment failed: %v", err)
@@ -192,7 +197,6 @@
 
 	log.Debug("Checking for new deployments")
 
-	// todo: this does a get+delete - could lead to a missing deployment if there's a failure
 	depID, manifestString := getQueuedDeployment()
 	if depID == "" {
 		return
diff --git a/listener_test.go b/listener_test.go
index c45fd39..6872c55 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -10,7 +10,6 @@
 )
 
 // todo: test servicing the deployment queue
-// todo: ensure "database table is locked" doesn't happen (check the test logs)
 
 var _ = Describe("listener", func() {