refactor emit() and emitwithcallback()
diff --git a/events/event_manager.go b/events/event_manager.go index 12e8f82..ed4801c 100644 --- a/events/event_manager.go +++ b/events/event_manager.go
@@ -3,6 +3,7 @@ import ( "github.com/30x/apid-core" "sync" + "reflect" ) // events published to a given channel are processed entirely in order, though delivery to listeners is async @@ -11,12 +12,14 @@ dispatchers map[apid.EventSelector]*dispatcher } -func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) { +func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event { + log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event) - eventDispatcher := em.dispatchers[selector] - if (eventDispatcher==nil) || (!eventDispatcher.Send(event)) { - em.sendDelivered(selector, event, 0) // in case of no dispatcher - } + responseChannel := make(chan apid.Event, 1) + em.EmitWithCallback(selector, event, func(event apid.Event) { + responseChannel <- event + }) + return responseChannel } func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) { @@ -25,7 +28,7 @@ handler := &funcWrapper{em, nil} handler.HandlerFunc = func(e apid.Event) { if ede, ok := e.(apid.EventDeliveryEvent); ok { - if ede.Event == event { + if reflect.DeepEqual(ede.Event, event) { em.StopListening(apid.EventDeliveredSelector, handler) callback(e) } @@ -33,7 +36,9 @@ } em.Listen(apid.EventDeliveredSelector, handler) - em.Emit(selector, event) + if !em.dispatchers[selector].Send(event) { + em.sendDelivered(selector, event, 0) // in case of no dispatcher + } } func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
diff --git a/events/events_test.go b/events/events_test.go index b2e3f9d..8d20b90 100644 --- a/events/events_test.go +++ b/events/events_test.go
@@ -309,8 +309,7 @@ It("handlers registered by plugins should execute before apid shutdown", func(done Done) { pluginNum := 10 - count := make([]int, 1) - count[0] = 0 + count := int32(0) // create and register plugins, listen to shutdown event for i:=0; i<pluginNum; i++ { @@ -318,7 +317,7 @@ h := func(event apid.Event) { if pie, ok := event.(apid.ShutdownEvent); ok { Expect(pie.Description).Should(Equal("apid is going to shutdown")) - count[0] += 1 + atomic.AddInt32(&count, 1) } else { Fail("Received wrong event") } @@ -329,16 +328,15 @@ apid.InitializePlugins() - apid.ShutdownPlugins() - - apid.WaitPluginsShutdown() + apid.ShutdownPluginsAndWait() defer GinkgoRecover() apid.Events().Close() - // handlers of all registered plugins have executed - Expect(count[0]).Should(Equal(pluginNum)) - fmt.Println("handlers of all registered plugins have executed") + // handlers of all registered plugins have executed + Expect(count).Should(Equal(int32(pluginNum))) + + fmt.Println("handlers of all registered plugins have executed") close(done) }) })