Merge pull request #18 from 30x/rewrite-dispatcher
Rewrite debounce and dispatch
diff --git a/api.go b/api.go
index 1eced66..010e872 100644
--- a/api.go
+++ b/api.go
@@ -8,7 +8,6 @@
"net/http"
"net/url"
"strconv"
- "sync"
"sync/atomic"
"time"
)
@@ -32,9 +31,16 @@
API_ERR_INTERNAL
)
+type deploymentsResult struct {
+ deployments []DataDeployment
+ err error
+ eTag string
+}
+
var (
- deploymentsChanged = make(chan string)
- addSubscriber = make(chan chan string)
+ deploymentsChanged = make(chan interface{}, 5)
+ addSubscriber = make(chan chan deploymentsResult)
+ removeSubscriber = make(chan chan deploymentsResult)
eTag int64
)
@@ -95,47 +101,62 @@
writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "database error")
}
-func distributeEvents() {
- subscribers := make(map[chan string]struct{})
- mut := sync.Mutex{}
- msg := ""
- debouncer := func() {
- select {
- case <-time.After(debounceDuration):
- mut.Lock()
- subs := subscribers
- subscribers = make(map[chan string]struct{})
- m := msg
- msg = ""
- incrementETag()
- mut.Unlock()
- log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
- for subscriber := range subs {
- select {
- case subscriber <- m:
- log.Debugf("Handling deploy response for: %s", m)
- log.Debugf("delivering TO: %v", subscriber)
- default:
- log.Debugf("listener too far behind, message dropped")
- }
- }
+func debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
+ send := func(toSend []interface{}) {
+ if toSend != nil {
+ log.Debugf("debouncer sending: %v", toSend)
+ out <- toSend
}
}
+ var toSend []interface{}
for {
select {
- case newMsg := <-deploymentsChanged:
- mut.Lock()
- log.Debug("deploymentsChanged")
- if msg == "" {
- go debouncer()
+ case incoming, ok := <-in:
+ if ok {
+ log.Debugf("debouncing %v", incoming)
+ toSend = append(toSend, incoming)
+ } else {
+ send(toSend)
+ log.Debugf("closing debouncer")
+ close(out)
+ return
}
- msg = newMsg
- mut.Unlock()
+ case <-time.After(window):
+ send(toSend)
+ toSend = nil
+ }
+ }
+}
+
+func distributeEvents() {
+ subscribers := make(map[chan deploymentsResult]struct{})
+ deliverDeployments := make(chan []interface{}, 1)
+
+ go debounce(deploymentsChanged, deliverDeployments, debounceDuration)
+
+ for {
+ select {
+ case _, ok := <-deliverDeployments:
+ if !ok {
+ return // todo: using this?
+ }
+ subs := subscribers
+ subscribers = make(map[chan deploymentsResult]struct{})
+ go func() {
+ eTag := incrementETag()
+ deployments, err := getReadyDeployments()
+ log.Debugf("delivering deployments to %d subscribers", len(subs))
+ for subscriber := range subs {
+ log.Debugf("delivering to: %v", subscriber)
+ subscriber <- deploymentsResult{deployments, err, eTag}
+ }
+ }()
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %v", subscriber)
- mut.Lock()
subscribers[subscriber] = struct{}{}
- mut.Unlock()
+ case subscriber := <-removeSubscriber:
+ log.Debugf("Remove subscriber: %v", subscriber)
+ delete(subscribers, subscriber)
}
}
}
@@ -173,41 +194,50 @@
return
}
- // subscribe to new deployments in case we need it
- var gotNewDeployment chan string
- if timeout > 0 && ifNoneMatch != "" {
- gotNewDeployment = make(chan string)
- addSubscriber <- gotNewDeployment
- }
-
- deployments, err := getReadyDeployments()
- if err != nil {
- writeDatabaseError(w)
- return
- }
-
// send results if different eTag
if eTag != ifNoneMatch {
- sendDeployments(w, deployments, eTag)
+ sendReadyDeployments(w)
return
}
+ // otherwise, subscribe to any new deployment changes
+ var newDeploymentsChannel chan deploymentsResult
+ if timeout > 0 && ifNoneMatch != "" {
+ newDeploymentsChannel = make(chan deploymentsResult, 1)
+ addSubscriber <- newDeploymentsChannel
+ }
+
log.Debug("Blocking request... Waiting for new Deployments.")
select {
- case <-gotNewDeployment:
- apiGetCurrentDeployments(w, r) // recurse
+ case result := <-newDeploymentsChannel:
+ if result.err != nil {
+ writeDatabaseError(w)
+ } else {
+ sendDeployments(w, result.deployments, result.eTag)
+ }
case <-time.After(time.Duration(timeout) * time.Second):
+ removeSubscriber <- newDeploymentsChannel
log.Debug("Blocking deployment request timed out.")
if ifNoneMatch != "" {
w.WriteHeader(http.StatusNotModified)
} else {
- sendDeployments(w, deployments, eTag)
+ sendReadyDeployments(w)
}
}
}
+func sendReadyDeployments(w http.ResponseWriter) {
+ eTag := getETag()
+ deployments, err := getReadyDeployments()
+ if err != nil {
+ writeDatabaseError(w)
+ return
+ }
+ sendDeployments(w, deployments, eTag)
+}
+
func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
apiDeps := ApiDeploymentResponse{}
@@ -343,8 +373,9 @@
}
// call whenever the list of deployments changes
-func incrementETag() {
- atomic.AddInt64(&eTag, 1)
+func incrementETag() string {
+ e := atomic.AddInt64(&eTag, 1)
+ return strconv.FormatInt(e, 10)
}
func getETag() string {
diff --git a/api_test.go b/api_test.go
index 373648d..e793251 100644
--- a/api_test.go
+++ b/api_test.go
@@ -37,15 +37,28 @@
Expect(string(body)).Should(Equal("[]"))
})
- It("should debounce requests", func() {
- var listener = make(chan string)
- addSubscriber <- listener
+ It("should debounce requests", func(done Done) {
+ var in = make(chan interface{})
+ var out = make(chan []interface{})
- deploymentsChanged <- "x"
- deploymentsChanged <- "y"
+ go debounce(in, out, 3*time.Millisecond)
- id := <-listener
- Expect(id).To(Equal("y"))
+ go func() {
+ defer GinkgoRecover()
+
+ received, ok := <-out
+ Expect(ok).To(BeTrue())
+ Expect(len(received)).To(Equal(2))
+
+ close(in)
+ received, ok = <-out
+ Expect(ok).To(BeFalse())
+
+ close(done)
+ }()
+
+ in <- "x"
+ in <- "y"
})
It("should get current deployments", func() {
@@ -96,6 +109,7 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
req, err := http.NewRequest("GET", uri.String(), nil)
req.Header.Add("Content-Type", "application/json")
@@ -139,6 +153,7 @@
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
eTag := res.Header.Get("etag")
+ Expect(eTag).ShouldNot(BeEmpty())
deploymentID = "api_get_current_blocking2"
go func() {
@@ -156,6 +171,9 @@
defer res.Body.Close()
Expect(res.StatusCode).To(Equal(http.StatusOK))
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
+ Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
+
var depRes ApiDeploymentResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).ShouldNot(HaveOccurred())
@@ -186,6 +204,7 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
query := uri.Query()
query.Add("block", "1")
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 74c8c62..227276d 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -38,6 +38,7 @@
config.Set(configApidInstanceID, "INSTANCE_ID")
config.Set(configApidClusterID, "CLUSTER_ID")
config.Set(configApiServerBaseURI, "http://localhost")
+ config.Set(configDebounceDuration, "1ms")
apid.InitializePlugins()
@@ -47,7 +48,6 @@
Expect(err).NotTo(HaveOccurred())
SetDB(db)
- debounceDuration = time.Millisecond
bundleCleanupDelay = time.Millisecond
bundleRetryDelay = 10 * time.Millisecond
markDeploymentFailedAfter = 50 * time.Millisecond
diff --git a/bundle_test.go b/bundle_test.go
index e2416f4..fd2be63 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -60,16 +60,13 @@
queueDownloadRequest(dep)
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
- <-listener
+ result := <-listener
- getReadyDeployments()
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
+ Expect(result.err).NotTo(HaveOccurred())
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
})
@@ -180,7 +177,7 @@
Expect(trackerHit).To(BeTrue())
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
<-listener
diff --git a/listener.go b/listener.go
index 8c403bf..ffa6593 100644
--- a/listener.go
+++ b/listener.go
@@ -222,8 +222,8 @@
log.Panicf("Error processing ChangeList: %v", err)
}
- if len(deploymentsToDelete) > 0 {
- deploymentsChanged <- deploymentsToDelete[0].ID // arbitrary, the ID doesn't matter
+ for _, d := range deploymentsToDelete {
+ deploymentsChanged <- d.ID
}
log.Debug("ChangeList processed")
diff --git a/listener_test.go b/listener_test.go
index 397c8cc..33ebd41 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -50,19 +50,28 @@
},
}
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ result := <-listener
+ Expect(result.err).ToNot(HaveOccurred())
+ // from event
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.BundleName).To(Equal(bundle1.Name))
+ Expect(d.BundleURI).To(Equal(bundle1.URI))
+
+ // from db
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())
Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
+ d = deployments[0]
Expect(d.ID).To(Equal(deploymentID))
Expect(d.BundleName).To(Equal(bundle1.Name))
@@ -120,19 +129,16 @@
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ result := <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d := deployments[0]
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
close(done)
@@ -257,14 +263,14 @@
},
}
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
// wait for event to propagate
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ result := <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())
@@ -307,13 +313,12 @@
},
}
- var listener = make(chan string)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- id := <-listener
- Expect(id).To(Equal(deploymentID))
+ <-listener
deployments, err := getReadyDeployments()
Expect(err).ShouldNot(HaveOccurred())