Add debouncing of deployment delivery w/ config
diff --git a/api.go b/api.go index 49bf679..0904919 100644 --- a/api.go +++ b/api.go
@@ -8,6 +8,7 @@ "net/http" "strconv" "time" + "sync" ) const ( @@ -82,25 +83,44 @@ func distributeEvents() { subscribers := make(map[chan string]struct{}) - for { + mut := sync.Mutex{} + msg := "" + debouncer := func() { select { - case msg := <-deploymentsChanged: - // todo: add a debounce w/ timeout to avoid sending on every single deployment? + case <-time.After(debounceDuration): + mut.Lock() subs := subscribers - log.Debugf("Delivering deployment change %s to %d subscribers", msg, len(subs)) subscribers = make(map[chan string]struct{}) + m := msg + msg = "" + mut.Unlock() + log.Debugf("Delivering deployment change %s to %d subscribers", m, len(subs)) incrementETag() for subscriber := range subs { select { - case subscriber <- msg: - log.Debugf("Handling deploy response for: %s", msg) + case subscriber <- m: + log.Debugf("Handling deploy response for: %s", m) default: log.Debugf("listener too far behind, message dropped") } } + } + } + for { + select { + case newMsg := <-deploymentsChanged: + mut.Lock() + log.Debug("deploymentsChanged") + if msg == "" { + go debouncer() + } + msg = newMsg + mut.Unlock() case subscriber := <-addSubscriber: log.Debugf("Add subscriber: %v", subscriber) + mut.Lock() subscribers[subscriber] = struct{}{} + mut.Unlock() } } }
diff --git a/api_test.go b/api_test.go index b371ea6..f267303 100644 --- a/api_test.go +++ b/api_test.go
@@ -27,6 +27,17 @@ Expect(res.StatusCode).Should(Equal(http.StatusNotFound)) }) + It("should debounce requests", func() { + var listener = make(chan string) + addSubscriber <- listener + + deploymentsChanged <- "x" + deploymentsChanged <- "y" + + id := <-listener + Expect(id).To(Equal("y")) + }) + It("should get current deployments", func() { deploymentID := "api_get_current" @@ -97,7 +108,6 @@ defer res.Body.Close() Expect(res.StatusCode).Should(Equal(http.StatusOK)) - }) It("should get new deployment after blocking", func(done Done) {
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index a43fb41..803a8b3 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -40,6 +40,8 @@ Expect(err).NotTo(HaveOccurred()) SetDB(db) + debounceDuration = 1 * time.Millisecond + router := apid.API().Router() // fake an unreliable bundle repo backOffMultiplier = 10 * time.Millisecond
diff --git a/init.go b/init.go index 12f1339..74a8548 100644 --- a/init.go +++ b/init.go
@@ -4,10 +4,12 @@ "github.com/30x/apid" "os" "path" + "time" ) const ( - configBundleDirKey = "gatewaydeploy_bundle_dir" + configBundleDirKey = "gatewaydeploy_bundle_dir" + configDebounceDuration = "gatewaydeploy_debounce_duration" ) var ( @@ -15,6 +17,7 @@ log apid.LogService data apid.DataService bundlePath string + debounceDuration time.Duration ) func init() { @@ -28,6 +31,12 @@ config := services.Config() config.SetDefault(configBundleDirKey, "bundles") + config.SetDefault(configDebounceDuration, 1 * time.Second) + + debounceDuration = config.GetDuration(configDebounceDuration) + if debounceDuration < 1 * time.Millisecond { + log.Panicf("%s must be a positive duration", configDebounceDuration) + } data = services.Data()