Use mutex to avoid conflicts, various transactional changes, other fixes
diff --git a/api.go b/api.go
index 67a98ee..ef9b958 100644
--- a/api.go
+++ b/api.go
@@ -195,7 +195,7 @@
 		return
 	}
 
-	if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.GWbunRsp.ErrorCode == 0 || rsp.GWbunRsp.Reason == "") {
+	if rsp.Status == RESPONSE_STATUS_FAIL && (rsp.Error.ErrorCode == 0 || rsp.Error.Reason == "") {
 		writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "errorCode and reason are required")
 		return
 	}
@@ -215,7 +215,7 @@
 	if rsp.Status == RESPONSE_STATUS_SUCCESS {
 		updateErr = updateDeploymentSuccess(depID, txn)
 	} else {
-		updateErr = updateDeploymentFailure(depID, rsp.GWbunRsp, txn)
+		updateErr = updateDeploymentFailure(depID, rsp.Error, txn)
 	}
 
 	if updateErr != nil {
@@ -268,7 +268,7 @@
 
 	// Iterate over Bundles, and update the errors
 	for _, a := range rsp.ErrorDetails {
-		updateBundleStatus(txn, depID, a.BundleId, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason)
+		updateBundleStatus(txn, depID, a.BundleID, DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason)
 		if err != nil {
 			return err
 		}
diff --git a/api_test.go b/api_test.go
index 9e31afa..25dccd4 100644
--- a/api_test.go
+++ b/api_test.go
@@ -48,7 +48,7 @@
 			Expect(err).ShouldNot(HaveOccurred())
 			json.Unmarshal(body, &depRes)
 
-			Expect(depRes.DeploymentId).Should(Equal(deploymentID))
+			Expect(depRes.DeploymentID).Should(Equal(deploymentID))
 			Expect(res.Header.Get("etag")).Should(Equal(deploymentID))
 		})
 
@@ -121,7 +121,7 @@
 				Expect(err).ShouldNot(HaveOccurred())
 				json.Unmarshal(body, &depRes)
 
-				Expect(depRes.DeploymentId).Should(Equal(deploymentID))
+				Expect(depRes.DeploymentID).Should(Equal(deploymentID))
 
 				close(done)
 			}()
@@ -226,7 +226,7 @@
 
 			deploymentResult := deploymentResponse{
 				Status: RESPONSE_STATUS_FAIL,
-				GWbunRsp: deploymentErrorResponse{
+				Error: deploymentErrorResponse{
 					ErrorCode: 100,
 					Reason: "bad juju",
 					//ErrorDetails: []deploymentErrorDetail{ // todo: add tests for bundle errors
@@ -267,13 +267,13 @@
 	bundleUri := uri.String()
 
 	dep := deployment{
-		DeploymentId: depID,
+		DeploymentID: depID,
 		System: bundle{
 			URI: bundleUri,
 		},
 		Bundles: []bundle{
 			{
-				BundleId: "bun",
+				BundleID: "bun",
 				URI: bundleUri,
 				Scope: "some-scope",
 				Org: "org",
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go
index ee93279..4c80cf6 100644
--- a/cmd/apidGatewayDeploy/main.go
+++ b/cmd/apidGatewayDeploy/main.go
@@ -8,6 +8,7 @@
 	"io/ioutil"
 	"github.com/30x/transicator/common"
 	"github.com/30x/apidGatewayDeploy"
+	"os"
 )
 
 func main() {
@@ -38,6 +39,7 @@
 		if err != nil {
 			log.Panicf("ERROR: Unable to create temp dir", err)
 		}
+		defer os.RemoveAll(tmpDir)
 		config.Set("data_path", tmpDir)
 		config.Set("gatewaydeploy_bundle_dir", tmpDir)
 	}
diff --git a/data.go b/data.go
index 69dda23..3b81a52 100644
--- a/data.go
+++ b/data.go
@@ -33,7 +33,7 @@
 
 Tables:
 	gateway_deploy_queue
-		Deployment(s) received and not yet processed (potentially a Queue - one for now)
+		Deployment(s) received and not yet processed (one for now)
 	gateway_deploy_deployment
 	gateway_deploy_bundle
  */
@@ -61,6 +61,7 @@
 		log.Panicf("Unable to start transaction: %v", err)
 	}
 	defer tx.Rollback()
+
 	_, err = tx.Exec("CREATE TABLE gateway_deploy_queue (" +
 		"id varchar(255), manifest text, created_at integer, " +
 		"PRIMARY KEY (id));")
@@ -94,6 +95,9 @@
 // currently only maintains 1 in the queue
 func queueDeployment(deploymentID, manifestString string) error {
 
+	queueMutex.Lock()
+	defer queueMutex.Unlock()
+
 	log.Debugf("queuing deployment %s: %s", deploymentID, manifestString)
 
 	// validate manifest
@@ -103,20 +107,14 @@
 	}
 
 	// maintains queue at 1
-	tx, err := db.Begin()
-	if err != nil {
-		log.Debugf("INSERT gateway_deploy_queue failed: (%s)", deploymentID)
-		return err
-	}
-	defer tx.Rollback()
-
-	_, err = tx.Exec("DELETE FROM gateway_deploy_queue");
+	// todo: this should be transactional
+	_, err = db.Exec("DELETE FROM gateway_deploy_queue");
 	if err != nil {
 		log.Errorf("DELETE FROM gateway_deploy_queue failed: %v", err)
 		return err
 	}
 
-	_, err = tx.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);",
+	_, err = db.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);",
 		deploymentID,
 		manifestString,
 		dbTimeNow(),
@@ -126,34 +124,43 @@
 		return err
 	}
 
-	err = tx.Commit()
-	if err != nil {
-		log.Errorf("INSERT gateway_deploy_queue %s failed: %v", deploymentID, err)
-		return err
-	}
-
-	log.Debugf("INSERT gateway_deploy_queue success: (%s)", deploymentID)
+	log.Debugf("deployment %s queued", deploymentID)
 
 	return nil
 }
 
+// committing passed transaction will delete deployment from queue
+// Call using the following guard:
 func getQueuedDeployment() (depID, manifestString string) {
+
+	log.Debug("getting queued deployment")
+
 	err := db.QueryRow("SELECT id, manifest FROM gateway_deploy_queue ORDER BY created_at ASC LIMIT 1;").
 		Scan(&depID, &manifestString)
 	if err != nil {
 		if err == sql.ErrNoRows {
 			log.Info("No Deployments available to be processed")
 		} else {
-			// todo: panic?
-			log.Errorf("SELECT on BUNDLE_DEPLOYMENT failed with Err: %s", err)
+			log.Errorf("SELECT on gateway_deploy_queue failed: %s", err)
 		}
 	}
+
+	log.Debugf("got queued deployment: %s", depID)
+
 	return
 }
 
-func dequeueDeployment(depID string) error {
+func deleteDeploymentFromQueue(depID string) {
+
+	log.Debug("deleting deployment %s from queue", depID)
+
 	_, err := db.Exec("DELETE FROM gateway_deploy_queue WHERE id=?;", depID)
-	return err
+	if err != nil {
+		log.Errorf("DELETE from gateway_deploy_queue failed: %s", err)
+		return
+	}
+
+	log.Debug("deleted deployment %s from queue", depID)
 }
 
 func dbTimeNow() int64 {
@@ -164,7 +171,7 @@
 
 	tx, err := db.Begin()
 	if err != nil {
-		log.Errorf("INSERT gateway_deploy_deployment %s failed: %v", depID, err)
+		log.Errorf("insertDeployment begin transaction failed: %v", depID, err)
 		return err
 	}
 	defer tx.Rollback()
@@ -184,9 +191,9 @@
 	_, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
 		"(id, deployment_id, type, uri, status, created_at) " +
 		"VALUES(?,?,?,?,?,?);",
-		"sys", depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow)
+		dep.System.BundleID, depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow)
 	if err != nil {
-		log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, "sys", err)
+		log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, dep.System.BundleID, err)
 		return err
 	}
 
@@ -195,20 +202,20 @@
 		_, err = tx.Exec("INSERT INTO gateway_deploy_bundle " +
 			"(id, deployment_id, scope, type, uri, status, created_at) " +
 			"VALUES(?,?,?,?,?,?,?);",
-			bun.BundleId, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow)
+			bun.BundleID, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow)
 		if err != nil {
-			log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleId, err)
+			log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleID, err)
 			return err
 		}
 	}
 
-	log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID)
-
 	err = tx.Commit()
 	if err != nil {
-		log.Errorf("INSERT gateway_deploy_bundle %s failed: %v", depID)
+		log.Errorf("commit insert to gateway_deploy_bundle %s failed: %v", depID, err)
 	}
 
+	log.Debugf("INSERT gateway_deploy_deployment %s succeeded", depID)
+
 	return err
 }
 
@@ -288,7 +295,7 @@
 
 	depRes := deployment{
 		Bundles:      []bundle{},
-		DeploymentId: depID,
+		DeploymentID: depID,
 	}
 
 	for rows.Next() {
@@ -301,13 +308,13 @@
 		}
 		if bundleType == BUNDLE_TYPE_SYS {
 			depRes.System = bundle{
-				BundleId: bundleID,
+				BundleID: bundleID,
 				URI:      uri,
 			}
 		} else {
 			fileUrl := getBundleFilePath(depID, uri)
 			bd := bundle{
-				BundleId: bundleID,
+				BundleID: bundleID,
 				URI:      fileUrl,
 			}
 			depRes.Bundles = append(depRes.Bundles, bd)
diff --git a/deployments.go b/deployments.go
index f4a9f60..235efc6 100644
--- a/deployments.go
+++ b/deployments.go
@@ -11,6 +11,7 @@
 	"encoding/base64"
 	"path"
 	"errors"
+	"sync"
 )
 
 var (
@@ -36,7 +37,7 @@
 
 // event bundle
 type bundle struct {
-	BundleId string  `json:"bundleId"`
+	BundleID string  `json:"bundleId"`
 	URI      string  `json:"uri"`
 	Scope    string  `json:"scope"`
 	Org      string  `json:"org"`
@@ -45,7 +46,7 @@
 
 // event deployment
 type deployment struct {
-	DeploymentId string   `json:"deploymentId"`
+	DeploymentID string   `json:"deploymentId"`
 	System       bundle   `json:"system"`
 	Bundles      []bundle `json:"bundles"`
 }
@@ -53,7 +54,7 @@
 type deploymentErrorDetail struct {
 	ErrorCode int    `json:"errorCode"`
 	Reason    string `json:"reason"`
-	BundleId  string `json:"bundleId"`
+	BundleID  string `json:"bundleId"`
 }
 
 type deploymentErrorResponse struct {
@@ -63,8 +64,8 @@
 }
 
 type deploymentResponse struct {
-	Status   string                  `json:"status"`
-	GWbunRsp deploymentErrorResponse `json:"error"`
+	Status string                  `json:"status"`
+	Error  deploymentErrorResponse `json:"error"`
 }
 
 // retrieveBundle retrieves bundle data from a URI
@@ -139,27 +140,27 @@
 // all bundles will be attempted regardless of errors, in the future we could retry
 func prepareDeployment(depID string, dep deployment) error {
 
+	log.Debugf("preparing deployment: %s", depID)
+
+	err := insertDeployment(depID, dep)
+	if err != nil {
+		log.Errorf("insert deployment failed: %v", err)
+		return err
+	}
+
 	deploymentPath := getDeploymentFilesPath(depID)
-	err := os.Mkdir(deploymentPath, 0700)
+	err = os.Mkdir(deploymentPath, 0700)
 	if err != nil {
 		log.Errorf("Deployment dir creation failed: %v", err)
 		return err
 	}
 
-	// todo: any reason to put all this in a single transaction?
-
-	err = insertDeployment(depID, dep)
-	if err != nil {
-		log.Errorf("Prepare deployment failed: %v", err)
-		return err
-	}
-
 	// download bundles and store them locally
-	errors := make(chan error, len(dep.Bundles))
+	errorsChan := make(chan error, len(dep.Bundles))
 	for i, bun := range dep.Bundles {
 		go func() {
 			err := prepareBundle(depID, bun)
-			errors <- err
+			errorsChan <- err
 			if err != nil {
 				id := string(i)
 				err = updateBundleStatus(db, depID, id, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO, err.Error())
@@ -172,7 +173,7 @@
 
 	// fail fast on first error, otherwise wait for completion
 	for range dep.Bundles {
-		err := <- errors
+		err := <-errorsChan
 		if err != nil {
 			updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO)
 			return err
@@ -182,10 +183,16 @@
 	return updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
 }
 
+var queueMutex sync.Mutex
 
 func serviceDeploymentQueue() {
+
+	queueMutex.Lock()
+	defer queueMutex.Unlock()
+
 	log.Debug("Checking for new deployments")
 
+	// todo: this does a get+delete - could lead to a missing deployment if there's a failure
 	depID, manifestString := getQueuedDeployment()
 	if depID == "" {
 		return
@@ -198,14 +205,11 @@
 
 	err = prepareDeployment(depID, manifest)
 	if err != nil {
-		log.Errorf("Prepare deployment failed: %v", depID)
+		log.Errorf("serviceDeploymentQueue prepare deployment failed: %v", depID)
 		return
 	}
 
-	err = dequeueDeployment(depID)
-	if err != nil {
-		log.Warnf("Dequeue deployment failed: %v", depID)
-	}
+	deleteDeploymentFromQueue(depID)
 
 	log.Debugf("Signaling new deployment ready: %s", depID)
 	incoming <- depID
@@ -218,22 +222,22 @@
 		return
 	}
 
-	// todo: validate manifest...
+	// validate manifest
 	if dep.System.URI == "" {
-		err = errors.New("system bundle uri is required")
+		err = errors.New("system bundle 'uri' is required")
 		return
 	}
 	for _, bun := range dep.Bundles {
-		if bun.BundleId == "" {
-			err = errors.New("bundle bundleID is required")
+		if bun.BundleID == "" {
+			err = errors.New("bundle 'bundleId' is required")
 			return
 		}
 		if bun.URI == "" {
-			err = errors.New("bundle uri is required")
+			err = errors.New("bundle 'uri' is required")
 			return
 		}
 		if bun.Scope == "" {
-			err = errors.New("bundle scope is required")
+			err = errors.New("bundle 'scope' is required")
 			return
 		}
 	}
diff --git a/init.go b/init.go
index 4b07bad..066590d 100644
--- a/init.go
+++ b/init.go
@@ -1,11 +1,11 @@
 package apiGatewayDeploy
 
 import (
-	"database/sql"
 	"github.com/30x/apid"
 	"github.com/30x/apidGatewayDeploy/github"
 	"os"
 	"path/filepath"
+	"database/sql"
 )
 
 const (
@@ -55,8 +55,7 @@
 	initAPI(services)
 	initListener(services)
 
-	// todo: in goroutine?
-	serviceDeploymentQueue()
+	go serviceDeploymentQueue()
 
 	log.Debug("end init")
 
diff --git a/listener.go b/listener.go
index 7ae60af..358c320 100644
--- a/listener.go
+++ b/listener.go
@@ -38,7 +38,7 @@
 		var err error
 		switch table.Name {
 		case MANIFEST_TABLE:
-			log.Debugf("Snapshot of %s with %d rows", table.Name, table.Rows)
+			log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
 			// todo: should be 0 or 1 per system!!
 			row := table.Rows[len(table.Rows)-1]
 			err = processNewManifest(row)
@@ -52,10 +52,9 @@
 }
 
 func processChangeList(changes *common.ChangeList) {
-	log.Debugf("Process %d changes", len(changes.Changes))
 
 	for _, change := range changes.Changes {
-		log.Debugf("payload table: %s operation: %s", change.Table, change.Operation)
+		log.Debugf("change table: %s operation: %s", change.Table, change.Operation)
 
 		var err error
 		switch change.Table {
@@ -63,6 +62,8 @@
 			switch change.Operation {
 			case common.Insert:
 				err = processNewManifest(change.NewRow)
+			default:
+				log.Error("unexpected operation: %s", change.Operation)
 			}
 		}
 		if err != nil {
diff --git a/listener_test.go b/listener_test.go
index d89e43b..b4abd9f 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -9,6 +9,9 @@
 	"github.com/30x/transicator/common"
 )
 
+// todo: test servicing the deployment queue
+// todo: ensure "database table is locked" doesn't happen (check the test logs)
+
 var _ = Describe("listener", func() {
 
 	It("should process ApigeeSync snapshot event", func(done Done) {
@@ -21,13 +24,13 @@
 		bundleUri := uri.String()
 
 		dep := deployment{
-			DeploymentId: deploymentID,
+			DeploymentID: deploymentID,
 			System: bundle{
 				URI: bundleUri,
 			},
 			Bundles: []bundle{
 				{
-					BundleId: "bun",
+					BundleID: "bun",
 					URI: bundleUri,
 					Scope: "some-scope",
 				},
@@ -50,29 +53,20 @@
 		}
 
 		h := &test_handler{
-			"checkDatabase",
+			deploymentID,
 			func(e apid.Event) {
 				defer GinkgoRecover()
 
 				// ignore the first event, let standard listener process it
-				changeSet := e.(*common.Snapshot)
-				if len(changeSet.Tables) > 0 {
+				changeSet, ok := e.(*common.Snapshot)
+				if !ok || len(changeSet.Tables) > 0 {
 					return
 				}
 
-				// force queue to be emptied
-				serviceDeploymentQueue()
-
-				depID, err := getCurrentDeploymentID()
 				Expect(err).ShouldNot(HaveOccurred())
+				depID, manString := getQueuedDeployment()
 				Expect(depID).Should(Equal(deploymentID))
-
-				dep, err := getDeployment(depID)
-				Expect(err).ShouldNot(HaveOccurred())
-
-				Expect(dep.System.URI).To(Equal(dep.System.URI))
-				Expect(len(dep.Bundles)).To(Equal(len(dep.Bundles)))
-				Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri)))
+				Expect(manString).Should(Equal(string(depBytes)))
 
 				close(done)
 			},
@@ -92,24 +86,26 @@
 		uri.Path = "/bundle"
 		bundleUri := uri.String()
 
-		man := bundleManifest{
-			SysBun: systemBundle{
+		dep := deployment{
+			DeploymentID: deploymentID,
+			System: bundle{
 				URI: bundleUri,
 			},
-			DepBun: []dependantBundle{
+			Bundles: []bundle{
 				{
+					BundleID: "bun",
 					URI: bundleUri,
 					Scope: "some-scope",
 				},
 			},
 		}
-		manBytes, err := json.Marshal(man)
+
+		depBytes, err := json.Marshal(dep)
 		Expect(err).ShouldNot(HaveOccurred())
-		manifest := string(manBytes)
 
 		row := common.Row{}
 		row["id"] = &common.ColumnVal{Value: deploymentID}
-		row["body"] = &common.ColumnVal{Value: manifest}
+		row["body"] = &common.ColumnVal{Value: string(depBytes)}
 
 		var event = common.ChangeList{}
 		event.Changes = []common.Change{
@@ -121,29 +117,19 @@
 		}
 
 		h := &test_handler{
-			"checkDatabase",
+			deploymentID,
 			func(e apid.Event) {
 				defer GinkgoRecover()
 
 				// ignore the first event, let standard listener process it
-				changeSet := e.(*common.ChangeList)
-				if len(changeSet.Changes) > 0 {
+				changeSet, ok := e.(*common.ChangeList)
+				if !ok || len(changeSet.Changes) > 0 {
 					return
 				}
 
-				// force queue to be emptied
-				serviceDeploymentQueue()
-
-				depID, err := getCurrentDeploymentID()
-				Expect(err).ShouldNot(HaveOccurred())
+				depID, manString := getQueuedDeployment()
 				Expect(depID).Should(Equal(deploymentID))
-
-				dep, err := getDeployment(depID)
-				Expect(err).ShouldNot(HaveOccurred())
-
-				Expect(dep.System.URI).To(Equal(man.SysBun.URI))
-				Expect(len(dep.Bundles)).To(Equal(len(man.DepBun)))
-				Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri)))
+				Expect(manString).Should(Equal(string(depBytes)))
 
 				close(done)
 			},