refactor emit() and emitwithcallback()
diff --git a/events/event_manager.go b/events/event_manager.go
index 12e8f82..ed4801c 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -3,6 +3,7 @@
import (
"github.com/30x/apid-core"
"sync"
+ "reflect"
)
// events published to a given channel are processed entirely in order, though delivery to listeners is async
@@ -11,12 +12,14 @@
dispatchers map[apid.EventSelector]*dispatcher
}
-func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) {
+func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+
log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event)
- eventDispatcher := em.dispatchers[selector]
- if (eventDispatcher==nil) || (!eventDispatcher.Send(event)) {
- em.sendDelivered(selector, event, 0) // in case of no dispatcher
- }
+ responseChannel := make(chan apid.Event, 1)
+ em.EmitWithCallback(selector, event, func(event apid.Event) {
+ responseChannel <- event
+ })
+ return responseChannel
}
func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) {
@@ -25,7 +28,7 @@
handler := &funcWrapper{em, nil}
handler.HandlerFunc = func(e apid.Event) {
if ede, ok := e.(apid.EventDeliveryEvent); ok {
- if ede.Event == event {
+ if reflect.DeepEqual(ede.Event, event) {
em.StopListening(apid.EventDeliveredSelector, handler)
callback(e)
}
@@ -33,7 +36,9 @@
}
em.Listen(apid.EventDeliveredSelector, handler)
- em.Emit(selector, event)
+ if !em.dispatchers[selector].Send(event) {
+ em.sendDelivered(selector, event, 0) // in case of no dispatcher
+ }
}
func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
diff --git a/events/events_test.go b/events/events_test.go
index b2e3f9d..8d20b90 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -309,8 +309,7 @@
It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
pluginNum := 10
- count := make([]int, 1)
- count[0] = 0
+ count := int32(0)
// create and register plugins, listen to shutdown event
for i:=0; i<pluginNum; i++ {
@@ -318,7 +317,7 @@
h := func(event apid.Event) {
if pie, ok := event.(apid.ShutdownEvent); ok {
Expect(pie.Description).Should(Equal("apid is going to shutdown"))
- count[0] += 1
+ atomic.AddInt32(&count, 1)
} else {
Fail("Received wrong event")
}
@@ -329,16 +328,15 @@
apid.InitializePlugins()
- apid.ShutdownPlugins()
-
- apid.WaitPluginsShutdown()
+ apid.ShutdownPluginsAndWait()
defer GinkgoRecover()
apid.Events().Close()
- // handlers of all registered plugins have executed
- Expect(count[0]).Should(Equal(pluginNum))
- fmt.Println("handlers of all registered plugins have executed")
+ // handlers of all registered plugins have executed
+ Expect(count).Should(Equal(int32(pluginNum)))
+
+ fmt.Println("handlers of all registered plugins have executed")
close(done)
})
})