Merge branch 'master' of github.com:apid/apid-core into bug67694505
diff --git a/README.md b/README.md index d920c97..6bf9a09 100644 --- a/README.md +++ b/README.md
@@ -49,6 +49,14 @@ // respond to request } +## Utils +The apid-core/util package offers common utility functions for apid plugins: + +* Generate/validate UUIDs +* Long polling +* Debounce events + + ## Running Tests go test $(glide novendor)
diff --git a/util/.util.go.swp b/util/.util.go.swp new file mode 100644 index 0000000..da82acd --- /dev/null +++ b/util/.util.go.swp Binary files differ
diff --git a/util/util.go b/util/util.go index 804d5bd..0ae4108 100644 --- a/util/util.go +++ b/util/util.go
@@ -19,6 +19,7 @@ "github.com/google/uuid" "net/http" "net/url" + "time" "github.com/apid/apid-core" ) @@ -42,6 +43,7 @@ return uuid.New().String() } +// Returns the http.Transport with Forward Proxy params set (if Configured). func Transport() *http.Transport { var tr http.Transport var pURL *url.URL @@ -76,3 +78,72 @@ return &tr } +// DistributeEvents receives elements from deliverChan and sends each one to every currently registered subscriber. +// Sending a `chan interface{}` to addSubscriber adds a new subscriber. +// It closes the subscriber channel after sending the element and then forgets all subscribers, so each subscriber receives at most one element. +// It returns when either deliverChan or addSubscriber is closed. +// `go DistributeEvents(deliverChan, addSubscriber)` should be called during API initialization. +// Any subscriber sent to `addSubscriber` should be a buffered chan; otherwise the per-subscriber delivery goroutine blocks until the subscriber receives. +func DistributeEvents(deliverChan <-chan interface{}, addSubscriber chan chan interface{}) { + subscribers := make([]chan interface{}, 0) + for { + select { + case element, ok := <-deliverChan: + if !ok { + return + } + for _, subscriber := range subscribers { + // Deliver in a separate goroutine so this loop does not wait on the subscriber. + go func(sub chan interface{}) { + sub <- element + close(sub) + }(subscriber) + } + // Subscribers are one-shot: start over with an empty list. + subscribers = make([]chan interface{}, 0) + case sub, ok := <-addSubscriber: + if !ok { + return + } + subscribers = append(subscribers, sub) + } + } +} + +// LongPolling subscribes via `addSubscriber` and then waits until an element is delivered or `timeout` elapses. +// It calls `successHandler` with the delivered element if it receives a notification. +// It calls `timeoutHandler` if there's a timeout. +// `go DistributeEvents(deliverChan, addSubscriber)` must have been called during API initialization. 
// LongPolling registers a fresh one-element buffered channel with
// `addSubscriber` and blocks until an element is delivered on it or `timeout`
// elapses, whichever happens first.
//
// It calls `successHandler(element, w)` when a notification is received and
// `timeoutHandler(w)` otherwise. `timeout` is measured from the moment
// LongPolling is called and also bounds the subscription step, so the call
// returns even when nothing is receiving from `addSubscriber`.
// `w` is not used here; it is only forwarded to the handlers.
func LongPolling(w http.ResponseWriter, timeout time.Duration, addSubscriber chan chan interface{}, successHandler func(interface{}, http.ResponseWriter), timeoutHandler func(http.ResponseWriter)) {
	// One explicit timer for the whole call: it can be stopped on return,
	// which a bare time.After cannot.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	// Buffer of one so the distributor's send never has to wait for us.
	notifyChan := make(chan interface{}, 1)

	// Subscribing may block (nobody receiving, or a nil channel); the timeout
	// must cover that as well.
	select {
	case addSubscriber <- notifyChan:
	case <-timer.C:
		timeoutHandler(w)
		return
	}

	select {
	case n := <-notifyChan:
		successHandler(n, w)
	case <-timer.C:
		timeoutHandler(w)
	}
}

// Debounce collects the elements received from `inChan` into a slice and
// sends that slice to `outChan` once `window` has passed without a new
// element arriving. The window restarts every time an element is received.
// If nothing was collected when the window expires, nothing is sent.
//
// When `inChan` is closed, any collected elements are sent, `outChan` is
// closed and Debounce returns. Sending to `outChan` blocks until the batch is
// received; `inChan` is not read meanwhile.
func Debounce(inChan chan interface{}, outChan chan []interface{}, window time.Duration) {
	var pending []interface{}

	// flush hands the current batch (if any) to outChan and starts a new one.
	flush := func() {
		if pending != nil {
			outChan <- pending
			pending = nil
		}
	}

	// A single reusable timer instead of a new time.After per iteration.
	timer := time.NewTimer(window)
	defer timer.Stop()

	for {
		select {
		case incoming, ok := <-inChan:
			if !ok {
				flush()
				close(outChan)
				return
			}
			pending = append(pending, incoming)

			// Restart the window. If the timer already fired, discard the
			// stale tick so it cannot trigger a premature flush.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(window)
		case <-timer.C:
			flush()
			timer.Reset(window)
		}
	}
}
diff --git a/util/util_test.go b/util/util_test.go index 3a1c5b4..54fd997 100644 --- a/util/util_test.go +++ b/util/util_test.go
@@ -19,8 +19,12 @@ . "github.com/onsi/gomega" "github.com/apid/apid-core/util" + "math/rand" + "net/http" "regexp" + "sync/atomic" "testing" + "time" ) var _ = BeforeSuite(func() { @@ -56,4 +60,106 @@ Ω(util.IsValidUUID(util.GenerateUUID())).Should(BeTrue()) }) }) + + Context("Long polling utils", func() { + It("DistributeEvents", func() { + // make test data: 50-99 subscriber channels, each with a buffer of 1 + deliverChan := make(chan interface{}) + addSubscriber := make(chan chan interface{}) + subs := make([]chan interface{}, 50+rand.Intn(50)) + for i := range subs { + subs[i] = make(chan interface{}, 1) + } + + // test: register every subscriber concurrently, then keep delivering n until all subscribers have received it + go util.DistributeEvents(deliverChan, addSubscriber) + + for i := range subs { + go func(j int) { + addSubscriber <- subs[j] + }(i) + } + + n := rand.Int() + closed := new(int32) + go func(c *int32) { + for atomic.LoadInt32(c) == 0 { + deliverChan <- n + } + }(closed) + + for i := range subs { + Ω(<-subs[i]).Should(Equal(n)) + } + atomic.StoreInt32(closed, 1) + }, 2) + + It("Long polling", func(done Done) { + // make test data + deliverChan := make(chan interface{}) + addSubscriber := make(chan chan interface{}) + go util.DistributeEvents(deliverChan, addSubscriber) + n := rand.Int() + closed := new(int32) + successHandler := func(e interface{}, w http.ResponseWriter) { + defer GinkgoRecover() + Ω(w).Should(BeNil()) + Ω(e).Should(Equal(n)) + atomic.StoreInt32(closed, 1) + close(done) + } + timeoutHandler := func(w http.ResponseWriter) {} + + // Long polling: keep delivering n until successHandler reports that it ran + go util.LongPolling(nil, time.Minute, addSubscriber, successHandler, timeoutHandler) + go func(c *int32) { + for atomic.LoadInt32(c) == 0 { + deliverChan <- n + } + }(closed) + }) + + It("Long polling timeout", func(done Done) { + // make test data + deliverChan := make(chan interface{}) + addSubscriber := make(chan chan interface{}) + go util.DistributeEvents(deliverChan, addSubscriber) + successHandler := func(e interface{}, w http.ResponseWriter) {} + timeoutHandler := func(w http.ResponseWriter) { + defer GinkgoRecover() + 
Ω(w).Should(BeNil()) + close(done) + } + + // Long polling: nothing is ever sent on deliverChan, so only timeoutHandler can complete the test + go util.LongPolling(nil, time.Second, addSubscriber, successHandler, timeoutHandler) + }, 2) + + It("Debounce", func() { + // make test data: data counts how many times each random value is sent + data := make(map[int]int) + inChan := make(chan interface{}) + outChan := make(chan []interface{}) + go util.Debounce(inChan, outChan, time.Second) + // NOTE(review): the loop bound calls rand.Intn again on every iteration, so the number of values sent is not drawn once up front + for i := 0; i < 5+rand.Intn(5); i++ { + n := rand.Int() + data[n]++ + go func(j int) { + inChan <- j + }(n) + } + + // Debounce: the first batch must account for every value sent above, leaving all counts at zero + e := <-outChan + for _, m := range e { + num, ok := m.(int) + Ω(ok).Should(BeTrue()) + data[num]-- + } + + for _, v := range data { + Ω(v).Should(BeZero()) + } + }, 2) + }) })