Merge pull request #18 from 30x/rewrite-dispatcher

Rewrite debounce and dispatch
diff --git a/api.go b/api.go
index 1eced66..010e872 100644
--- a/api.go
+++ b/api.go
@@ -8,7 +8,6 @@
 	"net/http"
 	"net/url"
 	"strconv"
-	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -32,9 +31,16 @@
 	API_ERR_INTERNAL
 )
 
+type deploymentsResult struct {
+	deployments []DataDeployment
+	err         error
+	eTag        string
+}
+
 var (
-	deploymentsChanged = make(chan string)
-	addSubscriber      = make(chan chan string)
+	deploymentsChanged = make(chan interface{}, 5)
+	addSubscriber      = make(chan chan deploymentsResult)
+	removeSubscriber   = make(chan chan deploymentsResult)
 	eTag               int64
 )
 
@@ -95,47 +101,62 @@
 	writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "database error")
 }
 
-func distributeEvents() {
-	subscribers := make(map[chan string]struct{})
-	mut := sync.Mutex{}
-	msg := ""
-	debouncer := func() {
-		select {
-		case <-time.After(debounceDuration):
-			mut.Lock()
-			subs := subscribers
-			subscribers = make(map[chan string]struct{})
-			m := msg
-			msg = ""
-			incrementETag()
-			mut.Unlock()
-			log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
-			for subscriber := range subs {
-				select {
-				case subscriber <- m:
-					log.Debugf("Handling deploy response for: %s", m)
-					log.Debugf("delivering TO: %v", subscriber)
-				default:
-					log.Debugf("listener too far behind, message dropped")
-				}
-			}
+func debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
+	send := func(toSend []interface{}) {
+		if toSend != nil {
+			log.Debugf("debouncer sending: %v", toSend)
+			out <- toSend
 		}
 	}
+	var toSend []interface{}
 	for {
 		select {
-		case newMsg := <-deploymentsChanged:
-			mut.Lock()
-			log.Debug("deploymentsChanged")
-			if msg == "" {
-				go debouncer()
+		case incoming, ok := <-in:
+			if ok {
+				log.Debugf("debouncing %v", incoming)
+				toSend = append(toSend, incoming)
+			} else {
+				send(toSend)
+				log.Debugf("closing debouncer")
+				close(out)
+				return
 			}
-			msg = newMsg
-			mut.Unlock()
+		case <-time.After(window):
+			send(toSend)
+			toSend = nil
+		}
+	}
+}
+
+func distributeEvents() {
+	subscribers := make(map[chan deploymentsResult]struct{})
+	deliverDeployments := make(chan []interface{}, 1)
+
+	go debounce(deploymentsChanged, deliverDeployments, debounceDuration)
+
+	for {
+		select {
+		case _, ok := <-deliverDeployments:
+			if !ok {
+				return // todo: using this?
+			}
+			subs := subscribers
+			subscribers = make(map[chan deploymentsResult]struct{})
+			go func() {
+				eTag := incrementETag()
+				deployments, err := getReadyDeployments()
+				log.Debugf("delivering deployments to %d subscribers", len(subs))
+				for subscriber := range subs {
+					log.Debugf("delivering to: %v", subscriber)
+					subscriber <- deploymentsResult{deployments, err, eTag}
+				}
+			}()
 		case subscriber := <-addSubscriber:
 			log.Debugf("Add subscriber: %v", subscriber)
-			mut.Lock()
 			subscribers[subscriber] = struct{}{}
-			mut.Unlock()
+		case subscriber := <-removeSubscriber:
+			log.Debugf("Remove subscriber: %v", subscriber)
+			delete(subscribers, subscriber)
 		}
 	}
 }
@@ -173,41 +194,50 @@
 		return
 	}
 
-	// subscribe to new deployments in case we need it
-	var gotNewDeployment chan string
-	if timeout > 0 && ifNoneMatch != "" {
-		gotNewDeployment = make(chan string)
-		addSubscriber <- gotNewDeployment
-	}
-
-	deployments, err := getReadyDeployments()
-	if err != nil {
-		writeDatabaseError(w)
-		return
-	}
-
 	// send results if different eTag
 	if eTag != ifNoneMatch {
-		sendDeployments(w, deployments, eTag)
+		sendReadyDeployments(w)
 		return
 	}
 
+	// otherwise, subscribe to any new deployment changes
+	var newDeploymentsChannel chan deploymentsResult
+	if timeout > 0 && ifNoneMatch != "" {
+		newDeploymentsChannel = make(chan deploymentsResult, 1)
+		addSubscriber <- newDeploymentsChannel
+	}
+
 	log.Debug("Blocking request... Waiting for new Deployments.")
 
 	select {
-	case <-gotNewDeployment:
-		apiGetCurrentDeployments(w, r) // recurse
+	case result := <-newDeploymentsChannel:
+		if result.err != nil {
+			writeDatabaseError(w)
+		} else {
+			sendDeployments(w, result.deployments, result.eTag)
+		}
 
 	case <-time.After(time.Duration(timeout) * time.Second):
+		removeSubscriber <- newDeploymentsChannel
 		log.Debug("Blocking deployment request timed out.")
 		if ifNoneMatch != "" {
 			w.WriteHeader(http.StatusNotModified)
 		} else {
-			sendDeployments(w, deployments, eTag)
+			sendReadyDeployments(w)
 		}
 	}
 }
 
+func sendReadyDeployments(w http.ResponseWriter) {
+	eTag := getETag()
+	deployments, err := getReadyDeployments()
+	if err != nil {
+		writeDatabaseError(w)
+		return
+	}
+	sendDeployments(w, deployments, eTag)
+}
+
 func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
 
 	apiDeps := ApiDeploymentResponse{}
@@ -343,8 +373,9 @@
 }
 
 // call whenever the list of deployments changes
-func incrementETag() {
-	atomic.AddInt64(&eTag, 1)
+func incrementETag() string {
+	e := atomic.AddInt64(&eTag, 1)
+	return strconv.FormatInt(e, 10)
 }
 
 func getETag() string {
diff --git a/api_test.go b/api_test.go
index 373648d..e793251 100644
--- a/api_test.go
+++ b/api_test.go
@@ -37,15 +37,28 @@
 			Expect(string(body)).Should(Equal("[]"))
 		})
 
-		It("should debounce requests", func() {
-			var listener = make(chan string)
-			addSubscriber <- listener
+		It("should debounce requests", func(done Done) {
+			var in = make(chan interface{})
+			var out = make(chan []interface{})
 
-			deploymentsChanged <- "x"
-			deploymentsChanged <- "y"
+			go debounce(in, out, 3*time.Millisecond)
 
-			id := <-listener
-			Expect(id).To(Equal("y"))
+			go func() {
+				defer GinkgoRecover()
+
+				received, ok := <-out
+				Expect(ok).To(BeTrue())
+				Expect(len(received)).To(Equal(2))
+
+				close(in)
+				received, ok = <-out
+				Expect(ok).To(BeFalse())
+
+				close(done)
+			}()
+
+			in <- "x"
+			in <- "y"
 		})
 
 		It("should get current deployments", func() {
@@ -96,6 +109,7 @@
 			res, err := http.Get(uri.String())
 			Expect(err).ShouldNot(HaveOccurred())
 			defer res.Body.Close()
+			Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
 
 			req, err := http.NewRequest("GET", uri.String(), nil)
 			req.Header.Add("Content-Type", "application/json")
@@ -139,6 +153,7 @@
 			Expect(err).ShouldNot(HaveOccurred())
 			defer res.Body.Close()
 			eTag := res.Header.Get("etag")
+			Expect(eTag).ShouldNot(BeEmpty())
 
 			deploymentID = "api_get_current_blocking2"
 			go func() {
@@ -156,6 +171,9 @@
 				defer res.Body.Close()
 				Expect(res.StatusCode).To(Equal(http.StatusOK))
 
+				Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
+				Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
+
 				var depRes ApiDeploymentResponse
 				body, err := ioutil.ReadAll(res.Body)
 				Expect(err).ShouldNot(HaveOccurred())
@@ -186,6 +204,7 @@
 			res, err := http.Get(uri.String())
 			Expect(err).ShouldNot(HaveOccurred())
 			defer res.Body.Close()
+			Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
 
 			query := uri.Query()
 			query.Add("block", "1")
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 74c8c62..227276d 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -38,6 +38,7 @@
 	config.Set(configApidInstanceID, "INSTANCE_ID")
 	config.Set(configApidClusterID, "CLUSTER_ID")
 	config.Set(configApiServerBaseURI, "http://localhost")
+	config.Set(configDebounceDuration, "1ms")
 
 	apid.InitializePlugins()
 
@@ -47,7 +48,6 @@
 	Expect(err).NotTo(HaveOccurred())
 	SetDB(db)
 
-	debounceDuration = time.Millisecond
 	bundleCleanupDelay = time.Millisecond
 	bundleRetryDelay = 10 * time.Millisecond
 	markDeploymentFailedAfter = 50 * time.Millisecond
diff --git a/bundle_test.go b/bundle_test.go
index e2416f4..fd2be63 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -60,16 +60,13 @@
 
 			queueDownloadRequest(dep)
 
-			var listener = make(chan string)
+			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
-			<-listener
+			result := <-listener
 
-			getReadyDeployments()
-			deployments, err := getReadyDeployments()
-			Expect(err).ShouldNot(HaveOccurred())
-
-			Expect(len(deployments)).To(Equal(1))
-			d := deployments[0]
+			Expect(result.err).NotTo(HaveOccurred())
+			Expect(len(result.deployments)).To(Equal(1))
+			d := result.deployments[0]
 			Expect(d.ID).To(Equal(deploymentID))
 		})
 
@@ -180,7 +177,7 @@
 
 			Expect(trackerHit).To(BeTrue())
 
-			var listener = make(chan string)
+			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
 			<-listener
 
diff --git a/listener.go b/listener.go
index 8c403bf..ffa6593 100644
--- a/listener.go
+++ b/listener.go
@@ -222,8 +222,8 @@
 		log.Panicf("Error processing ChangeList: %v", err)
 	}
 
-	if len(deploymentsToDelete) > 0 {
-		deploymentsChanged <- deploymentsToDelete[0].ID // arbitrary, the ID doesn't matter
+	for _, d := range deploymentsToDelete {
+		deploymentsChanged <- d.ID
 	}
 
 	log.Debug("ChangeList processed")
diff --git a/listener_test.go b/listener_test.go
index 397c8cc..33ebd41 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -50,19 +50,28 @@
 				},
 			}
 
-			var listener = make(chan string)
+			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
 
 			apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
 
-			id := <-listener
-			Expect(id).To(Equal(deploymentID))
+			result := <-listener
+			Expect(result.err).ToNot(HaveOccurred())
 
+			// from event
+			Expect(len(result.deployments)).To(Equal(1))
+			d := result.deployments[0]
+
+			Expect(d.ID).To(Equal(deploymentID))
+			Expect(d.BundleName).To(Equal(bundle1.Name))
+			Expect(d.BundleURI).To(Equal(bundle1.URI))
+
+			// from db
 			deployments, err := getReadyDeployments()
 			Expect(err).ShouldNot(HaveOccurred())
 
 			Expect(len(deployments)).To(Equal(1))
-			d := deployments[0]
+			d = deployments[0]
 
 			Expect(d.ID).To(Equal(deploymentID))
 			Expect(d.BundleName).To(Equal(bundle1.Name))
@@ -120,19 +129,16 @@
 			err = tx.Commit()
 			Expect(err).ShouldNot(HaveOccurred())
 
-			var listener = make(chan string)
+			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
 
 			apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
 
-			id := <-listener
-			Expect(id).To(Equal(deploymentID))
+			result := <-listener
+			Expect(result.err).ShouldNot(HaveOccurred())
 
-			deployments, err := getReadyDeployments()
-			Expect(err).ShouldNot(HaveOccurred())
-
-			Expect(len(deployments)).To(Equal(1))
-			d := deployments[0]
+			Expect(len(result.deployments)).To(Equal(1))
+			d := result.deployments[0]
 
 			Expect(d.ID).To(Equal(deploymentID))
 			close(done)
@@ -257,14 +263,14 @@
 				},
 			}
 
-			var listener = make(chan string)
+			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
 
 			apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
 
 			// wait for event to propagate
-			id := <-listener
-			Expect(id).To(Equal(deploymentID))
+			result := <-listener
+			Expect(result.err).ShouldNot(HaveOccurred())
 
 			deployments, err := getReadyDeployments()
 			Expect(err).ShouldNot(HaveOccurred())
@@ -307,13 +313,12 @@
 				},
 			}
 
-			var listener = make(chan string)
+			var listener = make(chan deploymentsResult)
 			addSubscriber <- listener
 
 			apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
 
-			id := <-listener
-			Expect(id).To(Equal(deploymentID))
+			<-listener
 
 			deployments, err := getReadyDeployments()
 			Expect(err).ShouldNot(HaveOccurred())