lock down race conditions
diff --git a/events/event_manager.go b/events/event_manager.go
index 33b0ee1..527060c 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -1,19 +1,24 @@
package events
import (
- "github.com/30x/apid-core"
"sync"
+
+ "github.com/30x/apid-core"
)
// events published to a given channel are processed entirely in order, though delivery to listeners is async
type eventManager struct {
+ sync.Mutex
dispatchers map[apid.EventSelector]*dispatcher
}
func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) {
log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event)
- if !em.dispatchers[selector].Send(event) {
+ em.Lock()
+ dispatch := em.dispatchers[selector]
+ em.Unlock()
+ if !dispatch.Send(event) {
em.sendDelivered(selector, event, 0) // in case of no dispatcher
}
}
@@ -36,16 +41,20 @@
}
func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
- return em.dispatchers[selector].HasHandlers()
+ em.Lock()
+ dispatch := em.dispatchers[selector]
+ em.Unlock()
+ return dispatch.HasHandlers()
}
func (em *eventManager) Listen(selector apid.EventSelector, handler apid.EventHandler) {
+ em.Lock()
+ defer em.Unlock()
log.Debugf("listen: '%s' handler: %v", selector, handler)
if em.dispatchers == nil {
em.dispatchers = make(map[apid.EventSelector]*dispatcher)
}
- list := em.dispatchers[selector]
- if list == nil {
+ if em.dispatchers[selector] == nil {
d := &dispatcher{sync.Mutex{}, em, selector, nil, nil}
em.dispatchers[selector] = d
}
@@ -53,6 +62,8 @@
}
func (em *eventManager) StopListening(selector apid.EventSelector, handler apid.EventHandler) {
+ em.Lock()
+ defer em.Unlock()
log.Debugf("stop listening: '%s' handler: %v", selector, handler)
if em.dispatchers == nil {
return
@@ -77,9 +88,11 @@
}
func (em *eventManager) Close() {
- log.Debugf("Closing %d dispatchers", len(em.dispatchers))
+ em.Lock()
dispatchers := em.dispatchers
em.dispatchers = nil
+ em.Unlock()
+ log.Debugf("Closing %d dispatchers", len(dispatchers))
for _, dispatcher := range dispatchers {
dispatcher.Close()
}
@@ -145,6 +158,8 @@
}
func (d *dispatcher) HasHandlers() bool {
+ d.Lock()
+ defer d.Unlock()
return d != nil && len(d.handlers) > 0
}
@@ -154,10 +169,13 @@
select {
case event := <-d.channel:
if event != nil {
- log.Debugf("delivering %v to %v", &event, d.handlers)
- if len(d.handlers) > 0 {
+ d.Lock()
+ handlers := d.handlers
+ d.Unlock()
+ log.Debugf("delivering %v to %v", &event, handlers)
+ if len(handlers) > 0 {
var wg sync.WaitGroup
- for _, h := range d.handlers {
+ for _, h := range handlers {
handler := h
wg.Add(1)
go func() {
@@ -168,7 +186,7 @@
log.Debugf("waiting for handlers")
wg.Wait()
}
- d.em.sendDelivered(d.selector, event, len(d.handlers))
+ d.em.sendDelivered(d.selector, event, len(handlers))
log.Debugf("event %v delivered", &event)
}
}
diff --git a/events/events_test.go b/events/events_test.go
index 70bc19a..ae3f88c 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -1,11 +1,13 @@
package events_test
import (
+ "sync"
+ "sync/atomic"
+
"github.com/30x/apid-core"
"github.com/30x/apid-core/events"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "sync/atomic"
)
var _ = Describe("Events Service", func() {
@@ -105,13 +107,15 @@
em := events.CreateService()
defer em.Close()
+ mut := sync.Mutex{}
hitH1 := false
hitH2 := false
h1 := test_handler{
"handler 1",
func(event apid.Event) {
defer GinkgoRecover()
-
+ mut.Lock()
+ defer mut.Unlock()
hitH1 = true
if hitH1 && hitH2 {
em.Close()
@@ -124,6 +128,8 @@
func(event apid.Event) {
defer GinkgoRecover()
+ mut.Lock()
+ defer mut.Unlock()
hitH2 = true
if hitH1 && hitH2 {
em.Close()
@@ -258,8 +264,8 @@
xData["schemaVersion"] = "1.2.3"
p := func(s apid.Services) (pd apid.PluginData, err error) {
pd = apid.PluginData{
- Name: "test plugin",
- Version: "1.0.0",
+ Name: "test plugin",
+ Version: "1.0.0",
ExtraData: xData,
}
return