Rewrite debounce and dispatch
diff --git a/api.go b/api.go
index 1eced66..e0f72a1 100644
--- a/api.go
+++ b/api.go
@@ -8,7 +8,6 @@
"net/http"
"net/url"
"strconv"
- "sync"
"sync/atomic"
"time"
)
@@ -32,9 +31,15 @@
API_ERR_INTERNAL
)
+type deploymentsResult struct {
+ deployments []DataDeployment
+ err error
+}
+
var (
- deploymentsChanged = make(chan string)
- addSubscriber = make(chan chan string)
+ deploymentsChanged = make(chan interface{})
+ addSubscriber = make(chan chan deploymentsResult)
+ removeSubscriber = make(chan chan deploymentsResult)
eTag int64
)
@@ -95,47 +100,71 @@
writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "database error")
}
-func distributeEvents() {
- subscribers := make(map[chan string]struct{})
- mut := sync.Mutex{}
- msg := ""
- debouncer := func() {
- select {
- case <-time.After(debounceDuration):
- mut.Lock()
- subs := subscribers
- subscribers = make(map[chan string]struct{})
- m := msg
- msg = ""
- incrementETag()
- mut.Unlock()
- log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
- for subscriber := range subs {
- select {
- case subscriber <- m:
- log.Debugf("Handling deploy response for: %s", m)
- log.Debugf("delivering TO: %v", subscriber)
- default:
- log.Debugf("listener too far behind, message dropped")
- }
- }
+func debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
+ send := func(toSend []interface{}) {
+ if toSend != nil {
+ log.Debugf("debouncer sending: %v", toSend)
+ out <- toSend
}
}
for {
- select {
- case newMsg := <-deploymentsChanged:
- mut.Lock()
- log.Debug("deploymentsChanged")
- if msg == "" {
- go debouncer()
+ incoming, ok := <-in
+ if !ok {
+ log.Debugf("closing debouncer")
+ close(out)
+ return
+ }
+ log.Debugf("debouncing %v", incoming)
+ toSend := []interface{}{incoming}
+ for {
+ select {
+ case incoming, ok := <-in:
+ if ok {
+ log.Debugf("debouncing %v", incoming)
+ toSend = append(toSend, incoming)
+ } else {
+ send(toSend)
+ log.Debugf("closing debouncer")
+ close(out)
+ return
+ }
+ case <-time.After(window):
+ send(toSend)
+ toSend = nil
}
- msg = newMsg
- mut.Unlock()
+ }
+ }
+}
+
+func distributeEvents() {
+ subscribers := make(map[chan deploymentsResult]struct{})
+ deliverDeployments := make(chan []interface{})
+
+ go debounce(deploymentsChanged, deliverDeployments, debounceDuration)
+
+ for {
+ select {
+ case _, ok := <-deliverDeployments:
+ if !ok {
+ return // todo: using this?
+ }
+ subs := subscribers
+ subscribers = make(map[chan deploymentsResult]struct{})
+ incrementETag()
+ go func() {
+ deployments, err := getReadyDeployments()
+ log.Debugf("delivering deployments to %d subscribers", len(subs))
+ for subscriber := range subs {
+ log.Debugf("delivering to: %v", subscriber)
+ subscriber <- deploymentsResult{deployments, err}
+ }
+ }()
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %v", subscriber)
- mut.Lock()
subscribers[subscriber] = struct{}{}
- mut.Unlock()
+ case subscriber := <-removeSubscriber:
+ log.Debugf("Remove subscriber: %v", subscriber)
+ delete(subscribers, subscriber)
}
}
}
@@ -173,41 +202,49 @@
return
}
- // subscribe to new deployments in case we need it
- var gotNewDeployment chan string
- if timeout > 0 && ifNoneMatch != "" {
- gotNewDeployment = make(chan string)
- addSubscriber <- gotNewDeployment
- }
-
- deployments, err := getReadyDeployments()
- if err != nil {
- writeDatabaseError(w)
- return
- }
-
// send results if different eTag
if eTag != ifNoneMatch {
- sendDeployments(w, deployments, eTag)
+ sendReadyDeployments(w, eTag)
return
}
+ // otherwise, subscribe to any new deployment changes
+ var newDeploymentsChannel chan deploymentsResult
+ if timeout > 0 && ifNoneMatch != "" {
+ newDeploymentsChannel = make(chan deploymentsResult, 1)
+ addSubscriber <- newDeploymentsChannel
+ }
+
log.Debug("Blocking request... Waiting for new Deployments.")
select {
- case <-gotNewDeployment:
- apiGetCurrentDeployments(w, r) // recurse
+ case result := <-newDeploymentsChannel:
+ if result.err != nil {
+ writeDatabaseError(w)
+ } else {
+ sendDeployments(w, result.deployments, eTag)
+ }
case <-time.After(time.Duration(timeout) * time.Second):
+ removeSubscriber <- newDeploymentsChannel
log.Debug("Blocking deployment request timed out.")
if ifNoneMatch != "" {
w.WriteHeader(http.StatusNotModified)
} else {
- sendDeployments(w, deployments, eTag)
+ sendReadyDeployments(w, eTag)
}
}
}
+func sendReadyDeployments(w http.ResponseWriter, eTag string) {
+ deployments, err := getReadyDeployments()
+ if err != nil {
+ writeDatabaseError(w)
+ return
+ }
+ sendDeployments(w, deployments, eTag)
+}
+
func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
apiDeps := ApiDeploymentResponse{}
diff --git a/api_test.go b/api_test.go
index 373648d..30a28f4 100644
--- a/api_test.go
+++ b/api_test.go
@@ -37,15 +37,28 @@
Expect(string(body)).Should(Equal("[]"))
})
- It("should debounce requests", func() {
- var listener = make(chan string)
- addSubscriber <- listener
+ It("should debounce requests", func(done Done) {
+ var in = make(chan interface{})
+ var out = make(chan []interface{})
- deploymentsChanged <- "x"
- deploymentsChanged <- "y"
+ go debounce(in, out, 3*time.Millisecond)
- id := <-listener
- Expect(id).To(Equal("y"))
+ go func() {
+ defer GinkgoRecover()
+
+ received, ok := <-out
+ Expect(ok).To(BeTrue())
+ Expect(len(received)).To(Equal(2))
+
+ close(in)
+ received, ok = <-out
+ Expect(ok).To(BeFalse())
+
+ close(done)
+ }()
+
+ in <- "x"
+ in <- "y"
})
It("should get current deployments", func() {
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 74c8c62..227276d 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -38,6 +38,7 @@
config.Set(configApidInstanceID, "INSTANCE_ID")
config.Set(configApidClusterID, "CLUSTER_ID")
config.Set(configApiServerBaseURI, "http://localhost")
+ config.Set(configDebounceDuration, "1ms")
apid.InitializePlugins()
@@ -47,7 +48,6 @@
Expect(err).NotTo(HaveOccurred())
SetDB(db)
- debounceDuration = time.Millisecond
bundleCleanupDelay = time.Millisecond
bundleRetryDelay = 10 * time.Millisecond
markDeploymentFailedAfter = 50 * time.Millisecond
diff --git a/bundle_test.go b/bundle_test.go
index e2416f4..fd2be63 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -60,16 +60,13 @@
queueDownloadRequest(dep)
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
- <-listener
+ result := <-listener
- getReadyDeployments()
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
+ Expect(result.err).NotTo(HaveOccurred())
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
})
@@ -180,7 +177,7 @@
Expect(trackerHit).To(BeTrue())
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
<-listener
diff --git a/listener_test.go b/listener_test.go
index 397c8cc..33ebd41 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -50,19 +50,28 @@
},
}
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ result := <-listener
+ Expect(result.err).ToNot(HaveOccurred())
+ // from event
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.BundleName).To(Equal(bundle1.Name))
+ Expect(d.BundleURI).To(Equal(bundle1.URI))
+
+ // from db
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())
Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
+ d = deployments[0]
Expect(d.ID).To(Equal(deploymentID))
Expect(d.BundleName).To(Equal(bundle1.Name))
@@ -120,19 +129,16 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ result := <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
close(done)
@@ -257,14 +263,14 @@
},
}
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
// wait for event to propagate
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ result := <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())
@@ -307,13 +313,12 @@
},
}
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ <-listener
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())