Merge pull request #23 from 30x/XAPID-941

Xapid 941
diff --git a/.gitignore b/.gitignore
index 9d01395..0df4220 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,5 @@
 cmd/apidGatewayDeploy/apidGatewayDeploy
 *.lock
 /apidGatewayDeploy.iml
+/data.go
+.idea
\ No newline at end of file
diff --git a/api.go b/api.go
index 972b8eb..8a628e7 100644
--- a/api.go
+++ b/api.go
@@ -32,8 +32,9 @@
 )
 
 const (
-	sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
-	iso8601       = "2006-01-02T15:04:05.999Z07:00"
+	sqlTimeFormat    = "2006-01-02 15:04:05.999 -0700 MST"
+	iso8601          = "2006-01-02T15:04:05.999Z07:00"
+	sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00"
 )
 
 type deploymentsResult struct {
@@ -392,7 +393,7 @@
 	if t == "" {
 		return ""
 	}
-	formats := []string{sqlTimeFormat, iso8601, time.RFC3339}
+	formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339}
 	for _, f := range formats {
 		timestamp, err := time.Parse(f, t)
 		if err == nil {
diff --git a/api_test.go b/api_test.go
index c6acd5e..a49b9a5 100644
--- a/api_test.go
+++ b/api_test.go
@@ -295,7 +295,7 @@
 			Expect(resp.StatusCode).Should(Equal(http.StatusOK))
 
 			var deployStatus string
-			err = db.QueryRow("SELECT deploy_status FROM deployments WHERE id=?", deploymentID).
+			err = db.QueryRow("SELECT deploy_status FROM edgex_deployment WHERE id=?", deploymentID).
 				Scan(&deployStatus)
 			Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS))
 		})
@@ -332,7 +332,7 @@
 			var deploy_error_code int
 			err = db.QueryRow(`
 			SELECT deploy_status, deploy_error_code, deploy_error_message
-			FROM deployments
+			FROM edgex_deployment
 			WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message)
 			Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL))
 			Expect(deploy_error_code).Should(Equal(100))
@@ -364,8 +364,8 @@
 		})
 
 		It("should get iso8601 time", func() {
-			testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462 -0700 MST", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"}
-			isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"}
+			testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00"}
+			isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z"}
 			for i, t := range testTimes {
 				log.Debug("insert deployment with timestamp: " + t)
 				deploymentID := "api_time_iso8601_" + strconv.Itoa(i)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 3b55d05..236fe51 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -42,9 +42,10 @@
 
 	apid.InitializePlugins("")
 
+	// init full DB
 	db, err := data.DB()
 	Expect(err).NotTo(HaveOccurred())
-	err = InitDB(db)
+	err = InitDBFullColumns(db)
 	Expect(err).NotTo(HaveOccurred())
 	SetDB(db)
 
@@ -114,8 +115,9 @@
 	apiServerBaseURI, err = url.Parse(testServer.URL)
 	Expect(err).NotTo(HaveOccurred())
 
-	_, err = getDB().Exec("DELETE FROM deployments")
+	_, err = getDB().Exec("DELETE FROM edgex_deployment")
 	Expect(err).ShouldNot(HaveOccurred())
+
 	_, err = getDB().Exec("UPDATE etag SET value=1")
 })
 
diff --git a/data.go b/data.go
index ad67728..c2b3ecf 100644
--- a/data.go
+++ b/data.go
@@ -5,7 +5,9 @@
 	"fmt"
 	"sync"
 
+	"encoding/json"
 	"github.com/30x/apid-core"
+	"strings"
 )
 
 var (
@@ -38,9 +40,9 @@
 	Exec(query string, args ...interface{}) (sql.Result, error)
 }
 
-func InitDB(db apid.DB) error {
+func InitDBFullColumns(db apid.DB) error {
 	_, err := db.Exec(`
-	CREATE TABLE IF NOT EXISTS deployments (
+	CREATE TABLE IF NOT EXISTS edgex_deployment (
 		id character varying(36) NOT NULL,
 		bundle_config_id varchar(36) NOT NULL,
 		apid_cluster_id varchar(36) NOT NULL,
@@ -51,7 +53,7 @@
 		created_by text,
 		updated timestamp without time zone,
 		updated_by text,
-		bundle_name text,
+		bundle_config_name text,
 		bundle_uri text,
 		local_bundle_uri text,
 		bundle_checksum text,
@@ -70,6 +72,56 @@
 	return nil
 }
 
+func InitDB(db apid.DB) error {
+	_, err := db.Exec(`
+	CREATE TABLE IF NOT EXISTS edgex_deployment (
+		id character varying(36) NOT NULL,
+		bundle_config_id varchar(36) NOT NULL,
+		apid_cluster_id varchar(36) NOT NULL,
+		data_scope_id varchar(36) NOT NULL,
+		bundle_config_json text NOT NULL,
+		config_json text NOT NULL,
+		created timestamp without time zone,
+		created_by text,
+		updated timestamp without time zone,
+		updated_by text,
+		bundle_config_name text,
+		PRIMARY KEY (id)
+	);
+	`)
+	if err != nil {
+		return err
+	}
+
+	log.Debug("Database tables created.")
+	return nil
+}
+
+func alterTable(db apid.DB) error {
+	queries := []string{
+		"ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;",
+		"ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;",
+		"ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;",
+		"ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;",
+		"ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;",
+		"ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;",
+		"ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;",
+	}
+
+	for _, query := range queries {
+		_, err := db.Exec(query)
+		if err != nil {
+			if strings.Contains(err.Error(), "duplicate column name")   {
+				log.Warnf("AlterTable warning: %s", err)
+			} else {
+				return err
+			}
+		}
+	}
+	log.Debug("Database table altered.")
+	return nil
+}
+
 func getDB() apid.DB {
 	dbMux.RLock()
 	db := unsafeDB
@@ -91,19 +143,19 @@
 
 func insertDeployments(tx *sql.Tx, deps []DataDeployment) error {
 
-	log.Debugf("inserting %d deployments", len(deps))
+	log.Debugf("inserting %d edgex_deployment", len(deps))
 
 	stmt, err := tx.Prepare(`
-	INSERT INTO deployments
+	INSERT INTO edgex_deployment
 		(id, bundle_config_id, apid_cluster_id, data_scope_id,
 		bundle_config_json, config_json, created, created_by,
-		updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+		updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri,
 		bundle_checksum, bundle_checksum_type, deploy_status,
 		deploy_error_code, deploy_error_message)
 		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
 	`)
 	if err != nil {
-		log.Errorf("prepare insert into deployments failed: %v", err)
+		log.Errorf("prepare insert into edgex_deployment failed: %v", err)
 		return err
 	}
 	defer stmt.Close()
@@ -118,29 +170,87 @@
 			dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
 			dep.DeployErrorCode, dep.DeployErrorMessage)
 		if err != nil {
-			log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
+			log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
 			return err
 		}
 	}
 
-	log.Debug("inserting deployments succeeded")
+	log.Debug("inserting edgex_deployment succeeded")
 	return err
 }
 
+func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error {
+
+	log.Debugf("updating %d edgex_deployment", len(deps))
+
+	stmt, err := tx.Prepare(`
+	UPDATE edgex_deployment SET
+		(bundle_uri, local_bundle_uri,
+		bundle_checksum, bundle_checksum_type, deploy_status,
+		deploy_error_code, deploy_error_message)
+		= ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8
+	`)
+	if err != nil {
+		log.Errorf("prepare update edgex_deployment failed: %v", err)
+		return err
+	}
+	defer stmt.Close()
+
+	for _, dep := range deps {
+		log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI)
+
+		_, err = stmt.Exec(
+			dep.BundleURI,
+			dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+			dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID)
+		if err != nil {
+			log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err)
+			return err
+		}
+	}
+
+	log.Debug("updateDeploymentsColumns of edgex_deployment succeeded")
+	return err
+}
+
+func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) {
+	deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL")
+	if err != nil {
+		log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err)
+		return
+	}
+	var bc bundleConfigJson
+	for i, _ := range deployments {
+		log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON)
+		json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc)
+		if err != nil {
+			log.Errorf("JSON decoding Manifest failed: %v", err)
+			return
+		}
+		deployments[i].BundleName = bc.Name
+		deployments[i].BundleURI = bc.URI
+		deployments[i].BundleChecksumType = bc.ChecksumType
+		deployments[i].BundleChecksum = bc.Checksum
+
+		log.Debugf("Unmarshal: %v", deployments[i].BundleURI)
+	}
+	return
+}
+
 func deleteDeployment(tx *sql.Tx, depID string) error {
 
 	log.Debugf("deleteDeployment: %s", depID)
 
-	stmt, err := tx.Prepare("DELETE FROM deployments where id = $1;")
+	stmt, err := tx.Prepare("DELETE FROM edgex_deployment where id = $1;")
 	if err != nil {
-		log.Errorf("prepare delete from deployments %s failed: %v", depID, err)
+		log.Errorf("prepare delete from edgex_deployment %s failed: %v", depID, err)
 		return err
 	}
 	defer stmt.Close()
 
 	_, err = stmt.Exec(depID)
 	if err != nil {
-		log.Errorf("delete from deployments %s failed: %v", depID, err)
+		log.Errorf("delete from edgex_deployment %s failed: %v", depID, err)
 		return err
 	}
 
@@ -166,10 +276,10 @@
 	stmt, err = db.Prepare(`
 	SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
 		bundle_config_json, config_json, created, created_by,
-		updated, updated_by, bundle_name, bundle_uri,
+		updated, updated_by, bundle_config_name, bundle_uri,
 		local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
 		deploy_error_code, deploy_error_message
-	FROM deployments
+	FROM edgex_deployment
 	` + where)
 	if err != nil {
 		return
@@ -181,7 +291,7 @@
 		if err == sql.ErrNoRows {
 			return
 		}
-		log.Errorf("Error querying deployments: %v", err)
+		log.Errorf("Error querying edgex_deployment: %v", err)
 		return
 	}
 	defer rows.Close()
@@ -220,7 +330,7 @@
 	defer tx.Rollback()
 
 	stmt, err := tx.Prepare(`
-	UPDATE deployments
+	UPDATE edgex_deployment
 	SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
 	WHERE id=$4;
 	`)
@@ -233,7 +343,7 @@
 	for _, result := range results {
 		res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
 		if err != nil {
-			log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err)
+			log.Errorf("update edgex_deployment %s to %s failed: %v", result.ID, result.Status, err)
 			return err
 		}
 		n, err := res.RowsAffected()
@@ -251,7 +361,7 @@
 
 func updateLocalBundleURI(depID, localBundleUri string) error {
 
-	stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
+	stmt, err := getDB().Prepare("UPDATE edgex_deployment SET local_bundle_uri=$1 WHERE id=$2;")
 	if err != nil {
 		log.Errorf("prepare updateLocalBundleURI failed: %v", err)
 		return err
@@ -260,11 +370,41 @@
 
 	_, err = stmt.Exec(localBundleUri, depID)
 	if err != nil {
-		log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
+		log.Errorf("update edgex_deployment %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
 		return err
 	}
 
-	log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
+	log.Debugf("update edgex_deployment %s localBundleUri to %s succeeded", depID, localBundleUri)
 
 	return nil
 }
+
+func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error {
+
+	stmt, err := tx.Prepare(`
+	INSERT INTO edgex_deployment
+		(id, bundle_config_id, apid_cluster_id, data_scope_id,
+		bundle_config_json, config_json, created, created_by,
+		updated, updated_by, bundle_config_name)
+		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);
+	`)
+	if err != nil {
+		log.Errorf("prepare insert into edgex_deployment failed: %v", err)
+		return err
+	}
+	defer stmt.Close()
+
+	log.Debugf("InsertTestDeployment: %s", dep.ID)
+
+	_, err = stmt.Exec(
+		dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
+		dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+		dep.Updated, dep.UpdatedBy, dep.BundleName)
+	if err != nil {
+		log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
+		return err
+	}
+
+	log.Debug("InsertTestDeployment edgex_deployment succeeded")
+	return err
+}
diff --git a/listener.go b/listener.go
index 8230176..8a4f017 100644
--- a/listener.go
+++ b/listener.go
@@ -54,69 +54,36 @@
 		log.Panicf("Unable to access database: %v", err)
 	}
 
-	err = InitDB(db)
+	// alter table
+	err = alterTable(db)
 	if err != nil {
-		log.Panicf("Unable to initialize database: %v", err)
+		log.Panicf("Alter table failed: %v", err)
 	}
-
-	var deploymentsToInsert []DataDeployment
-	var errResults apiDeploymentResults
-	for _, table := range snapshot.Tables {
-		switch table.Name {
-		case DEPLOYMENT_TABLE:
-			for _, row := range table.Rows {
-				dep, err := dataDeploymentFromRow(row)
-				if err == nil {
-					deploymentsToInsert = append(deploymentsToInsert, dep)
-				} else {
-					result := apiDeploymentResult{
-						ID:        dep.ID,
-						Status:    RESPONSE_STATUS_FAIL,
-						ErrorCode: TRACKER_ERR_DEPLOYMENT_BAD_JSON,
-						Message:   fmt.Sprintf("unable to parse deployment: %v", err),
-					}
-					errResults = append(errResults, result)
-				}
-			}
-		}
-	}
-
 	// ensure that no new database updates are made on old database
 	dbMux.Lock()
-	defer dbMux.Unlock()
+	SetDB(db)
+	dbMux.Unlock()
 
+	// update deployments
+	deps, err := getDeploymentsToUpdate(db)
+	if err != nil {
+		log.Panicf("Unable to getDeploymentsToUpdate: %v", err)
+	}
 	tx, err := db.Begin()
 	if err != nil {
 		log.Panicf("Error starting transaction: %v", err)
 	}
 	defer tx.Rollback()
-
-	err = insertDeployments(tx, deploymentsToInsert)
+	err = updateDeploymentsColumns(tx, deps)
 	if err != nil {
-		log.Panicf("Error processing Snapshot: %v", err)
+		log.Panicf("updateDeploymentsColumns failed: %v", err)
 	}
-
 	err = tx.Commit()
 	if err != nil {
-		log.Panicf("Error committing Snapshot change: %v", err)
+		log.Panicf("Error committing Snapshot update: %v", err)
 	}
 
-	SetDB(db)
-
-	for _, dep := range deploymentsToInsert {
-		queueDownloadRequest(dep)
-	}
-
-	// transmit parsing errors back immediately
-	if len(errResults) > 0 {
-		go transmitDeploymentResultsToServer(errResults)
-	}
-
-	// if no tables, this a startup event for an existing DB
-	if len(snapshot.Tables) == 0 {
-		startupOnExistingDatabase()
-	}
-
+	startupOnExistingDatabase()
 	log.Debug("Snapshot processed")
 }
 
@@ -159,8 +126,8 @@
 
 func processChangeList(changes *common.ChangeList) {
 
-	// gather deleted bundle info
-	var deploymentsToInsert, deploymentsToDelete []DataDeployment
+	// changes have been applied to DB
+	var insertedDeployments, deletedDeployments []DataDeployment
 	var errResults apiDeploymentResults
 	for _, change := range changes.Changes {
 		switch change.Table {
@@ -169,7 +136,7 @@
 			case common.Insert:
 				dep, err := dataDeploymentFromRow(change.NewRow)
 				if err == nil {
-					deploymentsToInsert = append(deploymentsToInsert, dep)
+					insertedDeployments = append(insertedDeployments, dep)
 				} else {
 					result := apiDeploymentResult{
 						ID:        dep.ID,
@@ -188,7 +155,7 @@
 					ID:          id,
 					DataScopeID: dataScopeID,
 				}
-				deploymentsToDelete = append(deploymentsToDelete, dep)
+				deletedDeployments = append(deletedDeployments, dep)
 			default:
 				log.Errorf("unexpected operation: %s", change.Operation)
 			}
@@ -200,45 +167,23 @@
 		go transmitDeploymentResultsToServer(errResults)
 	}
 
-	tx, err := getDB().Begin()
-	if err != nil {
-		log.Panicf("Error processing ChangeList: %v", err)
-	}
-	defer tx.Rollback()
-
-	for _, dep := range deploymentsToDelete {
-		err = deleteDeployment(tx, dep.ID)
-		if err != nil {
-			log.Panicf("Error processing ChangeList: %v", err)
-		}
-	}
-	err = insertDeployments(tx, deploymentsToInsert)
-	if err != nil {
-		log.Panicf("Error processing ChangeList: %v", err)
-	}
-
-	err = tx.Commit()
-	if err != nil {
-		log.Panicf("Error processing ChangeList: %v", err)
-	}
-
-	for _, d := range deploymentsToDelete {
+	for _, d := range deletedDeployments {
 		deploymentsChanged <- d.ID
 	}
 
 	log.Debug("ChangeList processed")
 
-	for _, dep := range deploymentsToInsert {
+	for _, dep := range insertedDeployments {
 		queueDownloadRequest(dep)
 	}
 
 	// clean up old bundles
-	if len(deploymentsToDelete) > 0 {
-		log.Debugf("will delete %d old bundles", len(deploymentsToDelete))
+	if len(deletedDeployments) > 0 {
+		log.Debugf("will delete %d old bundles", len(deletedDeployments))
 		go func() {
 			// give clients a minute to avoid conflicts
 			time.Sleep(bundleCleanupDelay)
-			for _, dep := range deploymentsToDelete {
+			for _, dep := range deletedDeployments {
 				bundleFile := getBundleFile(dep)
 				log.Debugf("removing old bundle: %v", bundleFile)
 				safeDelete(bundleFile)
diff --git a/listener_test.go b/listener_test.go
index 0c79f3f..c253eb3 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -8,6 +8,7 @@
 
 	"net/http"
 
+	"fmt"
 	"github.com/30x/apid-core"
 	"github.com/apigee-labs/transicator/common"
 	. "github.com/onsi/ginkgo"
@@ -18,116 +19,56 @@
 
 	Context("ApigeeSync snapshot event", func() {
 
-		It("should set DB and process", func(done Done) {
+		/*
+		 * Note that the test snapshot should not be empty.
+		 * If it's empty, you can't use deploymentsResult chan to mark the end of processing,
+		 * so the threads generated by startupOnExistingDatabase() will mess up later tests
+		 */
+		It("should set DB to appropriate version", func(done Done) {
+			saveDB := getDB()
+			deploymentID := "set_version_test"
+			snapshot, dep := createSnapshotDeployment(deploymentID, "test_version")
 
-			deploymentID := "listener_test_1"
-
-			uri, err := url.Parse(testServer.URL)
+			db, err := data.DBVersion(snapshot.SnapshotInfo)
 			Expect(err).ShouldNot(HaveOccurred())
 
-			uri.Path = "/bundles/1"
-			bundleUri := uri.String()
-			bundle1 := bundleConfigJson{
-				Name:         uri.Path,
-				URI:          bundleUri,
-				ChecksumType: "crc32",
-			}
-			bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri)
-			bundle1Json, err := json.Marshal(bundle1)
+			err = InitDB(db)
 			Expect(err).ShouldNot(HaveOccurred())
 
-			row := common.Row{}
-			row["id"] = &common.ColumnVal{Value: deploymentID}
-			row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
-			var event = common.Snapshot{
-				SnapshotInfo: "test",
-				Tables: []common.Table{
-					{
-						Name: DEPLOYMENT_TABLE,
-						Rows: []common.Row{row},
-					},
-				},
-			}
+			insertDeploymentToDb(dep, db)
+			expectedDB, err := data.DBVersion(snapshot.SnapshotInfo)
+			Expect(err).NotTo(HaveOccurred())
 
 			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
 
-			apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
+			apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
 
 			result := <-listener
-			Expect(result.err).ToNot(HaveOccurred())
+			Expect(result.err).ShouldNot(HaveOccurred())
 
-			// from event
-			Expect(len(result.deployments)).To(Equal(1))
-			d := result.deployments[0]
+			// DB should have been set
+			Expect(getDB() == expectedDB).Should(BeTrue())
 
-			Expect(d.ID).To(Equal(deploymentID))
-			Expect(d.BundleName).To(Equal(bundle1.Name))
-			Expect(d.BundleURI).To(Equal(bundle1.URI))
-
-			// from db
-			deployments, err := getReadyDeployments()
-			Expect(err).ShouldNot(HaveOccurred())
-
-			Expect(len(deployments)).To(Equal(1))
-			d = deployments[0]
-
-			Expect(d.ID).To(Equal(deploymentID))
-			Expect(d.BundleName).To(Equal(bundle1.Name))
-			Expect(d.BundleURI).To(Equal(bundle1.URI))
-
+			SetDB(saveDB)
 			close(done)
 		})
 
 		It("should process unready on existing db startup event", func(done Done) {
 
+			saveDB := getDB()
+
 			deploymentID := "startup_test"
 
-			uri, err := url.Parse(testServer.URL)
-			Expect(err).ShouldNot(HaveOccurred())
-
-			uri.Path = "/bundles/1"
-			bundleUri := uri.String()
-			bundle := bundleConfigJson{
-				Name:         uri.Path,
-				URI:          bundleUri,
-				ChecksumType: "crc32",
-			}
-			bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
-
-			dep := DataDeployment{
-				ID:                 deploymentID,
-				DataScopeID:        deploymentID,
-				BundleURI:          bundle.URI,
-				BundleChecksum:     bundle.Checksum,
-				BundleChecksumType: bundle.ChecksumType,
-			}
-
-			// init without info == startup on existing DB
-			var snapshot = common.Snapshot{
-				SnapshotInfo: "test",
-				Tables:       []common.Table{},
-			}
+			snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready")
 
 			db, err := data.DBVersion(snapshot.SnapshotInfo)
-			if err != nil {
-				log.Panicf("Unable to access database: %v", err)
-			}
+			Expect(err).ShouldNot(HaveOccurred())
 
 			err = InitDB(db)
-			if err != nil {
-				log.Panicf("Unable to initialize database: %v", err)
-			}
-
-			tx, err := db.Begin()
 			Expect(err).ShouldNot(HaveOccurred())
 
-			err = InsertDeployment(tx, dep)
-			Expect(err).ShouldNot(HaveOccurred())
-
-			err = tx.Commit()
-			Expect(err).ShouldNot(HaveOccurred())
+			insertDeploymentToDb(dep, db)
 
 			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
@@ -141,11 +82,15 @@
 			d := result.deployments[0]
 
 			Expect(d.ID).To(Equal(deploymentID))
+
+			SetDB(saveDB)
 			close(done)
 		})
 
 		It("should send deployment statuses on existing db startup event", func(done Done) {
 
+			saveDB := getDB()
+
 			successDep := DataDeployment{
 				ID:                 "success",
 				LocalBundleURI:     "x",
@@ -189,6 +134,7 @@
 					Message:   failDep.DeployErrorMessage,
 				}))
 
+				SetDB(saveDB)
 				close(done)
 			}))
 
@@ -203,14 +149,10 @@
 			}
 
 			db, err := data.DBVersion(snapshot.SnapshotInfo)
-			if err != nil {
-				log.Panicf("Unable to access database: %v", err)
-			}
+			Expect(err).NotTo(HaveOccurred())
 
-			err = InitDB(db)
-			if err != nil {
-				log.Panicf("Unable to initialize database: %v", err)
-			}
+			err = InitDBFullColumns(db)
+			Expect(err).NotTo(HaveOccurred())
 
 			tx, err := db.Begin()
 			Expect(err).ShouldNot(HaveOccurred())
@@ -231,37 +173,19 @@
 
 	Context("ApigeeSync change event", func() {
 
-		It("add event should add a deployment", func(done Done) {
+		It("inserting event should deliver the deployment to subscribers", func(done Done) {
 
 			deploymentID := "add_test_1"
 
-			uri, err := url.Parse(testServer.URL)
+			event, dep := createChangeDeployment(deploymentID)
+
+			// insert full deployment columns
+			tx, err := getDB().Begin()
 			Expect(err).ShouldNot(HaveOccurred())
-
-			uri.Path = "/bundles/1"
-			bundleUri := uri.String()
-			bundle := bundleConfigJson{
-				Name:         uri.Path,
-				URI:          bundleUri,
-				ChecksumType: "crc32",
-			}
-			bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
-			bundle1Json, err := json.Marshal(bundle)
+			err = InsertDeployment(tx, dep)
 			Expect(err).ShouldNot(HaveOccurred())
-
-			row := common.Row{}
-			row["id"] = &common.ColumnVal{Value: deploymentID}
-			row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
-			var event = common.ChangeList{
-				Changes: []common.Change{
-					{
-						Operation: common.Insert,
-						Table:     DEPLOYMENT_TABLE,
-						NewRow:    row,
-					},
-				},
-			}
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
 
 			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
@@ -279,31 +203,39 @@
 			d := deployments[0]
 
 			Expect(d.ID).To(Equal(deploymentID))
-			Expect(d.BundleName).To(Equal(bundle.Name))
-			Expect(d.BundleURI).To(Equal(bundle.URI))
+			Expect(d.BundleName).To(Equal(dep.BundleName))
+			Expect(d.BundleURI).To(Equal(dep.BundleURI))
 
 			close(done)
 		})
 
-		It("delete event should delete a deployment", func(done Done) {
+		It("delete event should deliver to subscribers", func(done Done) {
 
 			deploymentID := "delete_test_1"
 
+			// insert deployment
+			event, dep := createChangeDeployment(deploymentID)
+
+			// insert full deployment columns
 			tx, err := getDB().Begin()
 			Expect(err).ShouldNot(HaveOccurred())
-			dep := DataDeployment{
-				ID:             deploymentID,
-				LocalBundleURI: "whatever",
-			}
 			err = InsertDeployment(tx, dep)
 			Expect(err).ShouldNot(HaveOccurred())
 			err = tx.Commit()
 			Expect(err).ShouldNot(HaveOccurred())
 
+			listener := make(chan deploymentsResult)
+			addSubscriber <- listener
+			apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
+			// wait for event to propagate
+			result := <-listener
+			Expect(result.err).ShouldNot(HaveOccurred())
+
+			// delete deployment
+			deletDeploymentFromDb(dep, getDB())
 			row := common.Row{}
 			row["id"] = &common.ColumnVal{Value: deploymentID}
-
-			var event = common.ChangeList{
+			event = common.ChangeList{
 				Changes: []common.Change{
 					{
 						Operation: common.Delete,
@@ -313,19 +245,99 @@
 				},
 			}
 
-			var listener = make(chan deploymentsResult)
+			listener = make(chan deploymentsResult)
 			addSubscriber <- listener
-
 			apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
-
-			<-listener
-
-			deployments, err := getReadyDeployments()
-			Expect(err).ShouldNot(HaveOccurred())
-
-			Expect(len(deployments)).To(Equal(0))
-
+			result = <-listener
+			Expect(result.err).ShouldNot(HaveOccurred())
+			Expect(len(result.deployments)).To(Equal(0))
 			close(done)
 		})
 	})
 })
+
+func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) {
+	uri, err := url.Parse(testServer.URL)
+	Expect(err).ShouldNot(HaveOccurred())
+
+	uri.Path = "/bundles/1"
+	bundleUri := uri.String()
+	bundle := bundleConfigJson{
+		Name:         uri.Path,
+		URI:          bundleUri,
+		ChecksumType: "crc32",
+	}
+	bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+	bundle1Json, err := json.Marshal(bundle)
+	Expect(err).ShouldNot(HaveOccurred())
+
+	row := common.Row{}
+	row["id"] = &common.ColumnVal{Value: deploymentID}
+	row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+	changeList := common.ChangeList{
+		Changes: []common.Change{
+			{
+				Operation: common.Insert,
+				Table:     DEPLOYMENT_TABLE,
+				NewRow:    row,
+			},
+		},
+	}
+	dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow)
+	return changeList, dep
+}
+
+func insertDeploymentToDb(dep DataDeployment, db apid.DB) {
+	tx, err := db.Begin()
+	Expect(err).ShouldNot(HaveOccurred())
+	defer tx.Rollback()
+	err = InsertTestDeployment(tx, dep)
+	Expect(err).ShouldNot(HaveOccurred())
+	err = tx.Commit()
+	Expect(err).ShouldNot(HaveOccurred())
+}
+
+func deletDeploymentFromDb(dep DataDeployment, db apid.DB) {
+	tx, err := db.Begin()
+	Expect(err).ShouldNot(HaveOccurred())
+	defer tx.Rollback()
+	err = deleteDeployment(tx, dep.ID)
+	Expect(err).ShouldNot(HaveOccurred())
+	err = tx.Commit()
+	Expect(err).ShouldNot(HaveOccurred())
+}
+
+func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) {
+	uri, err := url.Parse(testServer.URL)
+	Expect(err).ShouldNot(HaveOccurred())
+
+	uri.Path = "/bundles/1"
+	bundleUri := uri.String()
+	bundle := bundleConfigJson{
+		Name:         uri.Path,
+		URI:          bundleUri,
+		ChecksumType: "crc32",
+	}
+	bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+
+	jsonBytes, err := json.Marshal(bundle)
+	Expect(err).ShouldNot(HaveOccurred())
+	fmt.Println("JSON :" + string(jsonBytes))
+
+	dep := DataDeployment{
+		ID:                 deploymentID,
+		DataScopeID:        deploymentID,
+		BundleURI:          bundle.URI,
+		BundleChecksum:     bundle.Checksum,
+		BundleChecksumType: bundle.ChecksumType,
+		BundleConfigJSON:   string(jsonBytes),
+	}
+
+	// init without info == startup on existing DB
+	var snapshot = common.Snapshot{
+		SnapshotInfo: snapInfo,
+		Tables:       []common.Table{},
+	}
+	return snapshot, dep
+}