Cleanup old bundles, set DeployStatus on bundle download fail, refactoring
diff --git a/api.go b/api.go
index 8a54a31..49bf679 100644
--- a/api.go
+++ b/api.go
@@ -87,9 +87,9 @@
 		case msg := <-deploymentsChanged:
 			// todo: add a debounce w/ timeout to avoid sending on every single deployment?
 			subs := subscribers
-			incrementETag() // todo: do this elsewhere? check error?
-			subscribers = make(map[chan string]struct{})
 			log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs))
+			subscribers = make(map[chan string]struct{})
+			incrementETag()
 			for subscriber := range subs {
 				select {
 				case subscriber <- msg:
diff --git a/api_test.go b/api_test.go
index 8ed074c..b371ea6 100644
--- a/api_test.go
+++ b/api_test.go
@@ -109,6 +109,7 @@
 			res, err := http.Get(uri.String())
 			Expect(err).ShouldNot(HaveOccurred())
 			defer res.Body.Close()
+			eTag := res.Header.Get("etag")
 
 			deploymentID = "api_get_current_blocking2"
 			go func() {
@@ -119,7 +120,7 @@
 				uri.RawQuery = query.Encode()
 				req, err := http.NewRequest("GET", uri.String(), nil)
 				req.Header.Add("Content-Type", "application/json")
-				req.Header.Add("If-None-Match", res.Header.Get("etag"))
+				req.Header.Add("If-None-Match", eTag)
 
 				res, err := http.DefaultClient.Do(req)
 				Expect(err).ShouldNot(HaveOccurred())
@@ -318,16 +319,18 @@
 		DataScopeID: deploymentID,
 		BundleConfigJSON: string(bundleJson),
 		ConfigJSON: string(bundleJson),
-		Status: "",
 		Created: "",
 		CreatedBy: "",
 		Updated: "",
 		UpdatedBy: "",
 		BundleName: deploymentID,
-		BundleURI: "",
-		BundleChecksum: "",
-		BundleChecksumType: "",
+		BundleURI: bundle.URI,
+		BundleChecksum: bundle.Checksum,
+		BundleChecksumType: bundle.ChecksumType,
 		LocalBundleURI: "x",
+		DeployStatus: "",
+		DeployErrorCode: 0,
+		DeployErrorMessage: "",
 	}
 
 	err = InsertDeployment(tx, dep)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index a4ff0e2..a43fb41 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -43,14 +43,14 @@
 	router := apid.API().Router()
 	// fake an unreliable bundle repo
 	backOffMultiplier = 10 * time.Millisecond
-	count := 0
+	count := 1
 	router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
 		count++
-		if count % 2 == 0 {
+		vars := apid.API().Vars(req)
+		if count % 2 == 0 || vars["id"] == "alwaysfail" {
 			w.WriteHeader(500)
 			return
 		}
-		vars := apid.API().Vars(req)
 		w.Write([]byte("/bundles/" + vars["id"]))
 	})
 	testServer = httptest.NewServer(router)
diff --git a/bundle.go b/bundle.go
index 4762165..3e018d0 100644
--- a/bundle.go
+++ b/bundle.go
@@ -25,26 +25,50 @@
 	backOffMultiplier = 10 * time.Second
 )
 
-func downloadBundle(dep DataDeployment) error {
+func downloadBundle(dep DataDeployment) {
 
 	log.Debugf("starting bundle download process: %s", dep.BundleURI)
 
 	hashWriter, err := getHashWriter(dep.BundleChecksumType)
 	if err != nil {
-		log.Errorf("invalid checksum type: %v", err)
-		return err
+		msg := fmt.Sprintf("invalid bundle checksum type: %v", dep.BundleChecksumType)
+		log.Error(msg)
+		setDeploymentResults(apiDeploymentResults{
+			{
+				ID:        dep.ID,
+				Status:    RESPONSE_STATUS_FAIL,
+				ErrorCode: ERROR_CODE_TODO,
+				Message:   msg,
+			},
+		})
+		return
 	}
 
-	// retry
-	var tempFile string
+	// todo: do forever with backoff - note: we'll want to abort if deployment is deleted, however
+	// todo: also, we'll still mark deployment result as "failed" - after some timeout
 	for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
+		var tempFile, bundleFile string
 		tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+
+		if err == nil {
+			bundleFile = getBundleFile(dep)
+			err = os.Rename(tempFile, bundleFile)
+			if err != nil {
+				log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
+			}
+		}
+
+		if tempFile != "" {
+			go safeDelete(tempFile)
+		}
+
+		if err == nil {
+			err = updateLocalBundleURI(dep.ID, bundleFile)
+		}
+
 		if err == nil {
 			break
 		}
-		if tempFile != "" {
-			os.Remove(tempFile)
-		}
 
 		// simple back-off, we could potentially be more sophisticated
 		retryIn := time.Duration(i) * backOffMultiplier
@@ -52,29 +76,24 @@
 		time.Sleep(retryIn)
 		hashWriter.Reset()
 	}
-
 	if err != nil {
 		log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS)
-		return err
 	}
 
-	bundleFile := getBundleFile(dep)
-	err = os.Rename(tempFile, bundleFile)
 	if err != nil {
-		log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
-		os.Remove(tempFile)
-		return err
-	}
-
-	err = updateLocalURI(dep.ID, bundleFile)
-	if err != nil {
-		return err
+		setDeploymentResults(apiDeploymentResults{
+			{
+				ID:        dep.ID,
+				Status:    RESPONSE_STATUS_FAIL,
+				ErrorCode: ERROR_CODE_TODO,
+				Message:   fmt.Sprintf("bundle download failed: %s", err),
+			},
+		})
+		return
 	}
 
 	// send deployments to client
 	deploymentsChanged<- dep.ID
-
-	return nil
 }
 
 func getBundleFile(dep DataDeployment) string {
diff --git a/bundle_test.go b/bundle_test.go
new file mode 100644
index 0000000..2d1645c
--- /dev/null
+++ b/bundle_test.go
@@ -0,0 +1,81 @@
+package apiGatewayDeploy
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"net/url"
+	"encoding/json"
+)
+
+var _ = Describe("bundle", func() {
+
+	Context("download", func() {
+
+		It("should mark the status as failed if download fails", func() {
+
+			deploymentID := "bundle_download_fail"
+
+			uri, err := url.Parse(testServer.URL)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			uri.Path = "/bundles/alwaysfail"
+			bundleUri := uri.String()
+			bundle := bundleConfigJson{
+				Name: uri.Path,
+				URI: bundleUri,
+				ChecksumType: "crc-32",
+			}
+			bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+			bundleJson, err := json.Marshal(bundle)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			tx, err := getDB().Begin()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			dep := DataDeployment{
+				ID: deploymentID,
+				BundleConfigID: deploymentID,
+				ApidClusterID: deploymentID,
+				DataScopeID: deploymentID,
+				BundleConfigJSON: string(bundleJson),
+				ConfigJSON: string(bundleJson),
+				Created: "",
+				CreatedBy: "",
+				Updated: "",
+				UpdatedBy: "",
+				BundleName: deploymentID,
+				BundleURI: bundle.URI,
+				BundleChecksum: bundle.Checksum,
+				BundleChecksumType: bundle.ChecksumType,
+				LocalBundleURI: "",
+				DeployStatus: "",
+				DeployErrorCode: 0,
+				DeployErrorMessage: "",
+			}
+
+			err = InsertDeployment(tx, dep)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			var listener = make(chan string)
+			addSubscriber <- listener
+
+			downloadBundle(dep)
+
+			// get deployment
+			deployments, err := getDeployments("WHERE id=$1", deploymentID)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			Expect(len(deployments)).To(Equal(1))
+			d := deployments[0]
+
+			Expect(d.ID).To(Equal(deploymentID))
+			Expect(d.LocalBundleURI).To(BeEmpty())
+			Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+			Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+			Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+		})
+	})
+})
diff --git a/data.go b/data.go
index 45ad641..42f6efd 100644
--- a/data.go
+++ b/data.go
@@ -19,16 +19,18 @@
 	DataScopeID        string
 	BundleConfigJSON   string
 	ConfigJSON         string
-	Status             string
 	Created            string
 	CreatedBy          string
 	Updated            string
 	UpdatedBy          string
 	BundleName         string
 	BundleURI          string
+	LocalBundleURI     string
 	BundleChecksum     string
 	BundleChecksumType string
-	LocalBundleURI     string
+	DeployStatus       string
+	DeployErrorCode    int
+	DeployErrorMessage string
 }
 
 type SQLExec interface {
@@ -40,7 +42,7 @@
 	CREATE TABLE IF NOT EXISTS etag (
 		value integer
 	);
-	INSERT INTO etag VALUES (1);
+	INSERT INTO etag (value) VALUES (1);
 	CREATE TABLE IF NOT EXISTS deployments (
 		id character varying(36) NOT NULL,
 		bundle_config_id varchar(36) NOT NULL,
@@ -48,7 +50,6 @@
 		data_scope_id varchar(36) NOT NULL,
 		bundle_config_json text NOT NULL,
 		config_json text NOT NULL,
-		status text NOT NULL,
 		created timestamp without time zone,
 		created_by text,
 		updated timestamp without time zone,
@@ -56,12 +57,15 @@
 		bundle_name text,
 		bundle_uri text,
 		local_bundle_uri text,
+		bundle_checksum text,
+		bundle_checksum_type text,
 		deploy_status string,
 		deploy_error_code int,
 		deploy_error_message text,
 		PRIMARY KEY (id)
 	);
 	`)
+	// todo: is ID enough? must it be scoped by cluster id?
 	if err != nil {
 		return err
 	}
@@ -129,10 +133,11 @@
 	stmt, err := tx.Prepare(`
 	INSERT INTO deployments
 		(id, bundle_config_id, apid_cluster_id, data_scope_id,
-		bundle_config_json, config_json, status, created,
-		created_by, updated, updated_by, bundle_name,
-		bundle_uri, local_bundle_uri)
-		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14);
+		bundle_config_json, config_json, created, created_by,
+		updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+		bundle_checksum, bundle_checksum_type, deploy_status,
+		deploy_error_code, deploy_error_message)
+		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
 	`)
 	if err != nil {
 		log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
@@ -142,9 +147,10 @@
 
 	_, err = stmt.Exec(
 		dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
-		dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created,
-		dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName,
-		dep.BundleURI, dep.LocalBundleURI)
+		dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+		dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
+		dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+		dep.DeployErrorCode, dep.DeployErrorMessage)
 	if err != nil {
 		log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
 		return err
@@ -179,35 +185,57 @@
 
 // getReadyDeployments() returns array of deployments that are ready to deploy
 func getReadyDeployments() (deployments []DataDeployment, err error) {
+	return getDeployments("WHERE local_bundle_uri != $1", "")
+}
 
+// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy
+func getUnreadyDeployments() (deployments []DataDeployment, err error) {
+	return getDeployments("WHERE local_bundle_uri = $1 and deploy_status = $2", "", "")
+}
+
+// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments
+func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) {
 	db := getDB()
-	rows, err := db.Query(`
+
+	var stmt *sql.Stmt
+	stmt, err = db.Prepare(`
 	SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
-		bundle_config_json, config_json, status, created,
-		created_by, updated, updated_by, bundle_name,
-		bundle_uri, local_bundle_uri
+		bundle_config_json, config_json, created, created_by,
+		updated, updated_by, bundle_name, bundle_uri,
+		local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
+		deploy_error_code, deploy_error_message
 	FROM deployments
-	WHERE local_bundle_uri != ""
-	`)
+	` + where)
+	if err != nil {
+		return
+	}
+	var rows *sql.Rows
+	rows, err = stmt.Query(a...)
 	if err != nil {
 		if err == sql.ErrNoRows {
-			return deployments, nil
+			return
 		}
 		log.Errorf("Error querying deployments: %v", err)
 		return
 	}
 	defer rows.Close()
 
+	deployments = dataDeploymentsFromRows(rows)
+
+	return
+}
+
+func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) {
 	for rows.Next() {
 		dep := DataDeployment{}
 		rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
-			&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created,
-			&dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName,
-			&dep.BundleURI, &dep.LocalBundleURI,
+			&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy,
+			&dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI,
+			&dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus,
+			&dep.DeployErrorCode, &dep.DeployErrorMessage,
 		)
 		deployments = append(deployments, dep)
 	}
-
 	return
 }
 
@@ -252,18 +280,11 @@
 	return err
 }
 
-func updateLocalURI(depID, localBundleUri string) error {
+func updateLocalBundleURI(depID, localBundleUri string) error {
 
-	tx, err := getDB().Begin()
+	stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
 	if err != nil {
-		log.Errorf("begin updateLocalURI failed: %v", err)
-		return err
-	}
-	defer tx.Rollback()
-
-	stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
-	if err != nil {
-		log.Errorf("prepare updateLocalURI failed: %v", err)
+		log.Errorf("prepare updateLocalBundleURI failed: %v", err)
 		return err
 	}
 	defer stmt.Close()
@@ -274,13 +295,16 @@
 		return err
 	}
 
-	err = tx.Commit()
-	if err != nil {
-		log.Errorf("commit updateLocalURI failed: %v", err)
-		return err
-	}
-
 	log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
 
 	return nil
 }
+
+func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
+
+	err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
+	if err == sql.ErrNoRows {
+		err = nil
+	}
+	return
+}
diff --git a/listener.go b/listener.go
index 3b57542..91f0fe4 100644
--- a/listener.go
+++ b/listener.go
@@ -5,6 +5,7 @@
 	"encoding/json"
 	"github.com/30x/apid"
 	"github.com/apigee-labs/transicator/common"
+	"os"
 )
 
 const (
@@ -84,6 +85,18 @@
 	}
 
 	SetDB(db)
+
+	// if no tables, this a startup event for an existing DB, start bundle downloads that didn't finish
+	if len(snapshot.Tables) == 0 {
+		deployments, err := getUnreadyDeployments()
+		if err != nil {
+			log.Panicf("unable to query database for unready deployments: %v", err)
+		}
+		for _, dep := range deployments {
+			go downloadBundle(dep)
+		}
+	}
+
 	log.Debug("Snapshot processed")
 }
 
@@ -94,6 +107,7 @@
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
 	defer tx.Rollback()
+	var bundlesToDelete []string
 	for _, change := range changes.Changes {
 		var err error
 		switch change.Table {
@@ -103,10 +117,11 @@
 				err = addDeployment(tx, change.NewRow)
 			case common.Delete:
 				var id string
-				err = change.OldRow.Get("id", &id)
+				change.OldRow.Get("id", &id)
+				localBundleUri, err := getLocalBundleURI(tx, id)
 				if err == nil {
+					bundlesToDelete = append(bundlesToDelete, localBundleUri)
 					err = deleteDeployment(tx, id)
-					// todo: delete downloaded bundle file
 				}
 			default:
 				log.Errorf("unexpected operation: %s", change.Operation)
@@ -120,58 +135,37 @@
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
+
+	// clean up old bundles
+	if len(bundlesToDelete) > 0 {
+		log.Debugf("will delete %d old bundles", len(bundlesToDelete))
+		go func() {
+			// give clients a minute to avoid conflicts
+			// todo: configurable time
+			//time.Sleep(1 * time.Minute)
+			for _, b := range bundlesToDelete {
+				log.Debugf("removing old bundle: %v", b)
+				safeDelete(b)
+			}
+		}()
+	}
 }
 
-func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) {
 
-	d := DataDeployment{}
-	err = row.Get("id", &d.ID)
-	if err != nil {
-		return
-	}
-	err = row.Get("bundle_config_id", &d.BundleConfigID)
-	if err != nil {
-		return
-	}
-	err = row.Get("apid_cluster_id", &d.ApidClusterID)
-	if err != nil {
-		return
-	}
-	err = row.Get("data_scope_id", &d.DataScopeID)
-	if err != nil {
-		return
-	}
-	err = row.Get("bundle_config_json", &d.BundleConfigJSON)
-	if err != nil {
-		return
-	}
-	err = row.Get("config_json", &d.ConfigJSON)
-	if err != nil {
-		return
-	}
-	err = row.Get("status", &d.Status)
-	if err != nil {
-		return
-	}
-	err = row.Get("created", &d.Created)
-	if err != nil {
-		return
-	}
-	err = row.Get("created_by", &d.CreatedBy)
-	if err != nil {
-		return
-	}
-	err = row.Get("updated", &d.Updated)
-	if err != nil {
-		return
-	}
-	err = row.Get("updated_by", &d.UpdatedBy)
-	if err != nil {
-		return
-	}
+	row.Get("id", &d.ID)
+	row.Get("bundle_config_id", &d.BundleConfigID)
+	row.Get("apid_cluster_id", &d.ApidClusterID)
+	row.Get("data_scope_id", &d.DataScopeID)
+	row.Get("bundle_config_json", &d.BundleConfigJSON)
+	row.Get("config_json", &d.ConfigJSON)
+	row.Get("created", &d.Created)
+	row.Get("created_by", &d.CreatedBy)
+	row.Get("updated", &d.Updated)
+	row.Get("updated_by", &d.UpdatedBy)
 
 	var bc bundleConfigJson
-	err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
+	json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
 	if err != nil {
 		log.Errorf("JSON decoding Manifest failed: %v", err)
 		return
@@ -182,6 +176,17 @@
 	d.BundleChecksumType = bc.ChecksumType
 	d.BundleChecksum = bc.Checksum
 
+	return
+}
+
+func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+
+	var d DataDeployment
+	d, err = dataDeploymentFromRow(row)
+	if err != nil {
+		return
+	}
+
 	err = InsertDeployment(tx, d)
 	if err != nil {
 		return
@@ -191,3 +196,9 @@
 	go downloadBundle(d)
 	return
 }
+
+func safeDelete(file string) {
+	if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
+		log.Warnf("unable to delete file %s: %v", file, e)
+	}
+}