Incorporate review feedback, fix etag inclusion
diff --git a/api.go b/api.go index e0f72a1..010e872 100644 --- a/api.go +++ b/api.go
@@ -34,10 +34,11 @@ type deploymentsResult struct { deployments []DataDeployment err error + eTag string } var ( - deploymentsChanged = make(chan interface{}) + deploymentsChanged = make(chan interface{}, 5) addSubscriber = make(chan chan deploymentsResult) removeSubscriber = make(chan chan deploymentsResult) eTag int64 @@ -107,38 +108,29 @@ out <- toSend } } + var toSend []interface{} for { - incoming, ok := <-in - if !ok { - log.Debugf("closing debouncer") - close(out) - return - } - log.Debugf("debouncing %v", incoming) - toSend := []interface{}{incoming} - for { - select { - case incoming, ok := <-in: - if ok { - log.Debugf("debouncing %v", incoming) - toSend = append(toSend, incoming) - } else { - send(toSend) - log.Debugf("closing debouncer") - close(out) - return - } - case <-time.After(window): + select { + case incoming, ok := <-in: + if ok { + log.Debugf("debouncing %v", incoming) + toSend = append(toSend, incoming) + } else { send(toSend) - toSend = nil + log.Debugf("closing debouncer") + close(out) + return } + case <-time.After(window): + send(toSend) + toSend = nil } } } func distributeEvents() { subscribers := make(map[chan deploymentsResult]struct{}) - deliverDeployments := make(chan []interface{}) + deliverDeployments := make(chan []interface{}, 1) go debounce(deploymentsChanged, deliverDeployments, debounceDuration) @@ -150,13 +142,13 @@ } subs := subscribers subscribers = make(map[chan deploymentsResult]struct{}) - incrementETag() go func() { + eTag := incrementETag() deployments, err := getReadyDeployments() log.Debugf("delivering deployments to %d subscribers", len(subs)) for subscriber := range subs { log.Debugf("delivering to: %v", subscriber) - subscriber <- deploymentsResult{deployments, err} + subscriber <- deploymentsResult{deployments, err, eTag} } }() case subscriber := <-addSubscriber: @@ -204,7 +196,7 @@ // send results if different eTag if eTag != ifNoneMatch { - sendReadyDeployments(w, eTag) + sendReadyDeployments(w) return } @@ -222,7 +214,7 @@ if result.err != nil { writeDatabaseError(w) } else { - sendDeployments(w, result.deployments, eTag) + sendDeployments(w, result.deployments, result.eTag) } case <-time.After(time.Duration(timeout) * time.Second): @@ -231,12 +223,13 @@ if ifNoneMatch != "" { w.WriteHeader(http.StatusNotModified) } else { - sendReadyDeployments(w, eTag) + sendReadyDeployments(w) } } } -func sendReadyDeployments(w http.ResponseWriter, eTag string) { +func sendReadyDeployments(w http.ResponseWriter) { + eTag := getETag() deployments, err := getReadyDeployments() if err != nil { writeDatabaseError(w) @@ -380,8 +373,9 @@ } // call whenever the list of deployments changes -func incrementETag() { - atomic.AddInt64(&eTag, 1) +func incrementETag() string { + e := atomic.AddInt64(&eTag, 1) + return strconv.FormatInt(e, 10) } func getETag() string {
diff --git a/api_test.go b/api_test.go index 30a28f4..e793251 100644 --- a/api_test.go +++ b/api_test.go
@@ -109,6 +109,7 @@ res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() + Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) req, err := http.NewRequest("GET", uri.String(), nil) req.Header.Add("Content-Type", "application/json") @@ -152,6 +153,7 @@ Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() eTag := res.Header.Get("etag") + Expect(eTag).ShouldNot(BeEmpty()) deploymentID = "api_get_current_blocking2" go func() { @@ -169,6 +171,9 @@ defer res.Body.Close() Expect(res.StatusCode).To(Equal(http.StatusOK)) + Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) + Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag)) + var depRes ApiDeploymentResponse body, err := ioutil.ReadAll(res.Body) Expect(err).ShouldNot(HaveOccurred()) @@ -199,6 +204,7 @@ res, err := http.Get(uri.String()) Expect(err).ShouldNot(HaveOccurred()) defer res.Body.Close() + Expect(res.Header.Get("etag")).ShouldNot(BeEmpty()) query := uri.Query() query.Add("block", "1")
diff --git a/listener.go b/listener.go index 8c403bf..ffa6593 100644 --- a/listener.go +++ b/listener.go
@@ -222,8 +222,8 @@ log.Panicf("Error processing ChangeList: %v", err) } - if len(deploymentsToDelete) > 0 { - deploymentsChanged <- deploymentsToDelete[0].ID // arbitrary, the ID doesn't matter + for _, d := range deploymentsToDelete { + deploymentsChanged <- d.ID } log.Debug("ChangeList processed")