Add debouncing of deployment delivery w/ config
diff --git a/api.go b/api.go
index 49bf679..0904919 100644
--- a/api.go
+++ b/api.go
@@ -8,6 +8,7 @@
"net/http"
"strconv"
"time"
+ "sync"
)
const (
@@ -82,25 +83,44 @@
func distributeEvents() {
subscribers := make(map[chan string]struct{})
- for {
+ mut := sync.Mutex{}
+ msg := ""
+ debouncer := func() {
select {
- case msg := <-deploymentsChanged:
- // todo: add a debounce w/ timeout to avoid sending on every single deployment?
+ case <-time.After(debounceDuration):
+ mut.Lock()
subs := subscribers
- log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs))
subscribers = make(map[chan string]struct{})
+ m := msg
+ msg = ""
+ mut.Unlock()
+ log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs))
incrementETag()
for subscriber := range subs {
select {
- case subscriber <- msg:
- log.Debugf("Handling deploy response for: %s", msg)
+ case subscriber <- m:
+ log.Debugf("Handling deploy response for: %s", m)
default:
log.Debugf("listener too far behind, message dropped")
}
}
+ }
+ }
+ for {
+ select {
+ case newMsg := <-deploymentsChanged:
+ mut.Lock()
+ log.Debug("deploymentsChanged")
+ if msg == "" {
+ go debouncer()
+ }
+ msg = newMsg
+ mut.Unlock()
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %v", subscriber)
+ mut.Lock()
subscribers[subscriber] = struct{}{}
+ mut.Unlock()
}
}
}
diff --git a/api_test.go b/api_test.go
index b371ea6..f267303 100644
--- a/api_test.go
+++ b/api_test.go
@@ -27,6 +27,17 @@
Expect(res.StatusCode).Should(Equal(http.StatusNotFound))
})
+ It("should debounce requests", func() {
+ var listener = make(chan string)
+ addSubscriber <- listener
+
+ deploymentsChanged <- "x"
+ deploymentsChanged <- "y"
+
+ id := <-listener
+ Expect(id).To(Equal("y"))
+ })
+
It("should get current deployments", func() {
deploymentID := "api_get_current"
@@ -97,7 +108,6 @@
defer res.Body.Close()
Expect(res.StatusCode).Should(Equal(http.StatusOK))
-
})
It("should get new deployment after blocking", func(done Done) {
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index a43fb41..803a8b3 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -40,6 +40,8 @@
Expect(err).NotTo(HaveOccurred())
SetDB(db)
+ debounceDuration = 1 * time.Millisecond
+
router := apid.API().Router()
// fake an unreliable bundle repo
backOffMultiplier = 10 * time.Millisecond
diff --git a/init.go b/init.go
index 12f1339..74a8548 100644
--- a/init.go
+++ b/init.go
@@ -4,10 +4,12 @@
"github.com/30x/apid"
"os"
"path"
+ "time"
)
const (
- configBundleDirKey = "gatewaydeploy_bundle_dir"
+ configBundleDirKey = "gatewaydeploy_bundle_dir"
+ configDebounceDuration = "gatewaydeploy_debounce_duration"
)
var (
@@ -15,6 +17,7 @@
log apid.LogService
data apid.DataService
bundlePath string
+ debounceDuration time.Duration
)
func init() {
@@ -28,6 +31,12 @@
config := services.Config()
config.SetDefault(configBundleDirKey, "bundles")
+ config.SetDefault(configDebounceDuration, 1 * time.Second)
+
+ debounceDuration = config.GetDuration(configDebounceDuration)
+ if debounceDuration < 1 * time.Millisecond {
+ log.Panicf("%s must be a positive duration", configDebounceDuration)
+ }
data = services.Data()