Merge pull request #6 from 30x/XAPID-586

Xapid 586
diff --git a/README.md b/README.md
index c5bf99a..168daed 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# apidVerifyAPIKey
+# apidGatewayDeploy
 
 This core plugin for [apid](http://github.com/30x/apid) responds to 
 [apidApigeeSync](https://github.com/30x/apidApigeeSync) events and publishes an API that allows clients to 
@@ -13,6 +13,26 @@
 
 See [apidGatewayDeploy-api.yaml]() for full spec.
 
+## Configuration
+
+#### gatewaydeploy_debounce_duration
+Window of time during which deployment changes are gathered before sending them to clients.
+Default: "1s"
+
+#### gatewaydeploy_bundle_cleanup_delay
+Duration between deleting a deployment and deleting its bundles on disk.
+Default: "1m"
+
+#### gatewaydeploy_bundle_download_timeout
+Duration after which a still-unfinished bundle download marks the deployment as failed (downloads keep retrying regardless).
+Default: "5m"
+
+#### gatewaydeploy_bundle_dir
+Directory, relative to local_storage_path, in which local bundle files are stored.
+Default: "bundles"
+
+Durations use Go duration syntax (e.g. "500ms", "1m30s"); see https://golang.org/pkg/time/#ParseDuration.
+
 ## Building and running standalone
 
 First, install prerequisites:
diff --git a/api.go b/api.go
index 8a54a31..fec8239 100644
--- a/api.go
+++ b/api.go
@@ -6,7 +6,10 @@
 	"fmt"
 	"io/ioutil"
 	"net/http"
+	"net/url"
 	"strconv"
+	"sync"
+	"sync/atomic"
 	"time"
 )
 
@@ -21,6 +24,7 @@
 var (
 	deploymentsChanged = make(chan string)
 	addSubscriber      = make(chan chan string)
+	eTag               int64
 )
 
 type errorResponse struct {
@@ -58,7 +62,7 @@
 
 func InitAPI() {
 	services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
-	services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("POST")
+	services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT")
 }
 
 func writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -82,25 +86,45 @@
 
 func distributeEvents() {
 	subscribers := make(map[chan string]struct{})
-	for {
+	mut := sync.Mutex{}
+	msg := ""
+	debouncer := func() {
 		select {
-		case msg := <-deploymentsChanged:
-			// todo: add a debounce w/ timeout to avoid sending on every single deployment?
+		case <-time.After(debounceDuration):
+			mut.Lock()
 			subs := subscribers
-			incrementETag() // todo: do this elsewhere? check error?
 			subscribers = make(map[chan string]struct{})
-			log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs))
+			m := msg
+			msg = ""
+			incrementETag()
+			mut.Unlock()
+			log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
 			for subscriber := range subs {
 				select {
-				case subscriber <- msg:
-					log.Debugf("Handling deploy response for: %s", msg)
+				case subscriber <- m:
+					log.Debugf("Handling deploy response for: %s", m)
+					log.Debugf("delivering TO: %v", subscriber)
 				default:
 					log.Debugf("listener too far behind, message dropped")
 				}
 			}
+		}
+	}
+	for {
+		select {
+		case newMsg := <-deploymentsChanged:
+			mut.Lock()
+			log.Debug("deploymentsChanged")
+			if msg == "" {
+				go debouncer()
+			}
+			msg = newMsg
+			mut.Unlock()
 		case subscriber := <-addSubscriber:
 			log.Debugf("Add subscriber: %v", subscriber)
+			mut.Lock()
 			subscribers[subscriber] = struct{}{}
+			mut.Unlock()
 		}
 	}
 }
@@ -132,11 +156,7 @@
 	log.Debugf("if-none-match: %s", ifNoneMatch)
 
 	// send unmodified if matches prior eTag and no timeout
-	eTag, err := getETag()
-	if err != nil {
-		writeDatabaseError(w)
-		return
-	}
+	eTag := getETag()
 	if eTag == ifNoneMatch && timeout == 0 {
 		w.WriteHeader(http.StatusNotModified)
 		return
@@ -189,16 +209,16 @@
 
 	for _, d := range dataDeps {
 		apiDeps = append(apiDeps, ApiDeployment{
-			ID:                d.ID,
-			ScopeId:           d.DataScopeID,
-			Created:           d.Created,
-			CreatedBy:         d.CreatedBy,
-			Updated:           d.Updated,
-			UpdatedBy:         d.UpdatedBy,
-			BundleConfigJson:  []byte(d.BundleConfigJSON),
-			ConfigJson:        []byte(d.ConfigJSON),
-			DisplayName:       d.BundleName,
-			URI:               d.LocalBundleURI,
+			ID:               d.ID,
+			ScopeId:          d.DataScopeID,
+			Created:          d.Created,
+			CreatedBy:        d.CreatedBy,
+			Updated:          d.Updated,
+			UpdatedBy:        d.UpdatedBy,
+			BundleConfigJson: []byte(d.BundleConfigJSON),
+			ConfigJson:       []byte(d.ConfigJSON),
+			DisplayName:      d.BundleName,
+			URI:              d.LocalBundleURI,
 		})
 	}
 
@@ -228,35 +248,108 @@
 	// validate the results
 	// todo: these errors to the client should be standardized
 	var errs bytes.Buffer
-	for i, rsp := range results {
-		if rsp.ID == "" {
+	for i, result := range results {
+		if result.ID == "" {
 			errs.WriteString(fmt.Sprintf("Missing id at %d\n", i))
 		}
 
-		if rsp.Status != RESPONSE_STATUS_SUCCESS && rsp.Status != RESPONSE_STATUS_FAIL {
-			errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n", RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
+		if result.Status != RESPONSE_STATUS_SUCCESS && result.Status != RESPONSE_STATUS_FAIL {
+			errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n",
+				RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
 		}
 
-		if rsp.Status == RESPONSE_STATUS_FAIL {
-			if rsp.ErrorCode == 0 {
+		if result.Status == RESPONSE_STATUS_FAIL {
+			if result.ErrorCode == 0 {
 				errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i))
 			}
-			if rsp.Message == "" {
+			if result.Message == "" {
 				errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i))
 			}
 		}
 	}
+
 	if errs.Len() > 0 {
 		writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, errs.String())
+		return
 	}
 
-	err = setDeploymentResults(results)
+	// Any invalid entry rejects the whole batch above, so every result is valid
+	// here. Persist locally first and only then notify the tracking server, so
+	// a tracker outage cannot lose the client's report.
+	if len(results) > 0 {
+		if err := setDeploymentResults(results); err != nil {
+			writeDatabaseError(w)
+			return
+		}
+		go transmitDeploymentResultsToServer(results)
+	}
+
+	w.Write([]byte("OK"))
+}
+
+// transmitDeploymentResultsToServer PUTs validResults as JSON to the path
+// /clusters/<apidClusterID>/apids/<apidInstanceID>/deployments on the
+// apiServerBaseURI host (any path in the base URI is replaced), retrying with
+// a doubling back-off (capped at 5m) until the server answers 200 OK.
+// It returns an error only when the URL, the JSON body or the request cannot
+// be built.
+// NOTE(review): retries never give up, cannot be cancelled, and go through
+// http.DefaultClient, which has no timeout.
+func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error {
+
+	retryIn := bundleRetryDelay
+	maxBackOff := 5 * time.Minute
+	backOffFunc := createBackoff(retryIn, maxBackOff)
+
+	uri, err := url.Parse(apiServerBaseURI.String())
 	if err != nil {
-		writeDatabaseError(w)
+		log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err)
+		return err
+	}
+	uri.Path = fmt.Sprintf("/clusters/%s/apids/%s/deployments", apidClusterID, apidInstanceID)
+
+	resultJSON, err := json.Marshal(validResults)
+	if err != nil {
+		log.Errorf("unable to marshal deployment results %v: %v", validResults, err)
+		return err
 	}
 
-	// todo: transmit to server (API TBD)
-	//err = transmitDeploymentResultsToServer()
+	for {
+		log.Debugf("transmitting deployment results to tracker: %s", string(resultJSON))
+		// a new request (and body reader) is built for every attempt
+		req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(resultJSON))
+		if err != nil {
+			log.Errorf("unable to create tracking service request: %v", err)
+			return err
+		}
+		req.Header.Add("Content-Type", "application/json")
 
-	return
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			// resp is nil when Do returns an error: there is no body to close
+			log.Errorf("failed to communicate with tracking service: %v", err)
+			backOffFunc()
+			continue
+		}
+
+		// read and close the body on every path so the connection can be reused
+		b, _ := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		if resp.StatusCode == http.StatusOK {
+			return nil
+		}
+		log.Errorf("tracking service call failed. code: %d, body: %s", resp.StatusCode, string(b))
+		backOffFunc()
+	}
+}
+
+// incrementETag bumps the in-memory eTag; call whenever the list of deployments changes.
+func incrementETag() {
+	atomic.AddInt64(&eTag, 1)
+}
+
+// getETag returns the current eTag as a decimal string.
+func getETag() string {
+	e := atomic.LoadInt64(&eTag)
+	return strconv.FormatInt(e, 10)
 }
diff --git a/api_test.go b/api_test.go
index cb8ec38..1f35233 100644
--- a/api_test.go
+++ b/api_test.go
@@ -14,11 +14,6 @@
 
 var _ = Describe("api", func() {
 
-	BeforeEach(func() {
-		_, err := getDB().Exec("DELETE FROM deployments")
-		Expect(err).ShouldNot(HaveOccurred())
-	})
-
 	Context("GET /deployments", func() {
 
 		It("should get an empty array if no deployments", func() {
@@ -32,6 +27,17 @@
 			Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
 		})
 
+		It("should debounce requests", func() {
+			var listener = make(chan string)
+			addSubscriber <- listener
+
+			deploymentsChanged <- "x"
+			deploymentsChanged <- "y"
+
+			id := <-listener
+			Expect(id).To(Equal("y"))
+		})
+
 		It("should get current deployments", func() {
 
 			deploymentID := "api_get_current"
@@ -102,7 +108,6 @@
 			defer res.Body.Close()
 
 			Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
 		})
 
 		It("should get new deployment after blocking", func(done Done) {
@@ -114,6 +119,7 @@
 			res, err := http.Get(uri.String())
 			Expect(err).ShouldNot(HaveOccurred())
 			defer res.Body.Close()
+			eTag := res.Header.Get("etag")
 
 			deploymentID = "api_get_current_blocking2"
 			go func() {
@@ -124,7 +130,7 @@
 				uri.RawQuery = query.Encode()
 				req, err := http.NewRequest("GET", uri.String(), nil)
 				req.Header.Add("Content-Type", "application/json")
-				req.Header.Add("If-None-Match", res.Header.Get("etag"))
+				req.Header.Add("If-None-Match", eTag)
 
 				res, err := http.DefaultClient.Do(req)
 				Expect(err).ShouldNot(HaveOccurred())
@@ -176,7 +182,7 @@
 		})
 	})
 
-	Context("POST /deployments", func() {
+	Context("PUT /deployments", func() {
 
 		It("should return BadRequest for invalid request", func() {
 
@@ -190,7 +196,7 @@
 			payload, err := json.Marshal(deploymentResult)
 			Expect(err).ShouldNot(HaveOccurred())
 
-			req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+			req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
 			req.Header.Add("Content-Type", "application/json")
 
 			resp, err := http.DefaultClient.Do(req)
@@ -215,7 +221,7 @@
 			payload, err := json.Marshal(deploymentResult)
 			Expect(err).ShouldNot(HaveOccurred())
 
-			req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+			req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
 			req.Header.Add("Content-Type", "application/json")
 
 			resp, err := http.DefaultClient.Do(req)
@@ -242,7 +248,7 @@
 			payload, err := json.Marshal(deploymentResult)
 			Expect(err).ShouldNot(HaveOccurred())
 
-			req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+			req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
 			req.Header.Add("Content-Type", "application/json")
 
 			resp, err := http.DefaultClient.Do(req)
@@ -265,7 +271,7 @@
 			uri, err := url.Parse(testServer.URL)
 			uri.Path = deploymentsEndpoint
 
-			deploymentResult := apiDeploymentResults{
+			deploymentResults := apiDeploymentResults{
 				apiDeploymentResult{
 					ID: deploymentID,
 					Status: RESPONSE_STATUS_FAIL,
@@ -273,10 +279,10 @@
 					Message: "Some error message",
 				},
 			}
-			payload, err := json.Marshal(deploymentResult)
+			payload, err := json.Marshal(deploymentResults)
 			Expect(err).ShouldNot(HaveOccurred())
 
-			req, err := http.NewRequest("POST", uri.String(), bytes.NewReader(payload))
+			req, err := http.NewRequest("PUT", uri.String(), bytes.NewReader(payload))
 			req.Header.Add("Content-Type", "application/json")
 
 			resp, err := http.DefaultClient.Do(req)
@@ -294,6 +300,29 @@
 			Expect(deploy_error_code).Should(Equal(100))
 			Expect(deploy_error_message).Should(Equal("Some error message"))
 		})
+
+		It("should communicate status to tracking server", func() {
+			deploymentResults := apiDeploymentResults{
+				apiDeploymentResult{
+					ID: "deploymentID",
+					Status: RESPONSE_STATUS_FAIL,
+					ErrorCode: 100,
+					Message: "Some error message",
+				},
+			}
+
+			err := transmitDeploymentResultsToServer(deploymentResults)
+			Expect(err).NotTo(HaveOccurred())
+
+			Expect(testLastTrackerVars["clusterID"]).To(Equal("CLUSTER_ID"))
+			Expect(testLastTrackerVars["instanceID"]).To(Equal("INSTANCE_ID"))
+			Expect(testLastTrackerBody).ToNot(BeEmpty())
+
+			var uploaded apiDeploymentResults
+			json.Unmarshal(testLastTrackerBody, &uploaded)
+
+			Expect(uploaded).To(Equal(deploymentResults))
+		})
 	})
 })
 
@@ -323,16 +352,18 @@
 		DataScopeID: deploymentID,
 		BundleConfigJSON: string(bundleJson),
 		ConfigJSON: string(bundleJson),
-		Status: "",
 		Created: "",
 		CreatedBy: "",
 		Updated: "",
 		UpdatedBy: "",
 		BundleName: deploymentID,
-		BundleURI: "",
-		BundleChecksum: "",
-		BundleChecksumType: "",
+		BundleURI: bundle.URI,
+		BundleChecksum: bundle.Checksum,
+		BundleChecksumType: bundle.ChecksumType,
 		LocalBundleURI: "x",
+		DeployStatus: "",
+		DeployErrorCode: 0,
+		DeployErrorMessage: "",
 	}
 
 	err = InsertDeployment(tx, dep)
diff --git a/apidGatewayDeploy-api.yaml b/apidGatewayDeploy-api.yaml
index 916c081..c26d47f 100644
--- a/apidGatewayDeploy-api.yaml
+++ b/apidGatewayDeploy-api.yaml
@@ -118,7 +118,7 @@
             $ref: '#/definitions/DeploymentResponse'
         '304':
           description: Deployment not modified.
-    post:
+    put:
       description: Save results of deployment
       parameters:
         - name: _
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 3ce174c..061879d 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -17,8 +17,10 @@
 )
 
 var (
-	tmpDir     string
-	testServer *httptest.Server
+	tmpDir              string
+	testServer          *httptest.Server
+	testLastTrackerVars map[string]string
+	testLastTrackerBody []byte
 )
 
 var _ = BeforeSuite(func() {
@@ -31,6 +33,9 @@
 	Expect(err).NotTo(HaveOccurred())
 
 	config.Set("local_storage_path", tmpDir)
+	config.Set(configApidInstanceID, "INSTANCE_ID")
+	config.Set(configApidClusterID, "CLUSTER_ID")
+	config.Set(configApiServerBaseURI, "http://localhost")
 
 	apid.InitializePlugins()
 
@@ -40,20 +45,48 @@
 	Expect(err).NotTo(HaveOccurred())
 	SetDB(db)
 
+	debounceDuration = time.Millisecond
+	bundleCleanupDelay = time.Millisecond
+	bundleRetryDelay = 10 * time.Millisecond
+	bundleDownloadTimeout = 50 * time.Millisecond
+
 	router := apid.API().Router()
 	// fake an unreliable bundle repo
-	backOffMultiplier = 10 * time.Millisecond
-	count := 0
+	count := 1
 	router.HandleFunc("/bundles/{id}", func(w http.ResponseWriter, req *http.Request) {
 		count++
+		vars := apid.API().Vars(req)
 		if count % 2 == 0 {
 			w.WriteHeader(500)
 			return
 		}
-		vars := apid.API().Vars(req)
+		if vars["id"] == "longfail" {
+			time.Sleep(bundleDownloadTimeout + (250 * time.Millisecond))
+		}
 		w.Write([]byte("/bundles/" + vars["id"]))
-	})
+
+	}).Methods("GET")
+
+	// fake an unreliable APID tracker
+	router.HandleFunc("/clusters/{clusterID}/apids/{instanceID}/deployments",
+		func(w http.ResponseWriter, req *http.Request) {
+			count++
+			if count % 2 == 0 {
+				w.WriteHeader(500)
+				return
+			}
+
+			testLastTrackerVars = apid.API().Vars(req)
+			testLastTrackerBody, err = ioutil.ReadAll(req.Body)
+			Expect(err).ToNot(HaveOccurred())
+
+			w.Write([]byte("OK"))
+
+	}).Methods("PUT")
 	testServer = httptest.NewServer(router)
+
+	apiServerBaseURI, err = url.Parse(testServer.URL)
+	Expect(err).NotTo(HaveOccurred())
 })
 
 var _ = AfterSuite(func() {
@@ -64,6 +97,12 @@
 	os.RemoveAll(tmpDir)
 })
 
+// Runs before every spec in the suite so each one starts with no deployments.
+var _ = BeforeEach(func() {
+	_, err := getDB().Exec("DELETE FROM deployments")
+	Expect(err).ShouldNot(HaveOccurred())
+})
+
 func TestApidGatewayDeploy(t *testing.T) {
 	RegisterFailHandler(Fail)
 	RunSpecs(t, "ApidGatewayDeploy Suite")
diff --git a/bundle.go b/bundle.go
index 4762165..44f9a9c 100644
--- a/bundle.go
+++ b/bundle.go
@@ -1,80 +1,143 @@
 package apiGatewayDeploy
 
 import (
+	"crypto/md5"
+	"encoding/base64"
+	"encoding/hex"
+	"errors"
 	"fmt"
+	"hash"
+	"hash/crc32"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"net/url"
 	"os"
 	"path"
+	"sync"
 	"time"
-	"encoding/base64"
-	"io/ioutil"
-	"hash/crc32"
-	"errors"
-	"crypto/md5"
-	"hash"
-	"encoding/hex"
 )
 
-const (
-	DOWNLOAD_ATTEMPTS = 3
-)
+var bundleRetryDelay time.Duration = time.Second
+var bundleDownloadTimeout time.Duration = 10 * time.Minute
 
-var (
-	backOffMultiplier = 10 * time.Second
-)
+// createBackoff returns a simple doubling back-off: each call sleeps for the
+// current delay and then doubles it, capped at maxBackOff, for the next call.
+// The returned func mutates captured state and is not safe for concurrent use.
+func createBackoff(retryIn, maxBackOff time.Duration) func() {
+	return func() {
+		log.Debugf("backoff called. will retry in %s.", retryIn)
+		time.Sleep(retryIn)
+		retryIn = retryIn * time.Duration(2)
+		if retryIn > maxBackOff {
+			retryIn = maxBackOff
+		}
+	}
+}
 
-func downloadBundle(dep DataDeployment) error {
+// downloadBundle downloads dep's bundle (downloadFromURI is handed the expected
+// checksum), renames it to getBundleFile(dep), records that path as the local
+// bundle URI and publishes dep.ID on deploymentsChanged. Failed attempts are
+// retried with back-off indefinitely; once bundleDownloadTimeout passes without
+// success the deployment is marked failed while retries continue. An unknown
+// checksum type marks the deployment failed immediately.
+func downloadBundle(dep DataDeployment) {
 
-	log.Debugf("starting bundle download process: %s", dep.BundleURI)
+	log.Debugf("starting bundle download process for %s: %s", dep.ID, dep.BundleURI)
 
 	hashWriter, err := getHashWriter(dep.BundleChecksumType)
 	if err != nil {
-		log.Errorf("invalid checksum type: %v", err)
-		return err
+		msg := fmt.Sprintf("invalid bundle checksum type: %v", dep.BundleChecksumType)
+		log.Error(msg)
+		setDeploymentResults(apiDeploymentResults{
+			{
+				ID:        dep.ID,
+				Status:    RESPONSE_STATUS_FAIL,
+				ErrorCode: ERROR_CODE_TODO,
+				Message:   msg,
+			},
+		})
+		return
 	}
 
-	// retry
-	var tempFile string
-	for i := 1; i <= DOWNLOAD_ATTEMPTS; i++ {
+	retryIn := bundleRetryDelay
+	maxBackOff := 5 * time.Minute
+	backOffFunc := createBackoff(retryIn, maxBackOff)
+
+	// lastErr is written by the retry loop below and read by the timeout
+	// goroutine, so every access goes through errMu.
+	var (
+		errMu   sync.Mutex
+		lastErr error
+	)
+
+	// once bundleDownloadTimeout elapses, mark the deployment failed (retries
+	// continue). downloaded is closed on success so a finished download is not
+	// marked failed afterwards.
+	downloaded := make(chan struct{})
+	timeout := time.NewTimer(bundleDownloadTimeout)
+	go func() {
+		select {
+		case <-downloaded:
+			return
+		case <-timeout.C:
+		}
+		log.Debugf("bundle download timeout. marking deployment %s failed. will keep retrying: %s", dep.ID, dep.BundleURI)
+		errMu.Lock()
+		errMessage := "bundle download failed"
+		if lastErr != nil {
+			errMessage = fmt.Sprintf("bundle download failed: %s", lastErr)
+		}
+		errMu.Unlock()
+		setDeploymentResults(apiDeploymentResults{
+			{
+				ID:        dep.ID,
+				Status:    RESPONSE_STATUS_FAIL,
+				ErrorCode: ERROR_CODE_TODO,
+				Message:   errMessage,
+			},
+		})
+	}()
+
+	// todo: we'll want to abort download if deployment is deleted
+	for {
+		var tempFile, bundleFile string
 		tempFile, err = downloadFromURI(dep.BundleURI, hashWriter, dep.BundleChecksum)
+
+		if err == nil {
+			bundleFile = getBundleFile(dep)
+			err = os.Rename(tempFile, bundleFile)
+			if err != nil {
+				log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
+			}
+		}
+
+		if tempFile != "" {
+			go safeDelete(tempFile)
+		}
+
+		if err == nil {
+			err = updateLocalBundleURI(dep.ID, bundleFile)
+		}
+
+		// success!
 		if err == nil {
 			break
 		}
-		if tempFile != "" {
-			os.Remove(tempFile)
-		}
 
-		// simple back-off, we could potentially be more sophisticated
-		retryIn := time.Duration(i) * backOffMultiplier
-		log.Debugf("will retry failed download in %s: %v", retryIn, err)
-		time.Sleep(retryIn)
+		errMu.Lock()
+		lastErr = err
+		errMu.Unlock()
+		backOffFunc()
 		hashWriter.Reset()
 	}
 
-	if err != nil {
-		log.Errorf("failed %d download attempts. aborting.", DOWNLOAD_ATTEMPTS)
-		return err
-	}
-
-	bundleFile := getBundleFile(dep)
-	err = os.Rename(tempFile, bundleFile)
-	if err != nil {
-		log.Errorf("Unable to rename temp bundle file %s to %s: %s", tempFile, bundleFile, err)
-		os.Remove(tempFile)
-		return err
-	}
-
-	err = updateLocalURI(dep.ID, bundleFile)
-	if err != nil {
-		return err
-	}
+	// disarm the timeout before announcing the change
+	timeout.Stop()
+	close(downloaded)
+	log.Debugf("bundle for %s downloaded: %s", dep.ID, dep.BundleURI)
 
 	// send deployments to client
-	deploymentsChanged<- dep.ID
-
-	return nil
+	deploymentsChanged <- dep.ID
 }
 
 func getBundleFile(dep DataDeployment) string {
diff --git a/bundle_test.go b/bundle_test.go
new file mode 100644
index 0000000..1c959d9
--- /dev/null
+++ b/bundle_test.go
@@ -0,0 +1,100 @@
+package apiGatewayDeploy
+
+import (
+	"encoding/json"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"net/url"
+	"time"
+)
+
+var _ = Describe("bundle", func() {
+
+	Context("download", func() {
+
+		It("should timeout, mark status as failed, then finish", func() {
+
+			deploymentID := "bundle_download_fail"
+
+			uri, err := url.Parse(testServer.URL)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			uri.Path = "/bundles/longfail"
+			bundleUri := uri.String()
+			bundle := bundleConfigJson{
+				Name:         uri.Path,
+				URI:          bundleUri,
+				ChecksumType: "crc-32",
+			}
+			bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+			bundleJson, err := json.Marshal(bundle)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			tx, err := getDB().Begin()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			dep := DataDeployment{
+				ID:                 deploymentID,
+				BundleConfigID:     deploymentID,
+				ApidClusterID:      deploymentID,
+				DataScopeID:        deploymentID,
+				BundleConfigJSON:   string(bundleJson),
+				ConfigJSON:         string(bundleJson),
+				Created:            "",
+				CreatedBy:          "",
+				Updated:            "",
+				UpdatedBy:          "",
+				BundleName:         deploymentID,
+				BundleURI:          bundle.URI,
+				BundleChecksum:     bundle.Checksum,
+				BundleChecksumType: bundle.ChecksumType,
+				LocalBundleURI:     "",
+				DeployStatus:       "",
+				DeployErrorCode:    0,
+				DeployErrorMessage: "",
+			}
+
+			err = InsertDeployment(tx, dep)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			err = tx.Commit()
+			Expect(err).ShouldNot(HaveOccurred())
+
+			go downloadBundle(dep)
+
+			// give download time to timeout
+			time.Sleep(bundleDownloadTimeout + (100 * time.Millisecond))
+
+			// get error state deployment
+			deployments, err := getDeployments("WHERE id=$1", deploymentID)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			Expect(len(deployments)).To(Equal(1))
+			d := deployments[0]
+
+			Expect(d.ID).To(Equal(deploymentID))
+			Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+			Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+			Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+			Expect(d.LocalBundleURI).To(BeEmpty())
+
+			var listener = make(chan string)
+			addSubscriber <- listener
+			<-listener
+
+			// get finished deployment
+			// still in error state (let client update), but with valid local bundle
+			deployments, err = getDeployments("WHERE id=$1", deploymentID)
+			Expect(err).ShouldNot(HaveOccurred())
+
+			Expect(len(deployments)).To(Equal(1))
+			d = deployments[0]
+
+			Expect(d.ID).To(Equal(deploymentID))
+			Expect(d.DeployStatus).To(Equal(RESPONSE_STATUS_FAIL))
+			Expect(d.DeployErrorCode).To(Equal(ERROR_CODE_TODO))
+			Expect(d.DeployErrorMessage).ToNot(BeEmpty())
+			Expect(d.LocalBundleURI).To(BeAnExistingFile())
+		})
+	})
+})
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go
index d52314f..46e75d4 100644
--- a/cmd/apidGatewayDeploy/main.go
+++ b/cmd/apidGatewayDeploy/main.go
@@ -1,14 +1,14 @@
 package main
 
 import (
+	"encoding/json"
 	"flag"
 	"github.com/30x/apid"
 	"github.com/30x/apid/factory"
+	"github.com/30x/apidGatewayDeploy"
 	_ "github.com/30x/apidGatewayDeploy"
 	"io/ioutil"
-	"github.com/30x/apidGatewayDeploy"
 	"os"
-	"encoding/json"
 )
 
 func main() {
@@ -33,7 +33,6 @@
 		defer os.RemoveAll(tmpDir)
 
 		configService.Set("data_path", tmpDir)
-		configService.Set("gatewaydeploy_bundle_dir", tmpDir) // todo: legacy?
 
 		if deploymentsFile != "" {
 			bytes, err := ioutil.ReadFile(deploymentsFile)
@@ -103,7 +102,6 @@
 			DataScopeID:        ad.ScopeId,
 			BundleConfigJSON:   string(ad.BundleConfigJson),
 			ConfigJSON:         string(ad.ConfigJson),
-			Status:             "",
 			Created:            "",
 			CreatedBy:          "",
 			Updated:            "",
@@ -115,7 +113,6 @@
 			LocalBundleURI:     ad.URI,
 		}
 
-
 		err = apiGatewayDeploy.InsertDeployment(tx, dep)
 		if err != nil {
 			log.Error("Unable to insert deployment")
@@ -132,4 +129,4 @@
 	apiGatewayDeploy.InitAPI()
 
 	return nil
-}
\ No newline at end of file
+}
diff --git a/data.go b/data.go
index 45ad641..dc46ab7 100644
--- a/data.go
+++ b/data.go
@@ -19,16 +19,18 @@
 	DataScopeID        string
 	BundleConfigJSON   string
 	ConfigJSON         string
-	Status             string
 	Created            string
 	CreatedBy          string
 	Updated            string
 	UpdatedBy          string
 	BundleName         string
 	BundleURI          string
+	LocalBundleURI     string
 	BundleChecksum     string
 	BundleChecksumType string
-	LocalBundleURI     string
+	DeployStatus       string
+	DeployErrorCode    int
+	DeployErrorMessage string
 }
 
 type SQLExec interface {
@@ -37,10 +39,6 @@
 
 func InitDB(db apid.DB) error {
 	_, err := db.Exec(`
-	CREATE TABLE IF NOT EXISTS etag (
-		value integer
-	);
-	INSERT INTO etag VALUES (1);
 	CREATE TABLE IF NOT EXISTS deployments (
 		id character varying(36) NOT NULL,
 		bundle_config_id varchar(36) NOT NULL,
@@ -48,7 +46,6 @@
 		data_scope_id varchar(36) NOT NULL,
 		bundle_config_json text NOT NULL,
 		config_json text NOT NULL,
-		status text NOT NULL,
 		created timestamp without time zone,
 		created_by text,
 		updated timestamp without time zone,
@@ -56,6 +53,8 @@
 		bundle_name text,
 		bundle_uri text,
 		local_bundle_uri text,
+		bundle_checksum text,
+		bundle_checksum_type text,
 		deploy_status string,
 		deploy_error_code int,
 		deploy_error_message text,
@@ -77,49 +76,12 @@
 	return db
 }
 
+// caller is responsible for calling dbMux.Lock() and dbMux.Unlock()
 func SetDB(db apid.DB) {
-	dbMux.Lock()
 	if unsafeDB == nil { // init API when DB is initialized
 		go InitAPI()
 	}
 	unsafeDB = db
-	dbMux.Unlock()
-}
-
-// call whenever the list of deployments changes
-func incrementETag() error {
-
-	stmt, err := getDB().Prepare("UPDATE etag SET value = value+1;")
-	if err != nil {
-		log.Errorf("prepare update etag failed: %v", err)
-		return err
-	}
-	defer stmt.Close()
-
-	_, err = stmt.Exec()
-	if err != nil {
-		log.Errorf("update etag failed: %v", err)
-		return err
-	}
-
-	log.Debugf("etag incremented")
-	return err
-}
-
-func getETag() (string, error) {
-
-	var eTag string
-	db := getDB()
-	row := db.QueryRow("SELECT value FROM etag")
-	err := row.Scan(&eTag)
-	//err := getDB().QueryRow("SELECT value FROM etag").Scan(&eTag)
-	if err != nil {
-		log.Errorf("select etag failed: %v", err)
-		return "", err
-	}
-
-	log.Debugf("etag queried: %v", eTag)
-	return eTag, err
 }
 
 func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
@@ -129,10 +91,11 @@
 	stmt, err := tx.Prepare(`
 	INSERT INTO deployments
 		(id, bundle_config_id, apid_cluster_id, data_scope_id,
-		bundle_config_json, config_json, status, created,
-		created_by, updated, updated_by, bundle_name,
-		bundle_uri, local_bundle_uri)
-		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14);
+		bundle_config_json, config_json, created, created_by,
+		updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+		bundle_checksum, bundle_checksum_type, deploy_status,
+		deploy_error_code, deploy_error_message)
+		VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
 	`)
 	if err != nil {
 		log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
@@ -142,9 +105,10 @@
 
 	_, err = stmt.Exec(
 		dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
-		dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created,
-		dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName,
-		dep.BundleURI, dep.LocalBundleURI)
+		dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+		dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
+		dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+		dep.DeployErrorCode, dep.DeployErrorMessage)
 	if err != nil {
 		log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
 		return err
@@ -179,35 +143,57 @@
 
 // getReadyDeployments() returns array of deployments that are ready to deploy
 func getReadyDeployments() (deployments []DataDeployment, err error) {
+	return getDeployments("WHERE local_bundle_uri != $1", "")
+}
 
+// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy
+func getUnreadyDeployments() (deployments []DataDeployment, err error) {
+	return getDeployments("WHERE local_bundle_uri = $1 and deploy_status = $2", "", "")
+}
+
+// getDeployments runs "SELECT ... FROM deployments <where>" with a bound to the
+// $n placeholders. where is concatenated verbatim: never put untrusted input in it.
+func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) {
 	db := getDB()
-	rows, err := db.Query(`
+	stmt, err := db.Prepare(`
 	SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
-		bundle_config_json, config_json, status, created,
-		created_by, updated, updated_by, bundle_name,
-		bundle_uri, local_bundle_uri
+		bundle_config_json, config_json, created, created_by,
+		updated, updated_by, bundle_name, bundle_uri,
+		local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
+		deploy_error_code, deploy_error_message
 	FROM deployments
-	WHERE local_bundle_uri != ""
-	`)
+	` + where)
+	if err != nil {
+		return
+	}
+	defer stmt.Close()
+	rows, err := stmt.Query(a...)
 	if err != nil {
 		if err == sql.ErrNoRows {
-			return deployments, nil
+			return
 		}
 		log.Errorf("Error querying deployments: %v", err)
 		return
 	}
 	defer rows.Close()
 
+	deployments = dataDeploymentsFromRows(rows)
+	err = rows.Err() // rows.Next() reports iteration failure only via rows.Err()
+	return
+}
+
+// dataDeploymentsFromRows scans every remaining row; Scan errors are not checked.
+func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) {
 	for rows.Next() {
 		dep := DataDeployment{}
 		rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
-			&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created,
-			&dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName,
-			&dep.BundleURI, &dep.LocalBundleURI,
+			&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy,
+			&dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI,
+			&dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus,
+			&dep.DeployErrorCode, &dep.DeployErrorMessage,
 		)
 		deployments = append(deployments, dep)
 	}
-
 	return
 }
 
@@ -252,18 +238,11 @@
 	return err
 }
 
-func updateLocalURI(depID, localBundleUri string) error {
+func updateLocalBundleURI(depID, localBundleUri string) error {
 
-	tx, err := getDB().Begin()
+	stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
 	if err != nil {
-		log.Errorf("begin updateLocalURI failed: %v", err)
-		return err
-	}
-	defer tx.Rollback()
-
-	stmt, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
-	if err != nil {
-		log.Errorf("prepare updateLocalURI failed: %v", err)
+		log.Errorf("prepare updateLocalBundleURI failed: %v", err)
 		return err
 	}
 	defer stmt.Close()
@@ -274,13 +253,16 @@
 		return err
 	}
 
-	err = tx.Commit()
-	if err != nil {
-		log.Errorf("commit updateLocalURI failed: %v", err)
-		return err
-	}
-
 	log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
 
 	return nil
 }
+
+func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
+
+	err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
+	if err == sql.ErrNoRows {
+		err = nil
+	}
+	return
+}
diff --git a/init.go b/init.go
index 12f1339..49d4912 100644
--- a/init.go
+++ b/init.go
@@ -1,20 +1,34 @@
 package apiGatewayDeploy
 
 import (
+	"fmt"
 	"github.com/30x/apid"
+	"net/url"
 	"os"
 	"path"
+	"time"
 )
 
 const (
-	configBundleDirKey = "gatewaydeploy_bundle_dir"
+	configBundleDirKey          = "gatewaydeploy_bundle_dir"
+	configDebounceDuration      = "gatewaydeploy_debounce_duration"
+	configBundleCleanupDelay    = "gatewaydeploy_bundle_cleanup_delay"
+	configBundleDownloadTimeout = "gatewaydeploy_bundle_download_timeout"
+	configApiServerBaseURI      = "apigeesync_proxy_server_base"
+	configApidInstanceID        = "apigeesync_apid_instance_id"
+	configApidClusterID         = "apigeesync_cluster_id"
 )
 
 var (
-	services   apid.Services
-	log        apid.LogService
-	data       apid.DataService
-	bundlePath string
+	services           apid.Services
+	log                apid.LogService
+	data               apid.DataService
+	bundlePath         string
+	debounceDuration   time.Duration
+	bundleCleanupDelay time.Duration
+	apiServerBaseURI   *url.URL
+	apidInstanceID     string
+	apidClusterID      string
 )
 
 func init() {
@@ -27,7 +41,45 @@
 	log.Debug("start init")
 
 	config := services.Config()
+
+	if !config.IsSet(configApiServerBaseURI) {
+		return pluginData, fmt.Errorf("Missing required config value: %s", configApiServerBaseURI)
+	}
+	var err error
+	apiServerBaseURI, err = url.Parse(config.GetString(configApiServerBaseURI))
+	if err != nil {
+		return pluginData, fmt.Errorf("%s value %q parse err: %v", configApiServerBaseURI, config.GetString(configApiServerBaseURI), err)
+	}
+
+	if !config.IsSet(configApidInstanceID) {
+		return pluginData, fmt.Errorf("Missing required config value: %s", configApidInstanceID)
+	}
+	apidInstanceID = config.GetString(configApidInstanceID)
+
+	if !config.IsSet(configApidClusterID) {
+		return pluginData, fmt.Errorf("Missing required config value: %s", configApidClusterID)
+	}
+	apidClusterID = config.GetString(configApidClusterID)
+
 	config.SetDefault(configBundleDirKey, "bundles")
+	config.SetDefault(configDebounceDuration, time.Second)
+	config.SetDefault(configBundleCleanupDelay, time.Minute)
+	config.SetDefault(configBundleDownloadTimeout, 5*time.Minute)
+
+	debounceDuration = config.GetDuration(configDebounceDuration)
+	if debounceDuration < time.Millisecond {
+		return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration)
+	}
+
+	bundleCleanupDelay = config.GetDuration(configBundleCleanupDelay)
+	if bundleCleanupDelay < time.Millisecond {
+		return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
+	}
+
+	bundleDownloadTimeout = config.GetDuration(configBundleDownloadTimeout)
+	if bundleDownloadTimeout < time.Millisecond {
+		return pluginData, fmt.Errorf("%s must be a positive duration", configBundleDownloadTimeout)
+	}
 
 	data = services.Data()
 
@@ -35,7 +87,7 @@
 	storagePath := config.GetString("local_storage_path")
 	bundlePath = path.Join(storagePath, relativeBundlePath)
 	if err := os.MkdirAll(bundlePath, 0700); err != nil {
-		log.Panicf("Failed bundle directory creation: %v", err)
+		return pluginData, fmt.Errorf("Failed bundle directory creation: %v", err)
 	}
 	log.Infof("Bundle directory path is %s", bundlePath)
 
diff --git a/listener.go b/listener.go
index 3b57542..24f3d96 100644
--- a/listener.go
+++ b/listener.go
@@ -5,6 +5,8 @@
 	"encoding/json"
 	"github.com/30x/apid"
 	"github.com/apigee-labs/transicator/common"
+	"os"
+	"time"
 )
 
 const (
@@ -60,17 +62,21 @@
 		log.Panicf("Error starting transaction: %v", err)
 	}
 
+	// ensure that no new database updates are made on old database
+	dbMux.Lock()
+	defer dbMux.Unlock()
+
 	defer tx.Rollback()
 	for _, table := range snapshot.Tables {
 		var err error
 		switch table.Name {
 		case DEPLOYMENT_TABLE:
 			log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
-			if len(table.Rows) == 0 {
-				return
-			}
 			for _, row := range table.Rows {
-				addDeployment(tx, row)
+				// stop at the first failure so a later successful row can't reset err to nil
+				if err = addDeployment(tx, row); err != nil {
+					break
+				}
 			}
 		}
 		if err != nil {
@@ -84,6 +90,18 @@
 	}
 
 	SetDB(db)
+
+	// if no tables, this is a startup event for an existing DB: start bundle downloads that didn't finish
+	if len(snapshot.Tables) == 0 {
+		deployments, err := getUnreadyDeployments()
+		if err != nil {
+			log.Panicf("unable to query database for unready deployments: %v", err)
+		}
+		for _, dep := range deployments {
+			go downloadBundle(dep)
+		}
+	}
+
 	log.Debug("Snapshot processed")
 }
 
@@ -94,6 +112,12 @@
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
 	defer tx.Rollback()
+
+	// ensure bundle download and delete updates aren't attempted while in process
+	dbMux.Lock()
+	defer dbMux.Unlock()
+	var localBundleUri string
+	var bundlesToDelete []string
 	for _, change := range changes.Changes {
 		var err error
 		switch change.Table {
@@ -103,10 +127,11 @@
 				err = addDeployment(tx, change.NewRow)
 			case common.Delete:
 				var id string
-				err = change.OldRow.Get("id", &id)
+				change.OldRow.Get("id", &id)
+				localBundleUri, err = getLocalBundleURI(tx, id) // "=" not ":=": a case-local err would hide lookup/delete failures
 				if err == nil {
+					bundlesToDelete = append(bundlesToDelete, localBundleUri)
 					err = deleteDeployment(tx, id)
-					// todo: delete downloaded bundle file
 				}
 			default:
 				log.Errorf("unexpected operation: %s", change.Operation)
@@ -120,58 +145,36 @@
 	if err != nil {
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
+
+	// clean up old bundles
+	if len(bundlesToDelete) > 0 {
+		log.Debugf("will delete %d old bundles", len(bundlesToDelete))
+		go func() {
+			// wait bundleCleanupDelay first so clients still reading an old bundle aren't disrupted
+			time.Sleep(bundleCleanupDelay)
+			for _, b := range bundlesToDelete {
+				log.Debugf("removing old bundle: %v", b)
+				safeDelete(b)
+			}
+		}()
+	}
 }
 
-func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+func dataDeploymentFromRow(row common.Row) (d DataDeployment, err error) {
 
-	d := DataDeployment{}
-	err = row.Get("id", &d.ID)
-	if err != nil {
-		return
-	}
-	err = row.Get("bundle_config_id", &d.BundleConfigID)
-	if err != nil {
-		return
-	}
-	err = row.Get("apid_cluster_id", &d.ApidClusterID)
-	if err != nil {
-		return
-	}
-	err = row.Get("data_scope_id", &d.DataScopeID)
-	if err != nil {
-		return
-	}
-	err = row.Get("bundle_config_json", &d.BundleConfigJSON)
-	if err != nil {
-		return
-	}
-	err = row.Get("config_json", &d.ConfigJSON)
-	if err != nil {
-		return
-	}
-	err = row.Get("status", &d.Status)
-	if err != nil {
-		return
-	}
-	err = row.Get("created", &d.Created)
-	if err != nil {
-		return
-	}
-	err = row.Get("created_by", &d.CreatedBy)
-	if err != nil {
-		return
-	}
-	err = row.Get("updated", &d.Updated)
-	if err != nil {
-		return
-	}
-	err = row.Get("updated_by", &d.UpdatedBy)
-	if err != nil {
-		return
-	}
+	row.Get("id", &d.ID)
+	row.Get("bundle_config_id", &d.BundleConfigID)
+	row.Get("apid_cluster_id", &d.ApidClusterID)
+	row.Get("data_scope_id", &d.DataScopeID)
+	row.Get("bundle_config_json", &d.BundleConfigJSON)
+	row.Get("config_json", &d.ConfigJSON)
+	row.Get("created", &d.Created)
+	row.Get("created_by", &d.CreatedBy)
+	row.Get("updated", &d.Updated)
+	row.Get("updated_by", &d.UpdatedBy)
 
 	var bc bundleConfigJson
-	err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
+	err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
 	if err != nil {
 		log.Errorf("JSON decoding Manifest failed: %v", err)
 		return
@@ -182,6 +185,17 @@
 	d.BundleChecksumType = bc.ChecksumType
 	d.BundleChecksum = bc.Checksum
 
+	return
+}
+
+func addDeployment(tx *sql.Tx, row common.Row) (err error) {
+
+	var d DataDeployment
+	d, err = dataDeploymentFromRow(row)
+	if err != nil {
+		return
+	}
+
 	err = InsertDeployment(tx, d)
 	if err != nil {
 		return
@@ -191,3 +205,9 @@
 	go downloadBundle(d)
 	return
 }
+
+func safeDelete(file string) {
+	if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
+		log.Warnf("unable to delete file %s: %v", file, e)
+	}
+}
diff --git a/listener_test.go b/listener_test.go
index ec4b7f2..1b52f7f 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -11,11 +11,6 @@
 
 var _ = Describe("listener", func() {
 
-	BeforeEach(func() {
-		_, err := getDB().Exec("DELETE FROM deployments")
-		Expect(err).ShouldNot(HaveOccurred())
-	})
-
 	Context("ApigeeSync snapshot event", func() {
 
 		It("should set DB and process", func(done Done) {