Merge branch 'master' into XAPID-818
diff --git a/apid.go b/apid.go
index 73f243c..4392b8f 100644
--- a/apid.go
+++ b/apid.go
@@ -1,9 +1,15 @@
 package apid
 
-import "os"
+import (
+	"errors"
+	"os"
+	"time"
+)
 
 const (
-	SystemEventsSelector EventSelector = "system event"
+	SystemEventsSelector  EventSelector = "system event"
+	ShutdownEventSelector EventSelector = "shutdown event"
+	ShutdownTimeout       int           = 10
 )
 
 var (
@@ -69,6 +75,24 @@
 	log.Debugf("done initializing plugins")
 }
 
+// Shutdown all the plugins that have registered for ShutdownEventSelector.
+// This call will block until either all required plugins shutdown, or a timeout occurred.
+func ShutdownPluginsAndWait() error {
+	shutdownEvent := ShutdownEvent{"apid is going to shutdown"}
+	eventChan := Events().Emit(ShutdownEventSelector, shutdownEvent)
+	select {
+	case event := <-eventChan:
+		if e, ok := event.(ShutdownEvent); ok {
+			if e == shutdownEvent {
+				return nil
+			}
+		}
+		return errors.New("Emit() problem: wrong event delivered")
+	case <-time.After(time.Duration(ShutdownTimeout) * time.Second):
+		return errors.New("Shutdown timeout")
+	}
+}
+
 func AllServices() Services {
 	return services
 }
@@ -124,3 +148,7 @@
 type systemEvent struct {
 	description string
 }
+
+type ShutdownEvent struct {
+	Description string
+}
diff --git a/data_service.go b/data_service.go
index 62523e0..1b84328 100644
--- a/data_service.go
+++ b/data_service.go
@@ -16,7 +16,7 @@
 }
 
 type DB interface {
-	Ping() (error)
+	Ping() error
 	Prepare(query string) (*sql.Stmt, error)
 	Exec(query string, args ...interface{}) (sql.Result, error)
 	Query(query string, args ...interface{}) (*sql.Rows, error)
diff --git a/events/event_manager.go b/events/event_manager.go
index 527060c..b98089e 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -2,6 +2,7 @@
 
 import (
 	"sync"
+	"reflect"
 
 	"github.com/30x/apid-core"
 )
@@ -13,14 +14,15 @@
 	dispatchers map[apid.EventSelector]*dispatcher
 }
 
-func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) {
+func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+
 	log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event)
-	em.Lock()
-	dispatch := em.dispatchers[selector]
-	em.Unlock()
-	if !dispatch.Send(event) {
-		em.sendDelivered(selector, event, 0) // in case of no dispatcher
-	}
+
+	responseChannel := make(chan apid.Event, 1)
+	em.EmitWithCallback(selector, event, func(event apid.Event) {
+		responseChannel <- event
+	})
+	return responseChannel
 }
 
 func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) {
@@ -29,7 +31,7 @@
 	handler := &funcWrapper{em, nil}
 	handler.HandlerFunc = func(e apid.Event) {
 		if ede, ok := e.(apid.EventDeliveryEvent); ok {
-			if ede.Event == event {
+			if reflect.DeepEqual(ede.Event, event) {
 				em.StopListening(apid.EventDeliveredSelector, handler)
 				callback(e)
 			}
@@ -37,7 +39,9 @@
 	}
 
 	em.Listen(apid.EventDeliveredSelector, handler)
-	em.Emit(selector, event)
+	if !em.dispatchers[selector].Send(event) {
+		em.sendDelivered(selector, event, 0) // in case of no dispatcher
+	}
 }
 
 func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
diff --git a/events/events_test.go b/events/events_test.go
index ae3f88c..e1099c1 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -8,6 +8,10 @@
 	"github.com/30x/apid-core/events"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+
+	"sync/atomic"
+	"fmt"
+	"strconv"
 )
 
 var _ = Describe("Events Service", func() {
@@ -293,8 +297,72 @@
 		apid.InitializePlugins()
 	})
 
+	It("shutdown event should be emitted and listened successfully", func(done Done) {
+		h := func(event apid.Event) {
+			defer GinkgoRecover()
+
+			if pie, ok := event.(apid.ShutdownEvent); ok {
+				apid.Events().Close()
+				Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+				fmt.Println(pie.Description)
+				close(done)
+			} else {
+				Fail("Received wrong event")
+			}
+		}
+		apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
+		shutdownHandler := func(event apid.Event) {}
+		apid.Events().EmitWithCallback(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"}, shutdownHandler)
+	})
+
+	It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
+		pluginNum := 10
+		count := int32(0)
+
+		// create and register plugins, listen to shutdown event
+		for i:=0; i<pluginNum; i++ {
+			apid.RegisterPlugin(createDummyPlugin(i))
+			h := func(event apid.Event) {
+				if pie, ok := event.(apid.ShutdownEvent); ok {
+					Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+					atomic.AddInt32(&count, 1)
+				} else {
+					Fail("Received wrong event")
+				}
+			}
+			apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
+		}
+
+
+		apid.InitializePlugins()
+
+		apid.ShutdownPluginsAndWait()
+
+		defer GinkgoRecover()
+		apid.Events().Close()
+
+		// handlers of all registered plugins have executed
+		Expect(count).Should(Equal(int32(pluginNum)))
+
+		close(done)
+	})
 })
 
+func createDummyPlugin(id int) apid.PluginInitFunc{
+	xData := make(map[string]interface{})
+	xData["schemaVersion"] = "1.2.3"
+	p := func(s apid.Services) (pd apid.PluginData, err error) {
+		pd = apid.PluginData{
+			Name: "test plugin " + strconv.Itoa(id),
+			Version: "1.0.0",
+			ExtraData: xData,
+		}
+		return
+	}
+	return p
+
+}
+
 type test_handler struct {
 	description string
 	f           func(event apid.Event)
diff --git a/events_service.go b/events_service.go
index 0e249c9..c11b358 100644
--- a/events_service.go
+++ b/events_service.go
@@ -11,8 +11,10 @@
 type EventHandlerFunc func(event Event)
 
 type EventsService interface {
-	// publish an event to the selector
-	Emit(selector EventSelector, event Event)
+	// Publish an event to the selector.
+	// It will send a copy of the delivered event to the returned channel, after all listeners have responded to the event.
+	// Call "Emit()" for non-blocking, "<-Emit()" for blocking.
+	Emit(selector EventSelector, event Event) chan Event
 
 	// publish an event to the selector, call the passed handler when all listeners have responded to the event
 	EmitWithCallback(selector EventSelector, event Event, handler EventHandlerFunc)
@@ -42,13 +44,15 @@
 	Count       int
 }
 
+// use reflect.DeepEqual to compare this type
 type PluginsInitializedEvent struct {
 	Description string
+	// using slice member will make the type "PluginsInitializedEvent" uncomparable
 	Plugins []PluginData
 }
 
 type PluginData struct {
-	Name string
-	Version string
+	Name      string
+	Version   string
 	ExtraData map[string]interface{}
 }