Incorporate review feedback, fix etag inclusion
diff --git a/api.go b/api.go
index e0f72a1..010e872 100644
--- a/api.go
+++ b/api.go
@@ -34,10 +34,11 @@
type deploymentsResult struct {
deployments []DataDeployment
err error
+ eTag string
}
var (
- deploymentsChanged = make(chan interface{})
+ deploymentsChanged = make(chan interface{}, 5)
addSubscriber = make(chan chan deploymentsResult)
removeSubscriber = make(chan chan deploymentsResult)
eTag int64
@@ -107,38 +108,29 @@
out <- toSend
}
}
+ var toSend []interface{}
for {
- incoming, ok := <-in
- if !ok {
- log.Debugf("closing debouncer")
- close(out)
- return
- }
- log.Debugf("debouncing %v", incoming)
- toSend := []interface{}{incoming}
- for {
- select {
- case incoming, ok := <-in:
- if ok {
- log.Debugf("debouncing %v", incoming)
- toSend = append(toSend, incoming)
- } else {
- send(toSend)
- log.Debugf("closing debouncer")
- close(out)
- return
- }
- case <-time.After(window):
+ select {
+ case incoming, ok := <-in:
+ if ok {
+ log.Debugf("debouncing %v", incoming)
+ toSend = append(toSend, incoming)
+ } else {
send(toSend)
- toSend = nil
+ log.Debugf("closing debouncer")
+ close(out)
+ return
}
+ case <-time.After(window):
+ send(toSend)
+ toSend = nil
}
}
}
func distributeEvents() {
subscribers := make(map[chan deploymentsResult]struct{})
- deliverDeployments := make(chan []interface{})
+ deliverDeployments := make(chan []interface{}, 1)
go debounce(deploymentsChanged, deliverDeployments, debounceDuration)
@@ -150,13 +142,13 @@
}
subs := subscribers
subscribers = make(map[chan deploymentsResult]struct{})
- incrementETag()
go func() {
+ eTag := incrementETag()
deployments, err := getReadyDeployments()
log.Debugf("delivering deployments to %d subscribers", len(subs))
for subscriber := range subs {
log.Debugf("delivering to: %v", subscriber)
- subscriber <- deploymentsResult{deployments, err}
+ subscriber <- deploymentsResult{deployments, err, eTag}
}
}()
case subscriber := <-addSubscriber:
@@ -204,7 +196,7 @@
// send results if different eTag
if eTag != ifNoneMatch {
- sendReadyDeployments(w, eTag)
+ sendReadyDeployments(w)
return
}
@@ -222,7 +214,7 @@
if result.err != nil {
writeDatabaseError(w)
} else {
- sendDeployments(w, result.deployments, eTag)
+ sendDeployments(w, result.deployments, result.eTag)
}
case <-time.After(time.Duration(timeout) * time.Second):
@@ -231,12 +223,13 @@
if ifNoneMatch != "" {
w.WriteHeader(http.StatusNotModified)
} else {
- sendReadyDeployments(w, eTag)
+ sendReadyDeployments(w)
}
}
}
-func sendReadyDeployments(w http.ResponseWriter, eTag string) {
+func sendReadyDeployments(w http.ResponseWriter) {
+ eTag := getETag()
deployments, err := getReadyDeployments()
if err != nil {
writeDatabaseError(w)
@@ -380,8 +373,9 @@
}
// call whenever the list of deployments changes
-func incrementETag() {
- atomic.AddInt64(&eTag, 1)
+func incrementETag() string {
+ e := atomic.AddInt64(&eTag, 1)
+ return strconv.FormatInt(e, 10)
}
func getETag() string {
diff --git a/api_test.go b/api_test.go
index 30a28f4..e793251 100644
--- a/api_test.go
+++ b/api_test.go
@@ -109,6 +109,7 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
req, err := http.NewRequest("GET", uri.String(), nil)
req.Header.Add("Content-Type", "application/json")
@@ -152,6 +153,7 @@
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
eTag := res.Header.Get("etag")
+ Expect(eTag).ShouldNot(BeEmpty())
deploymentID = "api_get_current_blocking2"
go func() {
@@ -169,6 +171,9 @@
defer res.Body.Close()
Expect(res.StatusCode).To(Equal(http.StatusOK))
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
+ Expect(res.Header.Get("etag")).ShouldNot(Equal(eTag))
+
var depRes ApiDeploymentResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).ShouldNot(HaveOccurred())
@@ -199,6 +204,7 @@
res, err := http.Get(uri.String())
Expect(err).ShouldNot(HaveOccurred())
defer res.Body.Close()
+ Expect(res.Header.Get("etag")).ShouldNot(BeEmpty())
query := uri.Query()
query.Add("block", "1")
diff --git a/listener.go b/listener.go
index 8c403bf..ffa6593 100644
--- a/listener.go
+++ b/listener.go
@@ -222,8 +222,8 @@
log.Panicf("Error processing ChangeList: %v", err)
}
- if len(deploymentsToDelete) > 0 {
- deploymentsChanged <- deploymentsToDelete[0].ID // arbitrary, the ID doesn't matter
+ for _, d := range deploymentsToDelete {
+ deploymentsChanged <- d.ID
}
log.Debug("ChangeList processed")