Merge pull request #2 from 30x/XAPID-818
Added ShutdownEvent for graceful shutdown
diff --git a/apid.go b/apid.go
index 73f243c..a7429b5 100644
--- a/apid.go
+++ b/apid.go
@@ -1,9 +1,15 @@
package apid
-import "os"
+import (
+ "errors"
+ "os"
+ "time"
+)
const (
- SystemEventsSelector EventSelector = "system event"
+ SystemEventsSelector EventSelector = "system event"
+ ShutdownEventSelector EventSelector = "shutdown event"
+ ShutdownTimeout time.Duration = 10 * time.Second
)
var (
@@ -69,6 +75,24 @@
log.Debugf("done initializing plugins")
}
+// ShutdownPluginsAndWait emits a ShutdownEvent on ShutdownEventSelector and blocks until its delivery
+// is reported (returns nil), a different event comes back, or ShutdownTimeout elapses (returns an error).
+func ShutdownPluginsAndWait() error {
+	shutdownEvent := ShutdownEvent{"apid is going to shutdown"}
+	select {
+	case event := <-Events().Emit(ShutdownEventSelector, shutdownEvent):
+		if ede, ok := event.(EventDeliveryEvent); ok { // the channel may carry the delivery wrapper
+			event = ede.Event
+		}
+		if e, ok := event.(ShutdownEvent); ok && e == shutdownEvent {
+			return nil
+		}
+		return errors.New("Emit() problem: wrong event delivered")
+	case <-time.After(ShutdownTimeout):
+		return errors.New("Shutdown timeout")
+	}
+}
+
func AllServices() Services {
return services
}
@@ -124,3 +148,7 @@
type systemEvent struct {
description string
}
+
+type ShutdownEvent struct {
+ Description string
+}
diff --git a/data_service.go b/data_service.go
index 62523e0..1b84328 100644
--- a/data_service.go
+++ b/data_service.go
@@ -16,7 +16,7 @@
}
type DB interface {
- Ping() (error)
+ Ping() error
Prepare(query string) (*sql.Stmt, error)
Exec(query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
diff --git a/events/event_manager.go b/events/event_manager.go
index 527060c..7d7c69f 100644
--- a/events/event_manager.go
+++ b/events/event_manager.go
@@ -2,6 +2,7 @@
import (
"sync"
+ "reflect"
"github.com/30x/apid-core"
)
@@ -13,14 +14,15 @@
dispatchers map[apid.EventSelector]*dispatcher
}
-func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) {
+func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event {
+
log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event)
- em.Lock()
- dispatch := em.dispatchers[selector]
- em.Unlock()
- if !dispatch.Send(event) {
- em.sendDelivered(selector, event, 0) // in case of no dispatcher
- }
+ // Capacity 1 lets the callback's send complete even if the caller never reads the returned channel.
+ responseChannel := make(chan apid.Event, 1)
+ em.EmitWithCallback(selector, event, func(event apid.Event) {
+ responseChannel <- event
+ })
+ return responseChannel
}
func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) {
@@ -29,7 +31,7 @@
handler := &funcWrapper{em, nil}
handler.HandlerFunc = func(e apid.Event) {
if ede, ok := e.(apid.EventDeliveryEvent); ok {
- if ede.Event == event {
+ if reflect.DeepEqual(ede.Event, event) {
em.StopListening(apid.EventDeliveredSelector, handler)
callback(e)
}
@@ -37,7 +39,14 @@
}
em.Listen(apid.EventDeliveredSelector, handler)
- em.Emit(selector, event)
+
+ em.Lock()
+ dispatch := em.dispatchers[selector]
+ em.Unlock()
+
+ if !dispatch.Send(event) {
+ em.sendDelivered(selector, event, 0) // in case of no dispatcher
+ }
}
func (em *eventManager) HasListeners(selector apid.EventSelector) bool {
diff --git a/events/events_test.go b/events/events_test.go
index ae3f88c..8c338b6 100644
--- a/events/events_test.go
+++ b/events/events_test.go
@@ -8,6 +8,9 @@
"github.com/30x/apid-core/events"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+
+ "fmt"
+ "strconv"
)
var _ = Describe("Events Service", func() {
@@ -293,8 +296,72 @@
apid.InitializePlugins()
})
+	It("shutdown event should be emitted and listened successfully", func(done Done) {
+		// Listener: closes the event service, checks the description, then completes the spec.
+		onShutdown := func(event apid.Event) {
+			defer GinkgoRecover()
+			shutdown, ok := event.(apid.ShutdownEvent)
+			if !ok {
+				Fail("Received wrong event")
+				return
+			}
+			apid.Events().Close()
+			Expect(shutdown.Description).Should(Equal("apid is going to shutdown"))
+			fmt.Println(shutdown.Description)
+			close(done)
+		}
+		apid.Events().ListenFunc(apid.ShutdownEventSelector, onShutdown)
+		apid.Events().EmitWithCallback(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"}, func(apid.Event) {})
+	})
+
+	It("handlers registered by plugins should execute before apid shutdown", func(done Done) {
+		pluginNum := 10
+		count := int32(0)
+
+		// Register the plugins and give each one its own shutdown listener.
+		for i := 0; i < pluginNum; i++ {
+			apid.RegisterPlugin(createDummyPlugin(i))
+			h := func(event apid.Event) {
+				defer GinkgoRecover() // report assertion failures raised inside the listener
+				if pie, ok := event.(apid.ShutdownEvent); ok {
+					Expect(pie.Description).Should(Equal("apid is going to shutdown"))
+					atomic.AddInt32(&count, 1)
+				} else {
+					Fail("Received wrong event")
+				}
+			}
+			apid.Events().ListenFunc(apid.ShutdownEventSelector, h)
+		}
+
+		apid.InitializePlugins()
+
+		apid.ShutdownPluginsAndWait()
+
+		defer GinkgoRecover()
+		apid.Events().Close()
+
+		// Every listener must have run; read the counter the same way it is written.
+		Expect(atomic.LoadInt32(&count)).Should(Equal(int32(pluginNum)))
+
+		close(done)
+	})
})
+// createDummyPlugin builds a plugin init function that reports a name derived from id,
+// version "1.0.0", and extra data holding a schemaVersion entry.
+func createDummyPlugin(id int) apid.PluginInitFunc {
+	// One map per plugin, shared by every invocation of the returned init function.
+	extra := map[string]interface{}{"schemaVersion": "1.2.3"}
+	return func(_ apid.Services) (apid.PluginData, error) {
+		data := apid.PluginData{
+			Name:      "test plugin " + strconv.Itoa(id),
+			Version:   "1.0.0",
+			ExtraData: extra,
+		}
+		return data, nil
+	}
+}
+
type test_handler struct {
description string
f func(event apid.Event)
diff --git a/events_service.go b/events_service.go
index 0e249c9..c11b358 100644
--- a/events_service.go
+++ b/events_service.go
@@ -11,8 +11,10 @@
type EventHandlerFunc func(event Event)
type EventsService interface {
- // publish an event to the selector
- Emit(selector EventSelector, event Event)
+ // Publish an event to the selector.
+ // It will send a copy of the delivered event to the returned channel, after all listeners have responded to the event.
+ // Ignore the returned channel for a non-blocking emit; receive from it ("<-Emit(...)") to block until delivery completes.
+ Emit(selector EventSelector, event Event) chan Event
// publish an event to the selector, call the passed handler when all listeners have responded to the event
EmitWithCallback(selector EventSelector, event Event, handler EventHandlerFunc)
@@ -42,13 +44,15 @@
Count int
}
+// PluginsInitializedEvent cannot be compared with ==; use reflect.DeepEqual to compare values.
type PluginsInitializedEvent struct {
Description string
+ // Plugins is a slice, which is what makes PluginsInitializedEvent non-comparable.
Plugins []PluginData
}
type PluginData struct {
- Name string
- Version string
+ Name string
+ Version string
ExtraData map[string]interface{}
}