Merge pull request #8 from 30x/XAPID-922
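XAPID-922: emit the PluginsInitialized event after clearing pluginInitFuncs; make dispatcher methods nil-safe and have Send recover (with a warning log) from a failed channel send; take the event manager lock around the EventDelivered dispatch; regroup the events tests into "local" and "plugins" contexts with shared setup/teardown.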
diff --git a/apid.go b/apid.go
index ea4eb8d..596cfe8 100644
--- a/apid.go
+++ b/apid.go
@@ -71,8 +71,8 @@
}
pie.Plugins = append(pie.Plugins, pluginData)
}
- Events().Emit(SystemEventsSelector, pie)
pluginInitFuncs = nil
+ Events().Emit(SystemEventsSelector, pie)
log.Debugf("done initializing plugins")
}
diff --git a/events/event_manager.go b/events/event_manager.go
index 7d7c69f..8a8a2d3 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -115,6 +115,8 @@
Event: event,
Count: count,
}
+ em.Lock()
+ defer em.Unlock()
em.dispatchers[apid.EventDeliveredSelector].Send(ede)
}
}
@@ -128,6 +130,9 @@
}
func (d *dispatcher) Add(h apid.EventHandler) {
+ if d == nil {
+ return
+ }
d.Lock()
defer d.Unlock()
if d.handlers == nil {
@@ -143,6 +148,9 @@
}
func (d *dispatcher) Remove(h apid.EventHandler) {
+ if d == nil {
+ return
+ }
d.Lock()
defer d.Unlock()
for i := len(d.handlers) - 1; i >= 0; i-- {
@@ -155,24 +163,38 @@
}
func (d *dispatcher) Close() {
+ if d == nil {
+ return
+ }
close(d.channel)
}
func (d *dispatcher) Send(e apid.Event) bool {
- if d != nil {
- d.channel <- e
- return true
+ if d == nil {
+ return false
}
- return false
+ defer func() {
+ if err := recover(); err != nil {
+ log.Warnf("Send %v failed: %v", e, err)
+ }
+ }()
+ d.channel <- e
+ return true
}
func (d *dispatcher) HasHandlers() bool {
+ if d == nil {
+ return false
+ }
d.Lock()
defer d.Unlock()
return d != nil && len(d.handlers) > 0
}
func (d *dispatcher) startDelivery() {
+ if d == nil {
+ return
+ }
go func() {
for {
select {
diff --git a/events/events_test.go b/events/events_test.go
index 1bdcd54..5f8d181 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -9,371 +9,352 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "fmt"
"strconv"
)
var _ = Describe("Events Service", func() {
- It("should ignore event with no listeners", func() {
- em := events.CreateService()
- defer em.Close()
- em.Emit("no listeners", &test_event{"test"})
- })
+ Context("local", func() {
- It("should publish an event to a listener", func(done Done) {
- em := events.CreateService()
+ var em apid.EventsService
- h := test_handler{
- "handler",
- func(event apid.Event) {
- defer GinkgoRecover()
+ BeforeEach(func() {
+ em = events.CreateService()
+ })
+ AfterEach(func() {
+ if em != nil {
em.Close()
- close(done)
- },
- }
-
- em.Listen("selector", &h)
- em.Emit("selector", &test_event{"test"})
- })
-
- It("should publish an event to a listener func", func(done Done) {
- em := events.CreateService()
-
- h := func(event apid.Event) {
- defer GinkgoRecover()
-
- em.Close()
- close(done)
- }
-
- em.ListenFunc("selector", h)
- em.Emit("selector", &test_event{"test"})
- })
-
- It("should publish multiple events to a listener", func(done Done) {
- em := events.CreateService()
-
- count := int32(0)
- h := test_handler{
- "handler",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- c := atomic.AddInt32(&count, 1)
- if c > 1 {
- em.Close()
- close(done)
- }
- },
- }
-
- em.Listen("selector", &h)
- em.Emit("selector", &test_event{"test1"})
- em.Emit("selector", &test_event{"test2"})
- })
-
- It("EmitWithCallback should call the callback when done with delivery", func(done Done) {
- em := events.CreateService()
-
- delivered := func(event apid.Event) {
- defer GinkgoRecover()
- close(done)
- }
-
- em.EmitWithCallback("selector", &test_event{"test1"}, delivered)
- })
-
- It("should publish only one event to a listenOnce", func(done Done) {
- em := events.CreateService()
-
- count := 0
- h := func(event apid.Event) {
- defer GinkgoRecover()
- count++
- }
-
- delivered := func(event apid.Event) {
- defer GinkgoRecover()
- Expect(count).To(Equal(1))
- em.Close()
- close(done)
- }
-
- em.ListenOnceFunc("selector", h)
- em.Emit("selector", &test_event{"test1"})
- em.EmitWithCallback("selector", &test_event{"test2"}, delivered)
- })
-
- It("should publish an event to multiple listeners", func(done Done) {
- em := events.CreateService()
- defer em.Close()
-
- mut := sync.Mutex{}
- hitH1 := false
- hitH2 := false
- h1 := test_handler{
- "handler 1",
- func(event apid.Event) {
- defer GinkgoRecover()
- mut.Lock()
- defer mut.Unlock()
- hitH1 = true
- if hitH1 && hitH2 {
- em.Close()
- close(done)
- }
- },
- }
- h2 := test_handler{
- "handler 2",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- mut.Lock()
- defer mut.Unlock()
- hitH2 = true
- if hitH1 && hitH2 {
- em.Close()
- close(done)
- }
- },
- }
-
- em.Listen("selector", &h1)
- em.Listen("selector", &h2)
- em.Emit("selector", &test_event{"test"})
- })
-
- It("should publish an event delivered event", func(done Done) {
- em := events.CreateService()
- testEvent := &test_event{"test"}
- var testSelector apid.EventSelector = "selector"
-
- dummy := func(event apid.Event) {}
- em.ListenFunc(testSelector, dummy)
-
- h := test_handler{
- "event delivered handler",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- e, ok := event.(apid.EventDeliveryEvent)
-
- Expect(ok).To(BeTrue())
- Expect(e.Event).To(Equal(testEvent))
- Expect(e.Selector).To(Equal(testSelector))
-
- em.Close()
- close(done)
- },
- }
-
- em.Listen(apid.EventDeliveredSelector, &h)
- em.Emit(testSelector, testEvent)
- })
-
- It("should be able to remove a listener", func(done Done) {
- em := events.CreateService()
-
- event1 := &test_event{"test1"}
- event2 := &test_event{"test2"}
- event3 := &test_event{"test3"}
-
- dummy := func(event apid.Event) {}
- em.ListenFunc("selector", dummy)
-
- h := test_handler{
- "handler",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- Expect(event).NotTo(Equal(event2))
- if event == event3 {
- em.Close()
- close(done)
- }
- },
- }
- em.Listen("selector", &h)
-
- // need to drive test like this because of async delivery
- td := test_handler{
- "test driver",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- e := event.(apid.EventDeliveryEvent)
- if e.Event == event1 {
- em.StopListening("selector", &h)
- em.Emit("selector", event2)
- } else if e.Event == event2 {
- em.Listen("selector", &h)
- em.Emit("selector", event3)
- }
- },
- }
- em.Listen(apid.EventDeliveredSelector, &td)
-
- em.Emit("selector", event1)
- })
-
- It("should deliver events according selector", func(done Done) {
- em := events.CreateService()
-
- e1 := &test_event{"test1"}
- e2 := &test_event{"test2"}
-
- count := int32(0)
-
- h1 := test_handler{
- "handler1",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- c := atomic.AddInt32(&count, 1)
- Expect(event).Should(Equal(e1))
- if c == 2 {
- em.Close()
- close(done)
- }
- },
- }
-
- h2 := test_handler{
- "handler2",
- func(event apid.Event) {
- defer GinkgoRecover()
-
- c := atomic.AddInt32(&count, 1)
- Expect(event).Should(Equal(e2))
- if c == 2 {
- em.Close()
- close(done)
- }
- },
- }
-
- em.Listen("selector1", &h1)
- em.Listen("selector2", &h2)
-
- em.Emit("selector2", e2)
- em.Emit("selector1", e1)
- })
-
- It("should publish PluginsInitialized event", func(done Done) {
- xData := make(map[string]interface{})
- xData["schemaVersion"] = "1.2.3"
- p := func(s apid.Services) (pd apid.PluginData, err error) {
- pd = apid.PluginData{
- Name: "test plugin",
- Version: "1.0.0",
- ExtraData: xData,
+ em = nil
}
- return
- }
- apid.RegisterPlugin(p)
+ })
- h := func(event apid.Event) {
- defer GinkgoRecover()
+ It("should ignore event with no listeners", func() {
+ em.Emit("no listeners", &test_event{"test"})
+ })
- if pie, ok := event.(apid.PluginsInitializedEvent); ok {
-
- apid.Events().Close()
-
- Expect(len(pie.Plugins)).Should(Equal(1))
- p := pie.Plugins[0]
- Expect(p.Name).To(Equal("test plugin"))
- Expect(p.Version).To(Equal("1.0.0"))
- Expect(p.ExtraData["schemaVersion"]).To(Equal("1.2.3"))
-
- close(done)
+ It("should publish an event to a listener", func(done Done) {
+ h := test_handler{
+ "handler",
+ func(event apid.Event) {
+ close(done)
+ },
}
- }
- apid.Events().ListenFunc(apid.SystemEventsSelector, h)
- apid.InitializePlugins("")
- })
+ em.Listen("selector", &h)
+ em.Emit("selector", &test_event{"test"})
+ })
- It("shutdown event should be emitted and listened successfully", func(done Done) {
- h := func(event apid.Event) {
- defer GinkgoRecover()
-
- if pie, ok := event.(apid.ShutdownEvent); ok {
- apid.Events().Close()
- Expect(pie.Description).Should(Equal("apid is going to shutdown"))
- fmt.Println(pie.Description)
- close(done)
- } else {
- Fail("Received wrong event")
- }
- }
- apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
- shutdownHandler := func(event apid.Event) {}
- apid.Events().EmitWithCallback(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"}, shutdownHandler)
- })
-
- It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
- pluginNum := 10
- count := int32(0)
-
- // create and register plugins, listen to shutdown event
- for i:=0; i<pluginNum; i++ {
- apid.RegisterPlugin(createDummyPlugin(i))
+ It("should publish an event to a listener func", func(done Done) {
h := func(event apid.Event) {
+ close(done)
+ }
+
+ em.ListenFunc("selector", h)
+ em.Emit("selector", &test_event{"test"})
+ })
+
+ It("should publish multiple events to a listener", func(done Done) {
+ count := int32(0)
+ h := test_handler{
+ "handler",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+
+ c := atomic.AddInt32(&count, 1)
+ if c > 1 {
+ close(done)
+ }
+ },
+ }
+
+ em.Listen("selector", &h)
+ em.Emit("selector", &test_event{"test1"})
+ em.Emit("selector", &test_event{"test2"})
+ })
+
+ It("EmitWithCallback should call the callback when done with delivery", func(done Done) {
+ delivered := func(event apid.Event) {
+ close(done)
+ }
+
+ em.EmitWithCallback("selector", &test_event{"test1"}, delivered)
+ })
+
+ It("should publish only one event to a listenOnce", func(done Done) {
+ count := 0
+ h := func(event apid.Event) {
+ defer GinkgoRecover()
+ count++
+ }
+
+ delivered := func(event apid.Event) {
+ defer GinkgoRecover()
+ Expect(count).To(Equal(1))
+ close(done)
+ }
+
+ em.ListenOnceFunc("selector", h)
+ em.Emit("selector", &test_event{"test1"})
+ em.EmitWithCallback("selector", &test_event{"test2"}, delivered)
+ })
+
+ It("should publish an event to multiple listeners", func(done Done) {
+ mut := sync.Mutex{}
+ hitH1 := false
+ hitH2 := false
+ h1 := test_handler{
+ "handler 1",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+ mut.Lock()
+ defer mut.Unlock()
+ hitH1 = true
+ if hitH1 && hitH2 {
+ close(done)
+ }
+ },
+ }
+ h2 := test_handler{
+ "handler 2",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+ mut.Lock()
+ defer mut.Unlock()
+ hitH2 = true
+ if hitH1 && hitH2 {
+ close(done)
+ }
+ },
+ }
+
+ em.Listen("selector", &h1)
+ em.Listen("selector", &h2)
+ em.Emit("selector", &test_event{"test"})
+ })
+
+ It("should publish an event delivered event", func(done Done) {
+ testEvent := &test_event{"test"}
+ var testSelector apid.EventSelector = "selector"
+
+ dummy := func(event apid.Event) {}
+ em.ListenFunc(testSelector, dummy)
+
+ h := test_handler{
+ "event delivered handler",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+
+ e, ok := event.(apid.EventDeliveryEvent)
+
+ Expect(ok).To(BeTrue())
+ Expect(e.Event).To(Equal(testEvent))
+ Expect(e.Selector).To(Equal(testSelector))
+
+ close(done)
+ },
+ }
+
+ em.Listen(apid.EventDeliveredSelector, &h)
+ em.Emit(testSelector, testEvent)
+ })
+
+ It("should be able to remove a listener", func(done Done) {
+ event1 := &test_event{"test1"}
+ event2 := &test_event{"test2"}
+ event3 := &test_event{"test3"}
+
+ dummy := func(event apid.Event) {}
+ em.ListenFunc("selector", dummy)
+
+ h := test_handler{
+ "handler",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+
+ Expect(event).NotTo(Equal(event2))
+ if event == event3 {
+ close(done)
+ }
+ },
+ }
+ em.Listen("selector", &h)
+
+ // need to drive test like this because of async delivery
+ var td apid.EventHandler
+ td = &test_handler{
+ "test driver",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+
+ e := event.(apid.EventDeliveryEvent)
+ if e.Event == event1 {
+ em.StopListening("selector", &h)
+ em.Emit("selector", event2)
+ } else if e.Event == event2 {
+ em.StopListening(apid.EventDeliveredSelector, td)
+ em.Listen("selector", &h)
+ em.Emit("selector", event3)
+ }
+ },
+ }
+ em.Listen(apid.EventDeliveredSelector, td)
+
+ em.Emit("selector", event1)
+ })
+
+ It("should deliver events according selector", func(done Done) {
+ e1 := &test_event{"test1"}
+ e2 := &test_event{"test2"}
+
+ count := int32(0)
+
+ h1 := test_handler{
+ "handler1",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+
+ c := atomic.AddInt32(&count, 1)
+ Expect(event).Should(Equal(e1))
+ if c == 2 {
+ close(done)
+ }
+ },
+ }
+
+ h2 := test_handler{
+ "handler2",
+ func(event apid.Event) {
+ defer GinkgoRecover()
+
+ c := atomic.AddInt32(&count, 1)
+ Expect(event).Should(Equal(e2))
+ if c == 2 {
+ close(done)
+ }
+ },
+ }
+
+ em.Listen("selector1", &h1)
+ em.Listen("selector2", &h2)
+
+ em.Emit("selector2", e2)
+ em.Emit("selector1", e1)
+ })
+ })
+
+ Context("plugins", func() {
+
+ BeforeEach(func() {
+ })
+
+ AfterEach(func() {
+ apid.Events().Close()
+ })
+
+ It("should publish PluginsInitialized event", func(done Done) {
+ xData := make(map[string]interface{})
+ xData["schemaVersion"] = "1.2.3"
+ p := func(s apid.Services) (pd apid.PluginData, err error) {
+ pd = apid.PluginData{
+ Name: "test plugin",
+ Version: "1.0.0",
+ ExtraData: xData,
+ }
+ return
+ }
+ apid.RegisterPlugin(p)
+
+ h := func(event apid.Event) {
+ defer GinkgoRecover()
+
+ if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+
+ Expect(len(pie.Plugins)).Should(Equal(1))
+ p := pie.Plugins[0]
+ Expect(p.Name).To(Equal("test plugin"))
+ Expect(p.Version).To(Equal("1.0.0"))
+ Expect(p.ExtraData["schemaVersion"]).To(Equal("1.2.3"))
+
+ close(done)
+ }
+ }
+ apid.Events().ListenFunc(apid.SystemEventsSelector, h)
+
+ apid.InitializePlugins("")
+ })
+
+ It("shutdown event should be emitted and listened successfully", func(done Done) {
+ h := func(event apid.Event) {
+ defer GinkgoRecover()
+
if pie, ok := event.(apid.ShutdownEvent); ok {
Expect(pie.Description).Should(Equal("apid is going to shutdown"))
- atomic.AddInt32(&count, 1)
} else {
Fail("Received wrong event")
}
}
apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
- }
+ <- apid.Events().Emit(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"})
+ close(done)
+ })
+ It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
+ pluginNum := 10
+ count := int32(0)
+ countP := &count
- apid.InitializePlugins("")
-
- apid.ShutdownPluginsAndWait()
-
- defer GinkgoRecover()
- apid.Events().Close()
-
- // handlers of all registered plugins have executed
- Expect(count).Should(Equal(int32(pluginNum)))
-
- close(done)
- })
-
- It("should be able to read apid version from PluginsInitialized event", func(done Done) {
- xData := make(map[string]interface{})
- xData["schemaVersion"] = "1.2.3"
- p := func(s apid.Services) (pd apid.PluginData, err error) {
- pd = apid.PluginData{
- Name: "test plugin",
- Version: "1.0.0",
- ExtraData: xData,
+ // create and register plugins, listen to shutdown event
+ for i:=0; i<pluginNum; i++ {
+ apid.RegisterPlugin(createDummyPlugin(i))
+ h := func(event apid.Event) {
+ if pie, ok := event.(apid.ShutdownEvent); ok {
+ Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+ atomic.AddInt32(countP, 1)
+ } else {
+ Fail("Received wrong event")
+ }
+ }
+ apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
}
- return
- }
- apid.RegisterPlugin(p)
- apidVersion := "dummy_version"
- h := func(event apid.Event) {
- defer GinkgoRecover()
+ apid.InitializePlugins("")
- if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+ apid.ShutdownPluginsAndWait()
- apid.Events().Close()
- Expect(pie.ApidVersion).To(Equal(apidVersion))
- close(done)
+ // handlers of all registered plugins have executed
+ Expect(count).Should(Equal(int32(pluginNum)))
+ close(done)
+ })
+
+ It("should be able to read apid version from PluginsInitialized event", func(done Done) {
+ xData := make(map[string]interface{})
+ xData["schemaVersion"] = "1.2.3"
+ p := func(s apid.Services) (pd apid.PluginData, err error) {
+ pd = apid.PluginData{
+ Name: "test plugin",
+ Version: "1.0.0",
+ ExtraData: xData,
+ }
+ return
}
- }
- apid.Events().ListenFunc(apid.SystemEventsSelector, h)
+ apid.RegisterPlugin(p)
- apid.InitializePlugins(apidVersion)
+ apidVersion := "dummy_version"
+
+ h := func(event apid.Event) {
+ defer GinkgoRecover()
+
+ if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+ Expect(pie.ApidVersion).To(Equal(apidVersion))
+ close(done)
+ }
+ }
+ apid.Events().ListenFunc(apid.SystemEventsSelector, h)
+
+ apid.InitializePlugins(apidVersion)
+ })
})
})
@@ -389,7 +370,6 @@
return
}
return p
-
}
type test_handler struct {