Merge pull request #8 from 30x/XAPID-922

Xapid 922
diff --git a/apid.go b/apid.go
index ea4eb8d..596cfe8 100644
--- a/apid.go
+++ b/apid.go
@@ -71,8 +71,8 @@
 		}
 		pie.Plugins = append(pie.Plugins, pluginData)
 	}
-	Events().Emit(SystemEventsSelector, pie)
 	pluginInitFuncs = nil
+	Events().Emit(SystemEventsSelector, pie)
 	log.Debugf("done initializing plugins")
 }
 
diff --git a/events/event_manager.go b/events/event_manager.go
index 7d7c69f..8a8a2d3 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -115,6 +115,8 @@
 			Event:       event,
 			Count:       count,
 		}
+		em.Lock()
+		defer em.Unlock()
 		em.dispatchers[apid.EventDeliveredSelector].Send(ede)
 	}
 }
@@ -128,6 +130,9 @@
 }
 
 func (d *dispatcher) Add(h apid.EventHandler) {
+	if d == nil {
+		return
+	}
 	d.Lock()
 	defer d.Unlock()
 	if d.handlers == nil {
@@ -143,6 +148,9 @@
 }
 
 func (d *dispatcher) Remove(h apid.EventHandler) {
+	if d == nil {
+		return
+	}
 	d.Lock()
 	defer d.Unlock()
 	for i := len(d.handlers) - 1; i >= 0; i-- {
@@ -155,24 +163,38 @@
 }
 
 func (d *dispatcher) Close() {
+	if d == nil {
+		return
+	}
 	close(d.channel)
 }
 
 func (d *dispatcher) Send(e apid.Event) bool {
-	if d != nil {
-		d.channel <- e
-		return true
+	if d == nil {
+		return false
 	}
-	return false
+	defer func() {
+		if err := recover(); err != nil {
+			log.Warnf("Send %v failed: %v", e, err)
+		}
+	}()
+	d.channel <- e
+	return true
 }
 
 func (d *dispatcher) HasHandlers() bool {
+	if d == nil {
+		return false
+	}
 	d.Lock()
 	defer d.Unlock()
 	return d != nil && len(d.handlers) > 0
 }
 
 func (d *dispatcher) startDelivery() {
+	if d == nil {
+		return
+	}
 	go func() {
 		for {
 			select {
diff --git a/events/events_test.go b/events/events_test.go
index 1bdcd54..5f8d181 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -9,371 +9,352 @@
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 
-	"fmt"
 	"strconv"
 )
 
 var _ = Describe("Events Service", func() {
 
-	It("should ignore event with no listeners", func() {
-		em := events.CreateService()
-		defer em.Close()
-		em.Emit("no listeners", &test_event{"test"})
-	})
+	Context("local", func() {
 
-	It("should publish an event to a listener", func(done Done) {
-		em := events.CreateService()
+		var em apid.EventsService
 
-		h := test_handler{
-			"handler",
-			func(event apid.Event) {
-				defer GinkgoRecover()
+		BeforeEach(func() {
+			em = events.CreateService()
+		})
 
+		AfterEach(func() {
+			if em != nil {
 				em.Close()
-				close(done)
-			},
-		}
-
-		em.Listen("selector", &h)
-		em.Emit("selector", &test_event{"test"})
-	})
-
-	It("should publish an event to a listener func", func(done Done) {
-		em := events.CreateService()
-
-		h := func(event apid.Event) {
-			defer GinkgoRecover()
-
-			em.Close()
-			close(done)
-		}
-
-		em.ListenFunc("selector", h)
-		em.Emit("selector", &test_event{"test"})
-	})
-
-	It("should publish multiple events to a listener", func(done Done) {
-		em := events.CreateService()
-
-		count := int32(0)
-		h := test_handler{
-			"handler",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				c := atomic.AddInt32(&count, 1)
-				if c > 1 {
-					em.Close()
-					close(done)
-				}
-			},
-		}
-
-		em.Listen("selector", &h)
-		em.Emit("selector", &test_event{"test1"})
-		em.Emit("selector", &test_event{"test2"})
-	})
-
-	It("EmitWithCallback should call the callback when done with delivery", func(done Done) {
-		em := events.CreateService()
-
-		delivered := func(event apid.Event) {
-			defer GinkgoRecover()
-			close(done)
-		}
-
-		em.EmitWithCallback("selector", &test_event{"test1"}, delivered)
-	})
-
-	It("should publish only one event to a listenOnce", func(done Done) {
-		em := events.CreateService()
-
-		count := 0
-		h := func(event apid.Event) {
-			defer GinkgoRecover()
-			count++
-		}
-
-		delivered := func(event apid.Event) {
-			defer GinkgoRecover()
-			Expect(count).To(Equal(1))
-			em.Close()
-			close(done)
-		}
-
-		em.ListenOnceFunc("selector", h)
-		em.Emit("selector", &test_event{"test1"})
-		em.EmitWithCallback("selector", &test_event{"test2"}, delivered)
-	})
-
-	It("should publish an event to multiple listeners", func(done Done) {
-		em := events.CreateService()
-		defer em.Close()
-
-		mut := sync.Mutex{}
-		hitH1 := false
-		hitH2 := false
-		h1 := test_handler{
-			"handler 1",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-				mut.Lock()
-				defer mut.Unlock()
-				hitH1 = true
-				if hitH1 && hitH2 {
-					em.Close()
-					close(done)
-				}
-			},
-		}
-		h2 := test_handler{
-			"handler 2",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				mut.Lock()
-				defer mut.Unlock()
-				hitH2 = true
-				if hitH1 && hitH2 {
-					em.Close()
-					close(done)
-				}
-			},
-		}
-
-		em.Listen("selector", &h1)
-		em.Listen("selector", &h2)
-		em.Emit("selector", &test_event{"test"})
-	})
-
-	It("should publish an event delivered event", func(done Done) {
-		em := events.CreateService()
-		testEvent := &test_event{"test"}
-		var testSelector apid.EventSelector = "selector"
-
-		dummy := func(event apid.Event) {}
-		em.ListenFunc(testSelector, dummy)
-
-		h := test_handler{
-			"event delivered handler",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				e, ok := event.(apid.EventDeliveryEvent)
-
-				Expect(ok).To(BeTrue())
-				Expect(e.Event).To(Equal(testEvent))
-				Expect(e.Selector).To(Equal(testSelector))
-
-				em.Close()
-				close(done)
-			},
-		}
-
-		em.Listen(apid.EventDeliveredSelector, &h)
-		em.Emit(testSelector, testEvent)
-	})
-
-	It("should be able to remove a listener", func(done Done) {
-		em := events.CreateService()
-
-		event1 := &test_event{"test1"}
-		event2 := &test_event{"test2"}
-		event3 := &test_event{"test3"}
-
-		dummy := func(event apid.Event) {}
-		em.ListenFunc("selector", dummy)
-
-		h := test_handler{
-			"handler",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				Expect(event).NotTo(Equal(event2))
-				if event == event3 {
-					em.Close()
-					close(done)
-				}
-			},
-		}
-		em.Listen("selector", &h)
-
-		// need to drive test like this because of async delivery
-		td := test_handler{
-			"test driver",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				e := event.(apid.EventDeliveryEvent)
-				if e.Event == event1 {
-					em.StopListening("selector", &h)
-					em.Emit("selector", event2)
-				} else if e.Event == event2 {
-					em.Listen("selector", &h)
-					em.Emit("selector", event3)
-				}
-			},
-		}
-		em.Listen(apid.EventDeliveredSelector, &td)
-
-		em.Emit("selector", event1)
-	})
-
-	It("should deliver events according selector", func(done Done) {
-		em := events.CreateService()
-
-		e1 := &test_event{"test1"}
-		e2 := &test_event{"test2"}
-
-		count := int32(0)
-
-		h1 := test_handler{
-			"handler1",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				c := atomic.AddInt32(&count, 1)
-				Expect(event).Should(Equal(e1))
-				if c == 2 {
-					em.Close()
-					close(done)
-				}
-			},
-		}
-
-		h2 := test_handler{
-			"handler2",
-			func(event apid.Event) {
-				defer GinkgoRecover()
-
-				c := atomic.AddInt32(&count, 1)
-				Expect(event).Should(Equal(e2))
-				if c == 2 {
-					em.Close()
-					close(done)
-				}
-			},
-		}
-
-		em.Listen("selector1", &h1)
-		em.Listen("selector2", &h2)
-
-		em.Emit("selector2", e2)
-		em.Emit("selector1", e1)
-	})
-
-	It("should publish PluginsInitialized event", func(done Done) {
-		xData := make(map[string]interface{})
-		xData["schemaVersion"] = "1.2.3"
-		p := func(s apid.Services) (pd apid.PluginData, err error) {
-			pd = apid.PluginData{
-				Name:      "test plugin",
-				Version:   "1.0.0",
-				ExtraData: xData,
+				em = nil
 			}
-			return
-		}
-		apid.RegisterPlugin(p)
+		})
 
-		h := func(event apid.Event) {
-			defer GinkgoRecover()
+		It("should ignore event with no listeners", func() {
+			em.Emit("no listeners", &test_event{"test"})
+		})
 
-			if pie, ok := event.(apid.PluginsInitializedEvent); ok {
-
-				apid.Events().Close()
-
-				Expect(len(pie.Plugins)).Should(Equal(1))
-				p := pie.Plugins[0]
-				Expect(p.Name).To(Equal("test plugin"))
-				Expect(p.Version).To(Equal("1.0.0"))
-				Expect(p.ExtraData["schemaVersion"]).To(Equal("1.2.3"))
-
-				close(done)
+		It("should publish an event to a listener", func(done Done) {
+			h := test_handler{
+				"handler",
+				func(event apid.Event) {
+					close(done)
+				},
 			}
-		}
-		apid.Events().ListenFunc(apid.SystemEventsSelector, h)
 
-		apid.InitializePlugins("")
-	})
+			em.Listen("selector", &h)
+			em.Emit("selector", &test_event{"test"})
+		})
 
-	It("shutdown event should be emitted and listened successfully", func(done Done) {
-		h := func(event apid.Event) {
-			defer GinkgoRecover()
-
-			if pie, ok := event.(apid.ShutdownEvent); ok {
-				apid.Events().Close()
-				Expect(pie.Description).Should(Equal("apid is going to shutdown"))
-				fmt.Println(pie.Description)
-				close(done)
-			} else {
-				Fail("Received wrong event")
-			}
-		}
-		apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
-		shutdownHandler := func(event apid.Event) {}
-		apid.Events().EmitWithCallback(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"}, shutdownHandler)
-	})
-
-	It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
-		pluginNum := 10
-		count := int32(0)
-
-		// create and register plugins, listen to shutdown event
-		for i:=0; i<pluginNum; i++ {
-			apid.RegisterPlugin(createDummyPlugin(i))
+		It("should publish an event to a listener func", func(done Done) {
 			h := func(event apid.Event) {
+				close(done)
+			}
+
+			em.ListenFunc("selector", h)
+			em.Emit("selector", &test_event{"test"})
+		})
+
+		It("should publish multiple events to a listener", func(done Done) {
+			count := int32(0)
+			h := test_handler{
+				"handler",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+
+					c := atomic.AddInt32(&count, 1)
+					if c > 1 {
+						close(done)
+					}
+				},
+			}
+
+			em.Listen("selector", &h)
+			em.Emit("selector", &test_event{"test1"})
+			em.Emit("selector", &test_event{"test2"})
+		})
+
+		It("EmitWithCallback should call the callback when done with delivery", func(done Done) {
+			delivered := func(event apid.Event) {
+				close(done)
+			}
+
+			em.EmitWithCallback("selector", &test_event{"test1"}, delivered)
+		})
+
+		It("should publish only one event to a listenOnce", func(done Done) {
+			count := 0
+			h := func(event apid.Event) {
+				defer GinkgoRecover()
+				count++
+			}
+
+			delivered := func(event apid.Event) {
+				defer GinkgoRecover()
+				Expect(count).To(Equal(1))
+				close(done)
+			}
+
+			em.ListenOnceFunc("selector", h)
+			em.Emit("selector", &test_event{"test1"})
+			em.EmitWithCallback("selector", &test_event{"test2"}, delivered)
+		})
+
+		It("should publish an event to multiple listeners", func(done Done) {
+			mut := sync.Mutex{}
+			hitH1 := false
+			hitH2 := false
+			h1 := test_handler{
+				"handler 1",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+					mut.Lock()
+					defer mut.Unlock()
+					hitH1 = true
+					if hitH1 && hitH2 {
+						close(done)
+					}
+				},
+			}
+			h2 := test_handler{
+				"handler 2",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+					mut.Lock()
+					defer mut.Unlock()
+					hitH2 = true
+					if hitH1 && hitH2 {
+						close(done)
+					}
+				},
+			}
+
+			em.Listen("selector", &h1)
+			em.Listen("selector", &h2)
+			em.Emit("selector", &test_event{"test"})
+		})
+
+		It("should publish an event delivered event", func(done Done) {
+			testEvent := &test_event{"test"}
+			var testSelector apid.EventSelector = "selector"
+
+			dummy := func(event apid.Event) {}
+			em.ListenFunc(testSelector, dummy)
+
+			h := test_handler{
+				"event delivered handler",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+
+					e, ok := event.(apid.EventDeliveryEvent)
+
+					Expect(ok).To(BeTrue())
+					Expect(e.Event).To(Equal(testEvent))
+					Expect(e.Selector).To(Equal(testSelector))
+
+					close(done)
+				},
+			}
+
+			em.Listen(apid.EventDeliveredSelector, &h)
+			em.Emit(testSelector, testEvent)
+		})
+
+		It("should be able to remove a listener", func(done Done) {
+			event1 := &test_event{"test1"}
+			event2 := &test_event{"test2"}
+			event3 := &test_event{"test3"}
+
+			dummy := func(event apid.Event) {}
+			em.ListenFunc("selector", dummy)
+
+			h := test_handler{
+				"handler",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+
+					Expect(event).NotTo(Equal(event2))
+					if event == event3 {
+						close(done)
+					}
+				},
+			}
+			em.Listen("selector", &h)
+
+			// need to drive test like this because of async delivery
+			var td apid.EventHandler
+			td = &test_handler{
+				"test driver",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+
+					e := event.(apid.EventDeliveryEvent)
+					if e.Event == event1 {
+						em.StopListening("selector", &h)
+						em.Emit("selector", event2)
+					} else if e.Event == event2 {
+						em.StopListening(apid.EventDeliveredSelector, td)
+						em.Listen("selector", &h)
+						em.Emit("selector", event3)
+					}
+				},
+			}
+			em.Listen(apid.EventDeliveredSelector, td)
+
+			em.Emit("selector", event1)
+		})
+
+		It("should deliver events according selector", func(done Done) {
+			e1 := &test_event{"test1"}
+			e2 := &test_event{"test2"}
+
+			count := int32(0)
+
+			h1 := test_handler{
+				"handler1",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+
+					c := atomic.AddInt32(&count, 1)
+					Expect(event).Should(Equal(e1))
+					if c == 2 {
+						close(done)
+					}
+				},
+			}
+
+			h2 := test_handler{
+				"handler2",
+				func(event apid.Event) {
+					defer GinkgoRecover()
+
+					c := atomic.AddInt32(&count, 1)
+					Expect(event).Should(Equal(e2))
+					if c == 2 {
+						close(done)
+					}
+				},
+			}
+
+			em.Listen("selector1", &h1)
+			em.Listen("selector2", &h2)
+
+			em.Emit("selector2", e2)
+			em.Emit("selector1", e1)
+		})
+	})
+
+	Context("plugins", func() {
+
+		BeforeEach(func() {
+		})
+
+		AfterEach(func() {
+			apid.Events().Close()
+		})
+
+		It("should publish PluginsInitialized event", func(done Done) {
+			xData := make(map[string]interface{})
+			xData["schemaVersion"] = "1.2.3"
+			p := func(s apid.Services) (pd apid.PluginData, err error) {
+				pd = apid.PluginData{
+					Name:      "test plugin",
+					Version:   "1.0.0",
+					ExtraData: xData,
+				}
+				return
+			}
+			apid.RegisterPlugin(p)
+
+			h := func(event apid.Event) {
+				defer GinkgoRecover()
+
+				if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+
+					Expect(len(pie.Plugins)).Should(Equal(1))
+					p := pie.Plugins[0]
+					Expect(p.Name).To(Equal("test plugin"))
+					Expect(p.Version).To(Equal("1.0.0"))
+					Expect(p.ExtraData["schemaVersion"]).To(Equal("1.2.3"))
+
+					close(done)
+				}
+			}
+			apid.Events().ListenFunc(apid.SystemEventsSelector, h)
+
+			apid.InitializePlugins("")
+		})
+
+		It("shutdown event should be emitted and listened successfully", func(done Done) {
+			h := func(event apid.Event) {
+				defer GinkgoRecover()
+
 				if pie, ok := event.(apid.ShutdownEvent); ok {
 					Expect(pie.Description).Should(Equal("apid is going to shutdown"))
-					atomic.AddInt32(&count, 1)
 				} else {
 					Fail("Received wrong event")
 				}
 			}
 			apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
-		}
+			<- apid.Events().Emit(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"})
+			close(done)
+		})
 
+		It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
+			pluginNum := 10
+			count := int32(0)
+			countP := &count
 
-		apid.InitializePlugins("")
-
-		apid.ShutdownPluginsAndWait()
-
-		defer GinkgoRecover()
-		apid.Events().Close()
-
-		// handlers of all registered plugins have executed
-		Expect(count).Should(Equal(int32(pluginNum)))
-
-		close(done)
-	})
-
-	It("should be able to read apid version from PluginsInitialized event", func(done Done) {
-		xData := make(map[string]interface{})
-		xData["schemaVersion"] = "1.2.3"
-		p := func(s apid.Services) (pd apid.PluginData, err error) {
-			pd = apid.PluginData{
-				Name:      "test plugin",
-				Version:   "1.0.0",
-				ExtraData: xData,
+			// create and register plugins, listen to shutdown event
+			for i:=0; i<pluginNum; i++ {
+				apid.RegisterPlugin(createDummyPlugin(i))
+				h := func(event apid.Event) {
+					if pie, ok := event.(apid.ShutdownEvent); ok {
+						Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+						atomic.AddInt32(countP, 1)
+					} else {
+						Fail("Received wrong event")
+					}
+				}
+				apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
 			}
-			return
-		}
-		apid.RegisterPlugin(p)
 
-		apidVersion := "dummy_version"
 
-		h := func(event apid.Event) {
-			defer GinkgoRecover()
+			apid.InitializePlugins("")
 
-			if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+			apid.ShutdownPluginsAndWait()
 
-				apid.Events().Close()
-				Expect(pie.ApidVersion).To(Equal(apidVersion))
-				close(done)
+			// handlers of all registered plugins have executed
+			Expect(count).Should(Equal(int32(pluginNum)))
+			close(done)
+		})
+
+		It("should be able to read apid version from PluginsInitialized event", func(done Done) {
+			xData := make(map[string]interface{})
+			xData["schemaVersion"] = "1.2.3"
+			p := func(s apid.Services) (pd apid.PluginData, err error) {
+				pd = apid.PluginData{
+					Name:      "test plugin",
+					Version:   "1.0.0",
+					ExtraData: xData,
+				}
+				return
 			}
-		}
-		apid.Events().ListenFunc(apid.SystemEventsSelector, h)
+			apid.RegisterPlugin(p)
 
-		apid.InitializePlugins(apidVersion)
+			apidVersion := "dummy_version"
+
+			h := func(event apid.Event) {
+				defer GinkgoRecover()
+
+				if pie, ok := event.(apid.PluginsInitializedEvent); ok {
+					Expect(pie.ApidVersion).To(Equal(apidVersion))
+					close(done)
+				}
+			}
+			apid.Events().ListenFunc(apid.SystemEventsSelector, h)
+
+			apid.InitializePlugins(apidVersion)
+		})
 	})
 })
 
@@ -389,7 +370,6 @@
 		return
 	}
 	return p
-
 }
 
 type test_handler struct {