Merge branch 'master' into XAPID-818
diff --git a/events/event_manager.go b/events/event_manager.go
index ed4801c..b98089e 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -1,20 +1,23 @@
package events
import (
- "github.com/30x/apid-core"
"sync"
"reflect"
+
+ "github.com/30x/apid-core"
)
// events published to a given channel are processed entirely in order, though delivery to listeners is async
type eventManager struct {
+ // Embedded lock; the methods touched by this change hold it while reading or replacing dispatchers.
+ sync.Mutex
+ // dispatchers maps a selector to its dispatcher. Listen allocates the map when it is nil and Close resets it to nil.
dispatchers map[apid.EventSelector]*dispatcher
}
func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event)
+
responseChannel := make(chan apid.Event, 1)
em.EmitWithCallback(selector, event, func(event apid.Event) {
responseChannel <- event
@@ -42,16 +45,20 @@
}
+ // HasListeners reports whether the dispatcher registered for selector has any handlers.
+ // The manager lock is held only for the map lookup. HasHandlers is then called on the
+ // result, which is a nil *dispatcher when selector has no entry, so HasHandlers must
+ // tolerate a nil receiver.
func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
- return em.dispatchers[selector].HasHandlers()
+ em.Lock()
+ dispatch := em.dispatchers[selector]
+ em.Unlock()
+ return dispatch.HasHandlers()
}
func (em *eventManager) Listen(selector apid.EventSelector, handler apid.EventHandler) {
+ em.Lock()
+ defer em.Unlock()
log.Debugf("listen: '%s' handler: %v", selector, handler)
if em.dispatchers == nil {
em.dispatchers = make(map[apid.EventSelector]*dispatcher)
}
- list := em.dispatchers[selector]
- if list == nil {
+ if em.dispatchers[selector] == nil {
d := &dispatcher{sync.Mutex{}, em, selector, nil, nil}
em.dispatchers[selector] = d
}
@@ -59,6 +66,8 @@
}
func (em *eventManager) StopListening(selector apid.EventSelector, handler apid.EventHandler) {
+ em.Lock()
+ defer em.Unlock()
log.Debugf("stop listening: '%s' handler: %v", selector, handler)
if em.dispatchers == nil {
return
@@ -83,9 +92,11 @@
}
+ // Close takes the dispatcher map out of the manager while holding the lock, then
+ // logs the count and closes each dispatcher after the lock has been released.
func (em *eventManager) Close() {
- log.Debugf("Closing %d dispatchers", len(em.dispatchers))
+ em.Lock()
dispatchers := em.dispatchers
em.dispatchers = nil
+ em.Unlock()
+ log.Debugf("Closing %d dispatchers", len(dispatchers))
for _, dispatcher := range dispatchers {
dispatcher.Close()
}
}
@@ -151,6 +162,8 @@
}
+ // HasHandlers reports whether at least one handler is registered on d.
+ // It may be called on a nil *dispatcher, which has no handlers. The nil check
+ // must come before Lock: locking through a nil receiver dereferences it and
+ // panics, so a guard placed after the lock is never reached.
func (d *dispatcher) HasHandlers() bool {
- return d != nil && len(d.handlers) > 0
+ if d == nil {
+ return false
+ }
+ d.Lock()
+ defer d.Unlock()
+ return len(d.handlers) > 0
}
@@ -160,10 +173,13 @@
select {
case event := <-d.channel:
if event != nil {
- log.Debugf("delivering %v to %v", &event, d.handlers)
- if len(d.handlers) > 0 {
+ d.Lock()
+ handlers := d.handlers
+ d.Unlock()
+ log.Debugf("delivering %v to %v", &event, handlers)
+ if len(handlers) > 0 {
var wg sync.WaitGroup
- for _, h := range d.handlers {
+ for _, h := range handlers {
handler := h
wg.Add(1)
go func() {
@@ -174,7 +190,7 @@
log.Debugf("waiting for handlers")
wg.Wait()
}
- d.em.sendDelivered(d.selector, event, len(d.handlers))
+ d.em.sendDelivered(d.selector, event, len(handlers))
log.Debugf("event %v delivered", &event)
}
}
diff --git a/events/events_test.go b/events/events_test.go
index 2e1bd96..e1099c1 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -1,10 +1,14 @@
package events_test
import (
+ "sync"
+ "sync/atomic"
+
"github.com/30x/apid-core"
"github.com/30x/apid-core/events"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+
"sync/atomic"
"fmt"
"strconv"
@@ -107,13 +111,15 @@
em := events.CreateService()
defer em.Close()
+ mut := sync.Mutex{}
hitH1 := false
hitH2 := false
h1 := test_handler{
"handler 1",
func(event apid.Event) {
defer GinkgoRecover()
-
+ mut.Lock()
+ defer mut.Unlock()
hitH1 = true
if hitH1 && hitH2 {
em.Close()
@@ -126,6 +132,8 @@
func(event apid.Event) {
defer GinkgoRecover()
+ mut.Lock()
+ defer mut.Unlock()
hitH2 = true
if hitH1 && hitH2 {
em.Close()
@@ -260,8 +268,8 @@
xData["schemaVersion"] = "1.2.3"
p := func(s apid.Services) (pd apid.PluginData, err error) {
pd = apid.PluginData{
- Name: "test plugin",
- Version: "1.0.0",
+ Name: "test plugin",
+ Version: "1.0.0",
ExtraData: xData,
}
return