Rewrite debounce and dispatch
diff --git a/api.go b/api.go index 1eced66..e0f72a1 100644 --- a/api.go +++ b/api.go
@@ -8,7 +8,6 @@ "net/http" "net/url" "strconv" - "sync" "sync/atomic" "time" ) @@ -32,9 +31,15 @@ API_ERR_INTERNAL ) +type deploymentsResult struct { + deployments []DataDeployment + err error +} + var ( - deploymentsChanged = make(chan string) - addSubscriber = make(chan chan string) + deploymentsChanged = make(chan interface{}) + addSubscriber = make(chan chan deploymentsResult) + removeSubscriber = make(chan chan deploymentsResult) eTag int64 ) @@ -95,47 +100,71 @@ writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, "database error") } -func distributeEvents() { - subscribers := make(map[chan string]struct{}) - mut := sync.Mutex{} - msg := "" - debouncer := func() { - select { - case <-time.After(debounceDuration): - mut.Lock() - subs := subscribers - subscribers = make(map[chan string]struct{}) - m := msg - msg = "" - incrementETag() - mut.Unlock() - log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs)) - for subscriber := range subs { - select { - case subscriber <- m: - log.Debugf("Handling deploy response for: %s", m) - log.Debugf("delivering TO: %v", subscriber) - default: - log.Debugf("listener too far behind, message dropped") - } - } +func debounce(in chan interface{}, out chan []interface{}, window time.Duration) { + send := func(toSend []interface{}) { + if toSend != nil { + log.Debugf("debouncer sending: %v", toSend) + out <- toSend } } for { - select { - case newMsg := <-deploymentsChanged: - mut.Lock() - log.Debug("deploymentsChanged") - if msg == "" { - go debouncer() + incoming, ok := <-in + if !ok { + log.Debugf("closing debouncer") + close(out) + return + } + log.Debugf("debouncing %v", incoming) + toSend := []interface{}{incoming} + for { + select { + case incoming, ok := <-in: + if ok { + log.Debugf("debouncing %v", incoming) + toSend = append(toSend, incoming) + } else { + send(toSend) + log.Debugf("closing debouncer") + close(out) + return + } + case <-time.After(window): + send(toSend) + toSend = nil } - msg = newMsg - mut.Unlock() + } + } +} + +func distributeEvents() { + subscribers := make(map[chan deploymentsResult]struct{}) + deliverDeployments := make(chan []interface{}) + + go debounce(deploymentsChanged, deliverDeployments, debounceDuration) + + for { + select { + case _, ok := <-deliverDeployments: + if !ok { + return // todo: using this? + } + subs := subscribers + subscribers = make(map[chan deploymentsResult]struct{}) + incrementETag() + go func() { + deployments, err := getReadyDeployments() + log.Debugf("delivering deployments to %d subscribers", len(subs)) + for subscriber := range subs { + log.Debugf("delivering to: %v", subscriber) + subscriber <- deploymentsResult{deployments, err} + } + }() case subscriber := <-addSubscriber: log.Debugf("Add subscriber: %v", subscriber) - mut.Lock() subscribers[subscriber] = struct{}{} - mut.Unlock() + case subscriber := <-removeSubscriber: + log.Debugf("Remove subscriber: %v", subscriber) + delete(subscribers, subscriber) } } } @@ -173,41 +202,49 @@ return } - // subscribe to new deployments in case we need it - var gotNewDeployment chan string - if timeout > 0 && ifNoneMatch != "" { - gotNewDeployment = make(chan string) - addSubscriber <- gotNewDeployment - } - - deployments, err := getReadyDeployments() - if err != nil { - writeDatabaseError(w) - return - } - // send results if different eTag if eTag != ifNoneMatch { - sendDeployments(w, deployments, eTag) + sendReadyDeployments(w, eTag) return } + // otherwise, subscribe to any new deployment changes + var newDeploymentsChannel chan deploymentsResult + if timeout > 0 && ifNoneMatch != "" { + newDeploymentsChannel = make(chan deploymentsResult, 1) + addSubscriber <- newDeploymentsChannel + } + log.Debug("Blocking request... Waiting for new Deployments.") select { - case <-gotNewDeployment: - apiGetCurrentDeployments(w, r) // recurse + case result := <-newDeploymentsChannel: + if result.err != nil { + writeDatabaseError(w) + } else { + sendDeployments(w, result.deployments, eTag) + } case <-time.After(time.Duration(timeout) * time.Second): + removeSubscriber <- newDeploymentsChannel log.Debug("Blocking deployment request timed out.") if ifNoneMatch != "" { w.WriteHeader(http.StatusNotModified) } else { - sendDeployments(w, deployments, eTag) + sendReadyDeployments(w, eTag) } } } +func sendReadyDeployments(w http.ResponseWriter, eTag string) { + deployments, err := getReadyDeployments() + if err != nil { + writeDatabaseError(w) + return + } + sendDeployments(w, deployments, eTag) +} + func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) { apiDeps := ApiDeploymentResponse{}
diff --git a/api_test.go b/api_test.go index 373648d..30a28f4 100644 --- a/api_test.go +++ b/api_test.go
@@ -37,15 +37,28 @@ Expect(string(body)).Should(Equal("[]")) }) - It("should debounce requests", func() { - var listener = make(chan string) - addSubscriber <- listener + It("should debounce requests", func(done Done) { + var in = make(chan interface{}) + var out = make(chan []interface{}) - deploymentsChanged <- "x" - deploymentsChanged <- "y" + go debounce(in, out, 3*time.Millisecond) - id := <-listener - Expect(id).To(Equal("y")) + go func() { + defer GinkgoRecover() + + received, ok := <-out + Expect(ok).To(BeTrue()) + Expect(len(received)).To(Equal(2)) + + close(in) + received, ok = <-out + Expect(ok).To(BeFalse()) + + close(done) + }() + + in <- "x" + in <- "y" }) It("should get current deployments", func() {
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 74c8c62..227276d 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -38,6 +38,7 @@ config.Set(configApidInstanceID, "INSTANCE_ID") config.Set(configApidClusterID, "CLUSTER_ID") config.Set(configApiServerBaseURI, "http://localhost") + config.Set(configDebounceDuration, "1ms") apid.InitializePlugins() @@ -47,7 +48,6 @@ Expect(err).NotTo(HaveOccurred()) SetDB(db) - debounceDuration = time.Millisecond bundleCleanupDelay = time.Millisecond bundleRetryDelay = 10 * time.Millisecond markDeploymentFailedAfter = 50 * time.Millisecond
diff --git a/bundle_test.go b/bundle_test.go index e2416f4..fd2be63 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -60,16 +60,13 @@ queueDownloadRequest(dep) - var listener = make(chan string) + var listener = make(chan deploymentsResult) addSubscriber <- listener - <-listener + result := <-listener - getReadyDeployments() - deployments, err := getReadyDeployments() - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(1)) - d := deployments[0] + Expect(result.err).NotTo(HaveOccurred()) + Expect(len(result.deployments)).To(Equal(1)) + d := result.deployments[0] Expect(d.ID).To(Equal(deploymentID)) }) @@ -180,7 +177,7 @@ Expect(trackerHit).To(BeTrue()) - var listener = make(chan string) + var listener = make(chan deploymentsResult) addSubscriber <- listener <-listener
diff --git a/listener_test.go b/listener_test.go index 397c8cc..33ebd41 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -50,19 +50,28 @@ }, } - var listener = make(chan string) + var listener = make(chan deploymentsResult) addSubscriber <- listener apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - id := <-listener - Expect(id).To(Equal(deploymentID)) + result := <-listener + Expect(result.err).ToNot(HaveOccurred()) + // from event + Expect(len(result.deployments)).To(Equal(1)) + d := result.deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.BundleName).To(Equal(bundle1.Name)) + Expect(d.BundleURI).To(Equal(bundle1.URI)) + + // from db deployments, err := getReadyDeployments() Expect(err).ShouldNot(HaveOccurred()) Expect(len(deployments)).To(Equal(1)) - d := deployments[0] + d = deployments[0] Expect(d.ID).To(Equal(deploymentID)) Expect(d.BundleName).To(Equal(bundle1.Name)) @@ -120,19 +129,16 @@ err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) - var listener = make(chan string) + var listener = make(chan deploymentsResult) addSubscriber <- listener apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) - id := <-listener - Expect(id).To(Equal(deploymentID)) + result := <-listener + Expect(result.err).ShouldNot(HaveOccurred()) - deployments, err := getReadyDeployments() - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(1)) - d := deployments[0] + Expect(len(result.deployments)).To(Equal(1)) + d := result.deployments[0] Expect(d.ID).To(Equal(deploymentID)) close(done) @@ -257,14 +263,14 @@ }, } - var listener = make(chan string) + var listener = make(chan deploymentsResult) addSubscriber <- listener apid.Events().Emit(APIGEE_SYNC_EVENT, &event) // wait for event to propagate - id := <-listener - Expect(id).To(Equal(deploymentID)) + result := <-listener + Expect(result.err).ShouldNot(HaveOccurred()) deployments, err := getReadyDeployments() Expect(err).ShouldNot(HaveOccurred()) @@ -307,13 +313,12 @@ }, } - var listener = make(chan string) + var listener = make(chan deploymentsResult) addSubscriber <- listener apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - id := <-listener - Expect(id).To(Equal(deploymentID)) + <-listener deployments, err := getReadyDeployments() Expect(err).ShouldNot(HaveOccurred())