[ISSUE-66918282] add long-polling support in apid-core (#26)

* [ISSUE-66918282] add long-polling support in apid-core

* [ISSUE-66918282] add long-polling support in apid-core

* [ISSUE-66918282] update README
diff --git a/README.md b/README.md
index d920c97..6bf9a09 100644
--- a/README.md
+++ b/README.md
@@ -49,6 +49,14 @@
       // respond to request
     }
 
+## Utils
+apid-core/util package offers common util functions for apid plugins:
+
+* Generate/Validate UUIDs
+* Long Polling
+* Debounce Events
+
+
 ## Running Tests
 
     go test $(glide novendor)
diff --git a/util/util.go b/util/util.go
index ad10100..ad936ff 100644
--- a/util/util.go
+++ b/util/util.go
@@ -16,6 +16,8 @@
 
 import (
 	"github.com/google/uuid"
+	"net/http"
+	"time"
 )
 
 func IsValidUUID(id string) bool {
@@ -26,3 +28,73 @@
 func GenerateUUID() string {
 	return uuid.New().String()
 }
+
+// distributeEvents() receives elements from deliverChan, and send them to subscribers
+// Sending a `chan interface{}` to addSubscriber adds a new subscriber.
+// It closes the subscriber channel after sending the element.
+// `go DistributeEvents(deliverChan, addSubscriber)` should be called during API initialization.
+// Any subscriber sent to `addSubscriber` should be buffered chan.
+func DistributeEvents(deliverChan <-chan interface{}, addSubscriber chan chan interface{}) {
+	subscribers := make([]chan interface{}, 0)
+	for {
+		select {
+		case element, ok := <-deliverChan:
+			if !ok {
+				return
+			}
+			for _, subscriber := range subscribers {
+				go func(sub chan interface{}) {
+					sub <- element
+					close(sub)
+				}(subscriber)
+			}
+			subscribers = make([]chan interface{}, 0)
+		case sub, ok := <-addSubscriber:
+			if !ok {
+				return
+			}
+			subscribers = append(subscribers, sub)
+		}
+	}
+}
+
+// LongPolling() subscribes to `addSubscriber`, and do long-polling until anything is delivered.
+// It calls `successHandler` if receives a notification.
+// It calls `timeoutHandler` if there's a timeout.
+// `go DistributeEvents(deliverChan, addSubscriber)` must have been called during API initialization.
+func LongPolling(w http.ResponseWriter, timeout time.Duration, addSubscriber chan chan interface{}, successHandler func(interface{}, http.ResponseWriter), timeoutHandler func(http.ResponseWriter)) {
+	notifyChan := make(chan interface{}, 1)
+	addSubscriber <- notifyChan
+	select {
+	case n := <-notifyChan:
+		successHandler(n, w)
+	case <-time.After(timeout):
+		timeoutHandler(w)
+	}
+}
+
+// Debounce() packs all elements received from channel `inChan` within the specified time window to one slice,
+// and send it to channel `outChan` periodically. If nothing is received in the time window, nothing will be sent to `outChan`.
+func Debounce(inChan chan interface{}, outChan chan []interface{}, window time.Duration) {
+	send := func(toSend []interface{}) {
+		if toSend != nil {
+			outChan <- toSend
+		}
+	}
+	var toSend []interface{} = nil
+	for {
+		select {
+		case incoming, ok := <-inChan:
+			if ok {
+				toSend = append(toSend, incoming)
+			} else {
+				send(toSend)
+				close(outChan)
+				return
+			}
+		case <-time.After(window):
+			send(toSend)
+			toSend = nil
+		}
+	}
+}
diff --git a/util/util_test.go b/util/util_test.go
index 3a1c5b4..54fd997 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -19,8 +19,12 @@
 	. "github.com/onsi/gomega"
 
 	"github.com/apid/apid-core/util"
+	"math/rand"
+	"net/http"
 	"regexp"
+	"sync/atomic"
 	"testing"
+	"time"
 )
 
 var _ = BeforeSuite(func() {
@@ -56,4 +60,106 @@
 			Ω(util.IsValidUUID(util.GenerateUUID())).Should(BeTrue())
 		})
 	})
+
+	Context("Long polling utils", func() {
+		It("DistributeEvents", func() {
+			// make test data
+			deliverChan := make(chan interface{})
+			addSubscriber := make(chan chan interface{})
+			subs := make([]chan interface{}, 50+rand.Intn(50))
+			for i := range subs {
+				subs[i] = make(chan interface{}, 1)
+			}
+
+			// test
+			go util.DistributeEvents(deliverChan, addSubscriber)
+
+			for i := range subs {
+				go func(j int) {
+					addSubscriber <- subs[j]
+				}(i)
+			}
+
+			n := rand.Int()
+			closed := new(int32)
+			go func(c *int32) {
+				for atomic.LoadInt32(c) == 0 {
+					deliverChan <- n
+				}
+			}(closed)
+
+			for i := range subs {
+				Ω(<-subs[i]).Should(Equal(n))
+			}
+			atomic.StoreInt32(closed, 1)
+		}, 2)
+
+		It("Long polling", func(done Done) {
+			// make test data
+			deliverChan := make(chan interface{})
+			addSubscriber := make(chan chan interface{})
+			go util.DistributeEvents(deliverChan, addSubscriber)
+			n := rand.Int()
+			closed := new(int32)
+			successHandler := func(e interface{}, w http.ResponseWriter) {
+				defer GinkgoRecover()
+				Ω(w).Should(BeNil())
+				Ω(e).Should(Equal(n))
+				atomic.StoreInt32(closed, 1)
+				close(done)
+			}
+			timeoutHandler := func(w http.ResponseWriter) {}
+
+			// Long polling
+			go util.LongPolling(nil, time.Minute, addSubscriber, successHandler, timeoutHandler)
+			go func(c *int32) {
+				for atomic.LoadInt32(c) == 0 {
+					deliverChan <- n
+				}
+			}(closed)
+		})
+
+		It("Long polling timeout", func(done Done) {
+			// make test data
+			deliverChan := make(chan interface{})
+			addSubscriber := make(chan chan interface{})
+			go util.DistributeEvents(deliverChan, addSubscriber)
+			successHandler := func(e interface{}, w http.ResponseWriter) {}
+			timeoutHandler := func(w http.ResponseWriter) {
+				defer GinkgoRecover()
+				Ω(w).Should(BeNil())
+				close(done)
+			}
+
+			// Long polling
+			go util.LongPolling(nil, time.Second, addSubscriber, successHandler, timeoutHandler)
+		}, 2)
+
+		It("Debounce", func() {
+			// make test data
+			data := make(map[int]int)
+			inChan := make(chan interface{})
+			outChan := make(chan []interface{})
+			go util.Debounce(inChan, outChan, time.Second)
+			for i := 0; i < 5+rand.Intn(5); i++ {
+				n := rand.Int()
+				data[n]++
+				go func(j int) {
+					inChan <- j
+				}(n)
+			}
+
+			// Debounce
+			e := <-outChan
+			for _, m := range e {
+				num, ok := m.(int)
+				Ω(ok).Should(BeTrue())
+				data[num]--
+			}
+
+			for _, v := range data {
+				Ω(v).Should(BeZero())
+			}
+		}, 2)
+	})
 })