Merge branch 'master' of github.com:30x/apid-core into XAPID-824
diff --git a/apid.go b/apid.go
index 73f243c..a7429b5 100644
--- a/apid.go
+++ b/apid.go
@@ -1,9 +1,15 @@
package apid
-import "os"
+import (
+ "errors"
+ "os"
+ "time"
+)
const (
- SystemEventsSelector EventSelector = "system event"
+ // SystemEventsSelector is the event selector named "system event".
+ SystemEventsSelector EventSelector = "system event"
+ // ShutdownEventSelector is the selector ShutdownPluginsAndWait emits its ShutdownEvent on.
+ ShutdownEventSelector EventSelector = "shutdown event"
+ // ShutdownTimeout is how long ShutdownPluginsAndWait waits before giving up.
+ ShutdownTimeout time.Duration = 10 * time.Second
)
var (
@@ -69,6 +75,24 @@
log.Debugf("done initializing plugins")
}
+// ShutdownPluginsAndWait emits a ShutdownEvent on ShutdownEventSelector and
+// blocks until delivery of that event is reported or ShutdownTimeout elapses.
+//
+// It returns nil once delivery of the emitted event is confirmed. It returns a
+// non-nil error if a different event comes back on the response channel or if
+// the timeout fires first.
+func ShutdownPluginsAndWait() error {
+	shutdownEvent := ShutdownEvent{"apid is going to shutdown"}
+	eventChan := Events().Emit(ShutdownEventSelector, shutdownEvent)
+
+	// Use an explicit timer so it is released as soon as we return instead of
+	// lingering until ShutdownTimeout expires.
+	timer := time.NewTimer(ShutdownTimeout)
+	defer timer.Stop()
+
+	select {
+	case event := <-eventChan:
+		// The delivery callback is handed an EventDeliveryEvent that wraps the
+		// original event; unwrap it so the comparison below sees the
+		// ShutdownEvent whether or not Emit forwards the wrapper.
+		if ede, ok := event.(EventDeliveryEvent); ok {
+			event = ede.Event
+		}
+		if e, ok := event.(ShutdownEvent); ok && e == shutdownEvent {
+			return nil
+		}
+		return errors.New("Emit() problem: wrong event delivered")
+	case <-timer.C:
+		return errors.New("Shutdown timeout")
+	}
+}
+
+// AllServices returns the package-level services value.
func AllServices() Services {
return services
}
@@ -124,3 +148,7 @@
+// systemEvent carries a free-form description string.
type systemEvent struct {
description string
}
+
+// ShutdownEvent is the event ShutdownPluginsAndWait emits on
+// ShutdownEventSelector to announce that apid is going to shut down.
+type ShutdownEvent struct {
+ // Description is a human-readable message describing the shutdown.
+ Description string
+}
diff --git a/data_service.go b/data_service.go
index 62523e0..1b84328 100644
--- a/data_service.go
+++ b/data_service.go
@@ -16,7 +16,7 @@
}
type DB interface {
- Ping() (error)
+ Ping() error
Prepare(query string) (*sql.Stmt, error)
Exec(query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
diff --git a/events/event_manager.go b/events/event_manager.go
index 33b0ee1..7d7c69f 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -1,21 +1,28 @@
package events
import (
- "github.com/30x/apid-core"
"sync"
+ "reflect"
+
+ "github.com/30x/apid-core"
)
// events published to a given channel are processed entirely in order, though delivery to listeners is async
type eventManager struct {
+ // embedded mutex; the methods lock it while reading or replacing the dispatchers map
+ sync.Mutex
dispatchers map[apid.EventSelector]*dispatcher
}
-func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) {
+// Emit publishes event to selector via EmitWithCallback and returns a channel
+// that receives the value handed to the completion callback.
+func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+
log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event)
- if !em.dispatchers[selector].Send(event) {
- em.sendDelivered(selector, event, 0) // in case of no dispatcher
- }
+
+ // Capacity 1 lets the callback's single send complete even when the
+ // caller discards the returned channel and never receives from it.
+ responseChannel := make(chan apid.Event, 1)
+ em.EmitWithCallback(selector, event, func(event apid.Event) {
+ // NOTE(review): EmitWithCallback appears to invoke this with the
+ // EventDeliveryEvent wrapper rather than the original event; confirm
+ // that receivers of the channel expect the wrapper.
+ responseChannel <- event
+ })
+ return responseChannel
}
func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) {
@@ -24,7 +31,7 @@
handler := &funcWrapper{em, nil}
handler.HandlerFunc = func(e apid.Event) {
if ede, ok := e.(apid.EventDeliveryEvent); ok {
- if ede.Event == event {
+ if reflect.DeepEqual(ede.Event, event) {
em.StopListening(apid.EventDeliveredSelector, handler)
callback(e)
}
@@ -32,20 +39,31 @@
}
em.Listen(apid.EventDeliveredSelector, handler)
- em.Emit(selector, event)
+
+ em.Lock()
+ dispatch := em.dispatchers[selector]
+ em.Unlock()
+
+ if !dispatch.Send(event) {
+ em.sendDelivered(selector, event, 0) // in case of no dispatcher
+ }
}
func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
- return em.dispatchers[selector].HasHandlers()
+ em.Lock()
+ dispatch := em.dispatchers[selector]
+ em.Unlock()
+ return dispatch.HasHandlers()
}
func (em *eventManager) Listen(selector apid.EventSelector, handler apid.EventHandler) {
+ em.Lock()
+ defer em.Unlock()
log.Debugf("listen: '%s' handler: %v", selector, handler)
if em.dispatchers == nil {
em.dispatchers = make(map[apid.EventSelector]*dispatcher)
}
- list := em.dispatchers[selector]
- if list == nil {
+ if em.dispatchers[selector] == nil {
d := &dispatcher{sync.Mutex{}, em, selector, nil, nil}
em.dispatchers[selector] = d
}
@@ -53,6 +71,8 @@
}
func (em *eventManager) StopListening(selector apid.EventSelector, handler apid.EventHandler) {
+ em.Lock()
+ defer em.Unlock()
log.Debugf("stop listening: '%s' handler: %v", selector, handler)
if em.dispatchers == nil {
return
@@ -77,9 +97,11 @@
}
func (em *eventManager) Close() {
- log.Debugf("Closing %d dispatchers", len(em.dispatchers))
+ em.Lock()
dispatchers := em.dispatchers
em.dispatchers = nil
+ em.Unlock()
+ log.Debugf("Closing %d dispatchers", len(dispatchers))
for _, dispatcher := range dispatchers {
dispatcher.Close()
}
@@ -145,6 +167,8 @@
}
func (d *dispatcher) HasHandlers() bool {
+ d.Lock()
+ defer d.Unlock()
return d != nil && len(d.handlers) > 0
}
@@ -154,10 +178,13 @@
select {
case event := <-d.channel:
if event != nil {
- log.Debugf("delivering %v to %v", &event, d.handlers)
- if len(d.handlers) > 0 {
+ d.Lock()
+ handlers := d.handlers
+ d.Unlock()
+ log.Debugf("delivering %v to %v", &event, handlers)
+ if len(handlers) > 0 {
var wg sync.WaitGroup
- for _, h := range d.handlers {
+ for _, h := range handlers {
handler := h
wg.Add(1)
go func() {
@@ -168,7 +195,7 @@
log.Debugf("waiting for handlers")
wg.Wait()
}
- d.em.sendDelivered(d.selector, event, len(d.handlers))
+ d.em.sendDelivered(d.selector, event, len(handlers))
log.Debugf("event %v delivered", &event)
}
}
diff --git a/events/events_test.go b/events/events_test.go
index 70bc19a..8c338b6 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -1,11 +1,16 @@
package events_test
import (
+ "sync"
+ "sync/atomic"
+
"github.com/30x/apid-core"
"github.com/30x/apid-core/events"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "sync/atomic"
+
+ "fmt"
+ "strconv"
)
var _ = Describe("Events Service", func() {
@@ -105,13 +110,15 @@
em := events.CreateService()
defer em.Close()
+ mut := sync.Mutex{}
hitH1 := false
hitH2 := false
h1 := test_handler{
"handler 1",
func(event apid.Event) {
defer GinkgoRecover()
-
+ mut.Lock()
+ defer mut.Unlock()
hitH1 = true
if hitH1 && hitH2 {
em.Close()
@@ -124,6 +131,8 @@
func(event apid.Event) {
defer GinkgoRecover()
+ mut.Lock()
+ defer mut.Unlock()
hitH2 = true
if hitH1 && hitH2 {
em.Close()
@@ -258,8 +267,8 @@
xData["schemaVersion"] = "1.2.3"
p := func(s apid.Services) (pd apid.PluginData, err error) {
pd = apid.PluginData{
- Name: "test plugin",
- Version: "1.0.0",
+ Name: "test plugin",
+ Version: "1.0.0",
ExtraData: xData,
}
return
@@ -287,8 +296,72 @@
apid.InitializePlugins()
})
+ // Checks that a ShutdownEvent emitted on ShutdownEventSelector reaches a
+ // listener registered for that selector; the listener closes done.
+ It("shutdown event should be emitted and listened successfully", func(done Done) {
+ h := func(event apid.Event) {
+ defer GinkgoRecover()
+
+ if pie, ok := event.(apid.ShutdownEvent); ok {
+ apid.Events().Close()
+ Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+ fmt.Println(pie.Description)
+ close(done)
+ } else {
+ Fail("Received wrong event")
+ }
+ }
+ apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
+ // no-op completion callback: the assertions live in the listener above
+ shutdownHandler := func(event apid.Event) {}
+ apid.Events().EmitWithCallback(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"}, shutdownHandler)
+ })
+
+	// Registers several dummy plugins, each with a shutdown listener, and checks
+	// that every listener has run by the time ShutdownPluginsAndWait returns.
+	It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
+		defer GinkgoRecover()
+
+		pluginNum := 10
+		count := int32(0)
+
+		// create and register plugins, each listening for the shutdown event
+		for i := 0; i < pluginNum; i++ {
+			apid.RegisterPlugin(createDummyPlugin(i))
+			h := func(event apid.Event) {
+				// Handlers run on dispatcher goroutines, so a failed
+				// assertion must be recovered here to be reported.
+				defer GinkgoRecover()
+
+				if pie, ok := event.(apid.ShutdownEvent); ok {
+					Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+					atomic.AddInt32(&count, 1)
+				} else {
+					Fail("Received wrong event")
+				}
+			}
+			apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
+		}
+
+		apid.InitializePlugins()
+
+		// The return value is not asserted: this spec only checks that every
+		// handler ran, via count below.
+		apid.ShutdownPluginsAndWait()
+
+		apid.Events().Close()
+
+		// handlers of all registered plugins have executed; count is written
+		// atomically by the handlers, so read it atomically as well
+		Expect(atomic.LoadInt32(&count)).Should(Equal(int32(pluginNum)))
+
+		close(done)
+	})
})
+// createDummyPlugin builds a plugin init function for tests. The plugin it
+// describes is named "test plugin <id>", has version "1.0.0", and carries a
+// schemaVersion entry in its extra data.
+func createDummyPlugin(id int) apid.PluginInitFunc {
+	// Built once per call; the returned closure hands out this same map.
+	extra := map[string]interface{}{
+		"schemaVersion": "1.2.3",
+	}
+	return func(s apid.Services) (apid.PluginData, error) {
+		data := apid.PluginData{
+			Name:      "test plugin " + strconv.Itoa(id),
+			Version:   "1.0.0",
+			ExtraData: extra,
+		}
+		return data, nil
+	}
+}
+
type test_handler struct {
description string
f func(event apid.Event)
diff --git a/events_service.go b/events_service.go
index 0e249c9..c11b358 100644
--- a/events_service.go
+++ b/events_service.go
@@ -11,8 +11,10 @@
type EventHandlerFunc func(event Event)
type EventsService interface {
- // publish an event to the selector
- Emit(selector EventSelector, event Event)
+ // Publish an event to the selector.
+ // It will send a copy of the delivered event to the returned channel, after all listeners have responded to the event.
+ // Call "Emit()" for non-blocking, "<-Emit()" for blocking.
+ Emit(selector EventSelector, event Event) chan Event
// publish an event to the selector, call the passed handler when all listeners have responded to the event
EmitWithCallback(selector EventSelector, event Event, handler EventHandlerFunc)
@@ -42,13 +44,15 @@
Count int
}
+// PluginsInitializedEvent cannot be compared with ==; use reflect.DeepEqual to compare values of this type.
type PluginsInitializedEvent struct {
Description string
+ // Plugins is a slice, which is what makes PluginsInitializedEvent not comparable.
Plugins []PluginData
}
+// PluginData describes a plugin: its name, its version, and a map of extra
+// key/value data.
type PluginData struct {
- Name string
- Version string
+ Name string
+ Version string
ExtraData map[string]interface{}
}