Add EmitWithCallback() and ListenOnceFunc() functions
diff --git a/events/event_manager.go b/events/event_manager.go index b78d7a3..c7d4393 100644 --- a/events/event_manager.go +++ b/events/event_manager.go
@@ -13,7 +13,26 @@ func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) { log.Debugf("emit selector: '%s' event: %s", selector, event) - em.dispatchers[selector].Send(event) + if !em.dispatchers[selector].Send(event) { + em.sendDelivered(selector, event, 0) // in case of no dispatcher + } +} + +func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) { + log.Debugf("emit with callback selector: '%s' event: %s", selector, event) + + handler := &funcWrapper{em, nil} + handler.HandlerFunc = func(e apid.Event) { + if ede, ok := e.(apid.EventDeliveryEvent); ok { + if ede.Event == event { + em.StopListening(apid.EventDeliveredSelector, handler) + callback(e) + } + } + } + + em.Listen(apid.EventDeliveredSelector, handler) + em.Emit(selector, event) } func (em *eventManager) HasListeners(selector apid.EventSelector) bool { @@ -27,7 +46,7 @@ } list := em.dispatchers[selector] if list == nil { - d := &dispatcher{sync.RWMutex{}, em, selector, nil, nil} + d := &dispatcher{sync.Mutex{}, em, selector, nil, nil} em.dispatchers[selector] = d } em.dispatchers[selector].Add(handler) @@ -43,12 +62,22 @@ func (em *eventManager) ListenFunc(selector apid.EventSelector, handlerFunc apid.EventHandlerFunc) { log.Debugf("listenFunc: '%s' handler: %s", selector, handlerFunc) - handler := &funcWrapper{handlerFunc} + handler := &funcWrapper{em, handlerFunc} + em.Listen(selector, handler) +} + +func (em *eventManager) ListenOnceFunc(selector apid.EventSelector, handlerFunc apid.EventHandlerFunc) { + log.Debugf("listenOnceFunc: '%s' handler: %s", selector, handlerFunc) + handler := &funcWrapper{em, nil} + handler.HandlerFunc = func(event apid.Event) { + em.StopListening(selector, handler) + handlerFunc(event) + } em.Listen(selector, handler) } func (em *eventManager) Close() { - log.Debugf("Closing") + log.Debugf("Closing %d dispatchers", len(em.dispatchers)) dispatchers := em.dispatchers em.dispatchers = nil for _, dispatcher := range dispatchers { @@ -56,8 +85,20 @@ } } +func (em *eventManager) sendDelivered(selector apid.EventSelector, event apid.Event, count int) { + if selector != apid.EventDeliveredSelector { + ede := apid.EventDeliveryEvent{ + Description: "event complete", + Selector: selector, + Event: event, + Count: count, + } + em.dispatchers[apid.EventDeliveredSelector].Send(ede) + } +} + type dispatcher struct { - sync.RWMutex + sync.Mutex em *eventManager selector apid.EventSelector channel chan apid.Event @@ -95,10 +136,12 @@ close(d.channel) } -func (d *dispatcher) Send(e apid.Event) { +func (d *dispatcher) Send(e apid.Event) bool { if d != nil { d.channel <- e + return true } + return false } func (d *dispatcher) HasHandlers() bool { @@ -112,27 +155,20 @@ case event := <-d.channel: if event != nil { log.Debugf("delivering %v to %v", event, d.handlers) - var wg sync.WaitGroup - for _, h := range d.handlers { - handler := h - wg.Add(1) - go func() { - defer wg.Done() - handler.Handle(event) // todo: recover on error? - }() - } - log.Debugf("waiting for handlers") - wg.Wait() - if d.selector != apid.EventDeliveredSelector && - d.em.HasListeners(apid.EventDeliveredSelector) { - - e := apid.EventDeliveryEvent{ - Description: "event complete", - Selector: d.selector, - Event: event, + if len(d.handlers) > 0 { + var wg sync.WaitGroup + for _, h := range d.handlers { + handler := h + wg.Add(1) + go func() { + defer wg.Done() + handler.Handle(event) // todo: recover on error? + }() } - d.em.Emit(apid.EventDeliveredSelector, e) + log.Debugf("waiting for handlers") + wg.Wait() } + d.em.sendDelivered(d.selector, event, len(d.handlers)) log.Debugf("delivery complete") } } @@ -142,9 +178,10 @@ } type funcWrapper struct { - f apid.EventHandlerFunc + *eventManager + HandlerFunc apid.EventHandlerFunc } func (r *funcWrapper) Handle(e apid.Event) { - r.f(e) + r.HandlerFunc(e) }
diff --git a/events/events_test.go b/events/events_test.go index aeeaa26..d080c91 100644 --- a/events/events_test.go +++ b/events/events_test.go
@@ -27,6 +27,8 @@ h := test_handler{ "handler", func(event apid.Event) { + defer GinkgoRecover() + em.Close() close(done) }, @@ -40,6 +42,8 @@ em := events.CreateService() h := func(event apid.Event) { + defer GinkgoRecover() + em.Close() close(done) } @@ -55,6 +59,8 @@ h := test_handler{ "handler", func(event apid.Event) { + defer GinkgoRecover() + c := atomic.AddInt32(&count, 1) if c > 1 { em.Close() @@ -68,6 +74,38 @@ em.Emit("selector", &test_event{"test2"}) }) + It("EmitWithCallback should call the callback when done with delivery", func(done Done) { + em := events.CreateService() + + delivered := func(event apid.Event) { + defer GinkgoRecover() + close(done) + } + + em.EmitWithCallback("selector", &test_event{"test1"}, delivered) + }) + + It("should publish only one event to a listenOnce", func(done Done) { + em := events.CreateService() + + count := 0 + h := func(event apid.Event) { + defer GinkgoRecover() + count++ + } + + delivered := func(event apid.Event) { + defer GinkgoRecover() + Expect(count).To(Equal(1)) + em.Close() + close(done) + } + + em.ListenOnceFunc("selector", h) + em.Emit("selector", &test_event{"test1"}) + em.EmitWithCallback("selector", &test_event{"test2"}, delivered) + }) + It("should publish an event to multiple listeners", func(done Done) { em := events.CreateService() defer em.Close() @@ -77,6 +115,8 @@ h1 := test_handler{ "handler 1", func(event apid.Event) { + defer GinkgoRecover() + hitH1 = true if hitH1 && hitH2 { em.Close() @@ -87,6 +127,8 @@ h2 := test_handler{ "handler 2", func(event apid.Event) { + defer GinkgoRecover() + hitH2 = true if hitH1 && hitH2 { em.Close() @@ -111,6 +153,8 @@ h := test_handler{ "event delivered handler", func(event apid.Event) { + defer GinkgoRecover() + e, ok := event.(apid.EventDeliveryEvent) Expect(ok).To(BeTrue()) @@ -139,6 +183,8 @@ h := test_handler{ "handler", func(event apid.Event) { + defer GinkgoRecover() + Expect(event).NotTo(Equal(event2)) if event == event3 { em.Close() @@ -152,6 +198,8 @@ td := test_handler{ "test driver", func(event apid.Event) { + defer GinkgoRecover() + e := event.(apid.EventDeliveryEvent) if e.Event == event1 { em.StopListening("selector", &h) @@ -178,6 +226,8 @@ h1 := test_handler{ "handler1", func(event apid.Event) { + defer GinkgoRecover() + c := atomic.AddInt32(&count, 1) Expect(event).Should(Equal(e1)) if c == 2 { @@ -190,6 +240,8 @@ h2 := test_handler{ "handler2", func(event apid.Event) { + defer GinkgoRecover() + c := atomic.AddInt32(&count, 1) Expect(event).Should(Equal(e2)) if c == 2 {
diff --git a/events_service.go b/events_service.go index 19a2e90..9f56f93 100644 --- a/events_service.go +++ b/events_service.go
@@ -14,12 +14,18 @@ // publish an event to the selector Emit(selector EventSelector, event Event) + // publish an event to the selector, call the passed handler when all listeners have responded to the event + EmitWithCallback(selector EventSelector, event Event, handler EventHandlerFunc) + // when an event matching selector occurs, run the provided handler Listen(selector EventSelector, handler EventHandler) // when an event matching selector occurs, run the provided handler function ListenFunc(selector EventSelector, handler EventHandlerFunc) + // when an event matching selector occurs, run the provided handler function and stop listening + ListenOnceFunc(selector EventSelector, handler EventHandlerFunc) + // remove a listener StopListening(selector EventSelector, handler EventHandler) @@ -33,4 +39,5 @@ Description string Selector EventSelector Event Event + Count int }