lock down race conditions
diff --git a/events/event_manager.go b/events/event_manager.go index 33b0ee1..527060c 100644 --- a/events/event_manager.go +++ b/events/event_manager.go
@@ -1,19 +1,24 @@ package events import ( - "github.com/30x/apid-core" "sync" + + "github.com/30x/apid-core" ) // events published to a given channel are processed entirely in order, though delivery to listeners is async type eventManager struct { + sync.Mutex dispatchers map[apid.EventSelector]*dispatcher } func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) { log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event) - if !em.dispatchers[selector].Send(event) { + em.Lock() + dispatch := em.dispatchers[selector] + em.Unlock() + if !dispatch.Send(event) { em.sendDelivered(selector, event, 0) // in case of no dispatcher } } @@ -36,16 +41,20 @@ } func (em *eventManager) HasListeners(selector apid.EventSelector) bool { - return em.dispatchers[selector].HasHandlers() + em.Lock() + dispatch := em.dispatchers[selector] + em.Unlock() + return dispatch.HasHandlers() } func (em *eventManager) Listen(selector apid.EventSelector, handler apid.EventHandler) { + em.Lock() + defer em.Unlock() log.Debugf("listen: '%s' handler: %v", selector, handler) if em.dispatchers == nil { em.dispatchers = make(map[apid.EventSelector]*dispatcher) } - list := em.dispatchers[selector] - if list == nil { + if em.dispatchers[selector] == nil { d := &dispatcher{sync.Mutex{}, em, selector, nil, nil} em.dispatchers[selector] = d } @@ -53,6 +62,8 @@ } func (em *eventManager) StopListening(selector apid.EventSelector, handler apid.EventHandler) { + em.Lock() + defer em.Unlock() log.Debugf("stop listening: '%s' handler: %v", selector, handler) if em.dispatchers == nil { return @@ -77,9 +88,11 @@ } func (em *eventManager) Close() { - log.Debugf("Closing %d dispatchers", len(em.dispatchers)) + em.Lock() dispatchers := em.dispatchers em.dispatchers = nil + em.Unlock() + log.Debugf("Closing %d dispatchers", len(dispatchers)) for _, dispatcher := range dispatchers { dispatcher.Close() } @@ -145,6 +158,8 @@ } func (d *dispatcher) HasHandlers() bool { + d.Lock() + defer d.Unlock() return d != nil && len(d.handlers) > 0 } @@ -154,10 +169,13 @@ select { case event := <-d.channel: if event != nil { - log.Debugf("delivering %v to %v", &event, d.handlers) - if len(d.handlers) > 0 { + d.Lock() + handlers := d.handlers + d.Unlock() + log.Debugf("delivering %v to %v", &event, handlers) + if len(handlers) > 0 { var wg sync.WaitGroup - for _, h := range d.handlers { + for _, h := range handlers { handler := h wg.Add(1) go func() { @@ -168,7 +186,7 @@ log.Debugf("waiting for handlers") wg.Wait() } - d.em.sendDelivered(d.selector, event, len(d.handlers)) + d.em.sendDelivered(d.selector, event, len(handlers)) log.Debugf("event %v delivered", &event) } }
diff --git a/events/events_test.go b/events/events_test.go index 70bc19a..ae3f88c 100644 --- a/events/events_test.go +++ b/events/events_test.go
@@ -1,11 +1,13 @@ package events_test import ( + "sync" + "sync/atomic" + "github.com/30x/apid-core" "github.com/30x/apid-core/events" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "sync/atomic" ) var _ = Describe("Events Service", func() { @@ -105,13 +107,15 @@ em := events.CreateService() defer em.Close() + mut := sync.Mutex{} hitH1 := false hitH2 := false h1 := test_handler{ "handler 1", func(event apid.Event) { defer GinkgoRecover() - + mut.Lock() + defer mut.Unlock() hitH1 = true if hitH1 && hitH2 { em.Close() @@ -124,6 +128,8 @@ func(event apid.Event) { defer GinkgoRecover() + mut.Lock() + defer mut.Unlock() hitH2 = true if hitH1 && hitH2 { em.Close() @@ -258,8 +264,8 @@ xData["schemaVersion"] = "1.2.3" p := func(s apid.Services) (pd apid.PluginData, err error) { pd = apid.PluginData{ - Name: "test plugin", - Version: "1.0.0", + Name: "test plugin", + Version: "1.0.0", ExtraData: xData, } return