[ISSUE-66918282] add long-polling support in apid-core (#26)
* [ISSUE-66918282] add long-polling support in apid-core
* [ISSUE-66918282] add long-polling support in apid-core
* [ISSUE-66918282] update README
diff --git a/README.md b/README.md
index d920c97..6bf9a09 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,14 @@
// respond to request
}
+## Utils
+apid-core/util package offers common util functions for apid plugins:
+
+* Generate/Validate UUIDs
+* Long Polling
+* Debounce Events
+
+
## Running Tests
go test $(glide novendor)
diff --git a/util/util.go b/util/util.go
index ad10100..ad936ff 100644
--- a/util/util.go
+++ b/util/util.go
@@ -16,6 +16,8 @@
import (
"github.com/google/uuid"
+ "net/http"
+ "time"
)
func IsValidUUID(id string) bool {
@@ -26,3 +28,73 @@
func GenerateUUID() string {
return uuid.New().String()
}
+
+// distributeEvents() receives elements from deliverChan, and send them to subscribers
+// Sending a `chan interface{}` to addSubscriber adds a new subscriber.
+// It closes the subscriber channel after sending the element.
+// `go DistributeEvents(deliverChan, addSubscriber)` should be called during API initialization.
+// Any subscriber sent to `addSubscriber` should be buffered chan.
+func DistributeEvents(deliverChan <-chan interface{}, addSubscriber chan chan interface{}) {
+ subscribers := make([]chan interface{}, 0)
+ for {
+ select {
+ case element, ok := <-deliverChan:
+ if !ok {
+ return
+ }
+ for _, subscriber := range subscribers {
+ go func(sub chan interface{}) {
+ sub <- element
+ close(sub)
+ }(subscriber)
+ }
+ subscribers = make([]chan interface{}, 0)
+ case sub, ok := <-addSubscriber:
+ if !ok {
+ return
+ }
+ subscribers = append(subscribers, sub)
+ }
+ }
+}
+
+// LongPolling() subscribes to `addSubscriber`, and do long-polling until anything is delivered.
+// It calls `successHandler` if receives a notification.
+// It calls `timeoutHandler` if there's a timeout.
+// `go DistributeEvents(deliverChan, addSubscriber)` must have been called during API initialization.
+func LongPolling(w http.ResponseWriter, timeout time.Duration, addSubscriber chan chan interface{}, successHandler func(interface{}, http.ResponseWriter), timeoutHandler func(http.ResponseWriter)) {
+ notifyChan := make(chan interface{}, 1)
+ addSubscriber <- notifyChan
+ select {
+ case n := <-notifyChan:
+ successHandler(n, w)
+ case <-time.After(timeout):
+ timeoutHandler(w)
+ }
+}
+
+// Debounce() packs all elements received from channel `inChan` within the specified time window to one slice,
+// and send it to channel `outChan` periodically. If nothing is received in the time window, nothing will be sent to `outChan`.
+func Debounce(inChan chan interface{}, outChan chan []interface{}, window time.Duration) {
+ send := func(toSend []interface{}) {
+ if toSend != nil {
+ outChan <- toSend
+ }
+ }
+ var toSend []interface{} = nil
+ for {
+ select {
+ case incoming, ok := <-inChan:
+ if ok {
+ toSend = append(toSend, incoming)
+ } else {
+ send(toSend)
+ close(outChan)
+ return
+ }
+ case <-time.After(window):
+ send(toSend)
+ toSend = nil
+ }
+ }
+}
diff --git a/util/util_test.go b/util/util_test.go
index 3a1c5b4..54fd997 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -19,8 +19,12 @@
. "github.com/onsi/gomega"
"github.com/apid/apid-core/util"
+ "math/rand"
+ "net/http"
"regexp"
+ "sync/atomic"
"testing"
+ "time"
)
var _ = BeforeSuite(func() {
@@ -56,4 +60,106 @@
Ω(util.IsValidUUID(util.GenerateUUID())).Should(BeTrue())
})
})
+
+ Context("Long polling utils", func() {
+ It("DistributeEvents", func() {
+ // make test data
+ deliverChan := make(chan interface{})
+ addSubscriber := make(chan chan interface{})
+ subs := make([]chan interface{}, 50+rand.Intn(50))
+ for i := range subs {
+ subs[i] = make(chan interface{}, 1)
+ }
+
+ // test
+ go util.DistributeEvents(deliverChan, addSubscriber)
+
+ for i := range subs {
+ go func(j int) {
+ addSubscriber <- subs[j]
+ }(i)
+ }
+
+ n := rand.Int()
+ closed := new(int32)
+ go func(c *int32) {
+ for atomic.LoadInt32(c) == 0 {
+ deliverChan <- n
+ }
+ }(closed)
+
+ for i := range subs {
+ Ω(<-subs[i]).Should(Equal(n))
+ }
+ atomic.StoreInt32(closed, 1)
+ }, 2)
+
+ It("Long polling", func(done Done) {
+ // make test data
+ deliverChan := make(chan interface{})
+ addSubscriber := make(chan chan interface{})
+ go util.DistributeEvents(deliverChan, addSubscriber)
+ n := rand.Int()
+ closed := new(int32)
+ successHandler := func(e interface{}, w http.ResponseWriter) {
+ defer GinkgoRecover()
+ Ω(w).Should(BeNil())
+ Ω(e).Should(Equal(n))
+ atomic.StoreInt32(closed, 1)
+ close(done)
+ }
+ timeoutHandler := func(w http.ResponseWriter) {}
+
+ // Long polling
+ go util.LongPolling(nil, time.Minute, addSubscriber, successHandler, timeoutHandler)
+ go func(c *int32) {
+ for atomic.LoadInt32(c) == 0 {
+ deliverChan <- n
+ }
+ }(closed)
+ })
+
+ It("Long polling timeout", func(done Done) {
+ // make test data
+ deliverChan := make(chan interface{})
+ addSubscriber := make(chan chan interface{})
+ go util.DistributeEvents(deliverChan, addSubscriber)
+ successHandler := func(e interface{}, w http.ResponseWriter) {}
+ timeoutHandler := func(w http.ResponseWriter) {
+ defer GinkgoRecover()
+ Ω(w).Should(BeNil())
+ close(done)
+ }
+
+ // Long polling
+ go util.LongPolling(nil, time.Second, addSubscriber, successHandler, timeoutHandler)
+ }, 2)
+
+ It("Debounce", func() {
+ // make test data
+ data := make(map[int]int)
+ inChan := make(chan interface{})
+ outChan := make(chan []interface{})
+ go util.Debounce(inChan, outChan, time.Second)
+ for i := 0; i < 5+rand.Intn(5); i++ {
+ n := rand.Int()
+ data[n]++
+ go func(j int) {
+ inChan <- j
+ }(n)
+ }
+
+ // Debounce
+ e := <-outChan
+ for _, m := range e {
+ num, ok := m.(int)
+ Ω(ok).Should(BeTrue())
+ data[num]--
+ }
+
+ for _, v := range data {
+ Ω(v).Should(BeZero())
+ }
+ }, 2)
+ })
})