Merge pull request #12 from 30x/XAPID-707

Xapid 707
diff --git a/api.go b/api.go
index 2fc41f4..a0e34cd 100644
--- a/api.go
+++ b/api.go
@@ -224,7 +224,7 @@
 
 	b, err := json.Marshal(apiDeps)
 	if err != nil {
-		log.Errorf("unable to marshal deployments: %s", err)
+		log.Errorf("unable to marshal deployments: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 	}
@@ -328,7 +328,7 @@
 				log.Errorf("failed to communicate with tracking service: %v", err)
 			} else {
 				b, _ := ioutil.ReadAll(resp.Body)
-				log.Errorf("tracking service call failed to %s , code: %d, body: %s", apiPath, resp.StatusCode, string(b))
+				log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b))
 			}
 			backOffFunc()
 			continue
diff --git a/bundle.go b/bundle.go
index 95fb3f6..1163946 100644
--- a/bundle.go
+++ b/bundle.go
@@ -143,7 +143,7 @@
 
 	// the content of the URI is unfortunately not guaranteed not to change, so I can't just use dep.BundleURI
 	// unfortunately, this also means that a bundle cache isn't especially relevant
-	fileName := dep.DataScopeID + dep.ID + dep.ID
+	fileName := dep.DataScopeID + "_" + dep.ID
 
 	return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(fileName)))
 }
diff --git a/data.go b/data.go
index d8af041..d36e745 100644
--- a/data.go
+++ b/data.go
@@ -86,8 +86,12 @@
 }
 
 func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
+	return insertDeployments(tx, []DataDeployment{dep})
+}
 
-	log.Debugf("insertDeployment: %s", dep.ID)
+func insertDeployments(tx *sql.Tx, deps []DataDeployment) error {
+
+	log.Debugf("inserting %d deployments", len(deps))
 
 	stmt, err := tx.Prepare(`
 	INSERT INTO deployments
@@ -99,23 +103,27 @@
 		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
 	`)
 	if err != nil {
-		log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
+		log.Errorf("prepare insert into deployments failed: %v", err)
 		return err
 	}
 	defer stmt.Close()
 
-	_, err = stmt.Exec(
-		dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
-		dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
-		dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
-		dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
-		dep.DeployErrorCode, dep.DeployErrorMessage)
-	if err != nil {
-		log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
-		return err
+	for _, dep := range deps {
+		log.Debugf("insertDeployment: %s", dep.ID)
+
+		_, err = stmt.Exec(
+			dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
+			dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+			dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
+			dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+			dep.DeployErrorCode, dep.DeployErrorMessage)
+		if err != nil {
+			log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
+			return err
+		}
 	}
 
-	log.Debugf("insert into deployments %s succeeded", dep.ID)
+	log.Debug("inserting deployments succeeded")
 	return err
 }
 
@@ -136,8 +144,6 @@
 		return err
 	}
 
-	deploymentsChanged <- depID
-
 	log.Debugf("deleteDeployment %s succeeded", depID)
 	return err
 }
@@ -260,9 +266,9 @@
 	return nil
 }
 
-func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
+func getLocalBundleURI(depID string) (localBundleUri string, err error) {
 
-	err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
+	err = getDB().QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
 	if err == sql.ErrNoRows {
 		err = nil
 	}
diff --git a/listener.go b/listener.go
index c3b591f..0f5c493 100644
--- a/listener.go
+++ b/listener.go
@@ -1,11 +1,12 @@
 package apiGatewayDeploy
 
 import (
-	"database/sql"
 	"encoding/json"
 	"os"
 	"time"
 
+	"fmt"
+
 	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 )
@@ -58,28 +59,41 @@
 		log.Panicf("Unable to initialize database: %v", err)
 	}
 
-	tx, err := db.Begin()
-	if err != nil {
-		log.Panicf("Error starting transaction: %v", err)
+	var deploymentsToInsert []DataDeployment
+	var errResults apiDeploymentResults
+	for _, table := range snapshot.Tables {
+		switch table.Name {
+		case DEPLOYMENT_TABLE:
+			for _, row := range table.Rows {
+				dep, err := dataDeploymentFromRow(row)
+				if err == nil {
+					deploymentsToInsert = append(deploymentsToInsert, dep)
+				} else {
+					result := apiDeploymentResult{
+						ID:        dep.ID,
+						Status:    RESPONSE_STATUS_FAIL,
+						ErrorCode: ERROR_CODE_TODO,
+						Message:   fmt.Sprintf("unable to parse deployment: %v", err),
+					}
+					errResults = append(errResults, result)
+				}
+			}
+		}
 	}
 
 	// ensure that no new database updates are made on old database
 	dbMux.Lock()
 	defer dbMux.Unlock()
 
+	tx, err := db.Begin()
+	if err != nil {
+		log.Panicf("Error starting transaction: %v", err)
+	}
 	defer tx.Rollback()
-	for _, table := range snapshot.Tables {
-		var err error
-		switch table.Name {
-		case DEPLOYMENT_TABLE:
-			log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
-			for _, row := range table.Rows {
-				err = addDeployment(tx, row)
-			}
-		}
-		if err != nil {
-			log.Panicf("Error processing Snapshot: %v", err)
-		}
+
+	err = insertDeployments(tx, deploymentsToInsert)
+	if err != nil {
+		log.Panicf("Error processing Snapshot: %v", err)
 	}
 
 	err = tx.Commit()
@@ -89,6 +103,15 @@
 
 	SetDB(db)
 
+	for _, dep := range deploymentsToInsert {
+		queueDownloadRequest(dep)
+	}
+
+	// transmit parsing errors back immediately
+	if len(errResults) > 0 {
+		go transmitDeploymentResultsToServer(errResults)
+	}
+
 	// if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish
 	if len(snapshot.Tables) == 0 {
 		go func() {
@@ -107,54 +130,89 @@
 
 func processChangeList(changes *common.ChangeList) {
 
+	// gather deleted bundle info
+	var deploymentsToInsert, deploymentsToDelete []DataDeployment
+	var errResults apiDeploymentResults
+	for _, change := range changes.Changes {
+		switch change.Table {
+		case DEPLOYMENT_TABLE:
+			switch change.Operation {
+			case common.Insert:
+				dep, err := dataDeploymentFromRow(change.NewRow)
+				if err == nil {
+					deploymentsToInsert = append(deploymentsToInsert, dep)
+				} else {
+					result := apiDeploymentResult{
+						ID:        dep.ID,
+						Status:    RESPONSE_STATUS_FAIL,
+						ErrorCode: ERROR_CODE_TODO,
+						Message:   fmt.Sprintf("unable to parse deployment: %v", err),
+					}
+					errResults = append(errResults, result)
+				}
+			case common.Delete:
+				var id, dataScopeID string
+				change.OldRow.Get("id", &id)
+				change.OldRow.Get("data_scope_id", &dataScopeID)
+				// only need these two fields to delete and determine bundle file
+				dep := DataDeployment{
+					ID:          id,
+					DataScopeID: dataScopeID,
+				}
+				deploymentsToDelete = append(deploymentsToDelete, dep)
+			default:
+				log.Errorf("unexpected operation: %s", change.Operation)
+			}
+		}
+	}
+
+	// transmit parsing errors back immediately
+	if len(errResults) > 0 {
+		go transmitDeploymentResultsToServer(errResults)
+	}
+
 	tx, err := getDB().Begin()
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
 	defer tx.Rollback()
 
-	// ensure bundle download and delete updates aren't attempted while in process
-	dbMux.Lock()
-	defer dbMux.Unlock()
-
-	var bundlesToDelete []string
-	for _, change := range changes.Changes {
-		var err error
-		switch change.Table {
-		case DEPLOYMENT_TABLE:
-			switch change.Operation {
-			case common.Insert:
-				err = addDeployment(tx, change.NewRow)
-			case common.Delete:
-				var id string
-				change.OldRow.Get("id", &id)
-				localBundleUri, err := getLocalBundleURI(tx, id)
-				if err == nil {
-					bundlesToDelete = append(bundlesToDelete, localBundleUri)
-					err = deleteDeployment(tx, id)
-				}
-			default:
-				log.Errorf("unexpected operation: %s", change.Operation)
-			}
-		}
+	for _, dep := range deploymentsToDelete {
+		err = deleteDeployment(tx, dep.ID)
 		if err != nil {
 			log.Panicf("Error processing ChangeList: %v", err)
 		}
 	}
+	err = insertDeployments(tx, deploymentsToInsert)
+	if err != nil {
+		log.Panicf("Error processing ChangeList: %v", err)
+	}
+
 	err = tx.Commit()
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
 
+	if len(deploymentsToDelete) > 0 {
+		deploymentsChanged <- deploymentsToDelete[0].ID // arbitrary, the ID doesn't matter
+	}
+
+	log.Debug("ChangeList processed")
+
+	for _, dep := range deploymentsToInsert {
+		queueDownloadRequest(dep)
+	}
+
 	// clean up old bundles
-	if len(bundlesToDelete) > 0 {
-		log.Debugf("will delete %d old bundles", len(bundlesToDelete))
+	if len(deploymentsToDelete) > 0 {
+		log.Debugf("will delete %d old bundles", len(deploymentsToDelete))
 		go func() {
 			// give clients a minute to avoid conflicts
 			time.Sleep(bundleCleanupDelay)
-			for _, b := range bundlesToDelete {
-				log.Debugf("removing old bundle: %v", b)
-				safeDelete(b)
+			for _, dep := range deploymentsToDelete {
+				bundleFile := getBundleFile(dep)
+				log.Debugf("removing old bundle: %v", bundleFile)
+				safeDelete(bundleFile)
 			}
 		}()
 	}
@@ -188,23 +246,6 @@
 	return
 }
 
-func addDeployment(tx *sql.Tx, row common.Row) (err error) {
-
-	var d DataDeployment
-	d, err = dataDeploymentFromRow(row)
-	if err != nil {
-		return
-	}
-
-	err = InsertDeployment(tx, d)
-	if err != nil {
-		return
-	}
-
-	queueDownloadRequest(d)
-	return
-}
-
 func safeDelete(file string) {
 	if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
 		log.Warnf("unable to delete file %s: %v", file, e)
diff --git a/listener_test.go b/listener_test.go
index aeddbf9..dbee310 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -2,11 +2,12 @@
 
 import (
 	"encoding/json"
+	"net/url"
+
 	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
-	"net/url"
 )
 
 var _ = Describe("listener", func() {