Merge branch 'master' of github.com:30x/apid-core into XAPID-824
diff --git a/apid.go b/apid.go index 73f243c..a7429b5 100644 --- a/apid.go +++ b/apid.go
@@ -1,9 +1,15 @@ package apid -import "os" +import ( + "errors" + "os" + "time" +) const ( - SystemEventsSelector EventSelector = "system event" + SystemEventsSelector EventSelector = "system event" + ShutdownEventSelector EventSelector = "shutdown event" + ShutdownTimeout time.Duration = 10 * time.Second ) var ( @@ -69,6 +75,24 @@ log.Debugf("done initializing plugins") } +// Shutdown all the plugins that have registered for ShutdownEventSelector. +// This call will block until either all required plugins shutdown, or a timeout occurred. +func ShutdownPluginsAndWait() error { + shutdownEvent := ShutdownEvent{"apid is going to shutdown"} + eventChan := Events().Emit(ShutdownEventSelector, shutdownEvent) + select { + case event := <-eventChan: + if e, ok := event.(ShutdownEvent); ok { + if e == shutdownEvent { + return nil + } + } + return errors.New("Emit() problem: wrong event delivered") + case <-time.After(ShutdownTimeout): + return errors.New("Shutdown timeout") + } +} + func AllServices() Services { return services } @@ -124,3 +148,7 @@ type systemEvent struct { description string } + +type ShutdownEvent struct { + Description string +}
diff --git a/data_service.go b/data_service.go index 62523e0..1b84328 100644 --- a/data_service.go +++ b/data_service.go
@@ -16,7 +16,7 @@ } type DB interface { - Ping() (error) + Ping() error Prepare(query string) (*sql.Stmt, error) Exec(query string, args ...interface{}) (sql.Result, error) Query(query string, args ...interface{}) (*sql.Rows, error)
diff --git a/events/event_manager.go b/events/event_manager.go index 33b0ee1..7d7c69f 100644 --- a/events/event_manager.go +++ b/events/event_manager.go
@@ -1,21 +1,28 @@ package events import ( - "github.com/30x/apid-core" "sync" + "reflect" + + "github.com/30x/apid-core" ) // events published to a given channel are processed entirely in order, though delivery to listeners is async type eventManager struct { + sync.Mutex dispatchers map[apid.EventSelector]*dispatcher } -func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) { +func (em *eventManager) Emit(selector apid.EventSelector, event apid.Event) chan apid.Event { + log.Debugf("emit selector: '%s' event %v: %v", selector, &event, event) - if !em.dispatchers[selector].Send(event) { - em.sendDelivered(selector, event, 0) // in case of no dispatcher - } + + responseChannel := make(chan apid.Event, 1) + em.EmitWithCallback(selector, event, func(event apid.Event) { + responseChannel <- event + }) + return responseChannel } func (em *eventManager) EmitWithCallback(selector apid.EventSelector, event apid.Event, callback apid.EventHandlerFunc) { @@ -24,7 +31,7 @@ handler := &funcWrapper{em, nil} handler.HandlerFunc = func(e apid.Event) { if ede, ok := e.(apid.EventDeliveryEvent); ok { - if ede.Event == event { + if reflect.DeepEqual(ede.Event, event) { em.StopListening(apid.EventDeliveredSelector, handler) callback(e) } @@ -32,20 +39,31 @@ } em.Listen(apid.EventDeliveredSelector, handler) - em.Emit(selector, event) + + em.Lock() + dispatch := em.dispatchers[selector] + em.Unlock() + + if !dispatch.Send(event) { + em.sendDelivered(selector, event, 0) // in case of no dispatcher + } } func (em *eventManager) HasListeners(selector apid.EventSelector) bool { - return em.dispatchers[selector].HasHandlers() + em.Lock() + dispatch := em.dispatchers[selector] + em.Unlock() + return dispatch.HasHandlers() } func (em *eventManager) Listen(selector apid.EventSelector, handler apid.EventHandler) { + em.Lock() + defer em.Unlock() log.Debugf("listen: '%s' handler: %v", selector, handler) if em.dispatchers == nil { em.dispatchers = make(map[apid.EventSelector]*dispatcher) } - list := em.dispatchers[selector] - if list == nil { + if em.dispatchers[selector] == nil { d := &dispatcher{sync.Mutex{}, em, selector, nil, nil} em.dispatchers[selector] = d } @@ -53,6 +71,8 @@ } func (em *eventManager) StopListening(selector apid.EventSelector, handler apid.EventHandler) { + em.Lock() + defer em.Unlock() log.Debugf("stop listening: '%s' handler: %v", selector, handler) if em.dispatchers == nil { return @@ -77,9 +97,11 @@ } func (em *eventManager) Close() { - log.Debugf("Closing %d dispatchers", len(em.dispatchers)) + em.Lock() dispatchers := em.dispatchers em.dispatchers = nil + em.Unlock() + log.Debugf("Closing %d dispatchers", len(dispatchers)) for _, dispatcher := range dispatchers { dispatcher.Close() } @@ -145,6 +167,8 @@ } func (d *dispatcher) HasHandlers() bool { + d.Lock() + defer d.Unlock() return d != nil && len(d.handlers) > 0 } @@ -154,10 +178,13 @@ select { case event := <-d.channel: if event != nil { - log.Debugf("delivering %v to %v", &event, d.handlers) - if len(d.handlers) > 0 { + d.Lock() + handlers := d.handlers + d.Unlock() + log.Debugf("delivering %v to %v", &event, handlers) + if len(handlers) > 0 { var wg sync.WaitGroup - for _, h := range d.handlers { + for _, h := range handlers { handler := h wg.Add(1) go func() { @@ -168,7 +195,7 @@ log.Debugf("waiting for handlers") wg.Wait() } - d.em.sendDelivered(d.selector, event, len(d.handlers)) + d.em.sendDelivered(d.selector, event, len(handlers)) log.Debugf("event %v delivered", &event) } }
diff --git a/events/events_test.go b/events/events_test.go index 70bc19a..8c338b6 100644 --- a/events/events_test.go +++ b/events/events_test.go
@@ -1,11 +1,16 @@ package events_test import ( + "sync" + "sync/atomic" + "github.com/30x/apid-core" "github.com/30x/apid-core/events" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "sync/atomic" + + "fmt" + "strconv" ) var _ = Describe("Events Service", func() { @@ -105,13 +110,15 @@ em := events.CreateService() defer em.Close() + mut := sync.Mutex{} hitH1 := false hitH2 := false h1 := test_handler{ "handler 1", func(event apid.Event) { defer GinkgoRecover() - + mut.Lock() + defer mut.Unlock() hitH1 = true if hitH1 && hitH2 { em.Close() @@ -124,6 +131,8 @@ func(event apid.Event) { defer GinkgoRecover() + mut.Lock() + defer mut.Unlock() hitH2 = true if hitH1 && hitH2 { em.Close() @@ -258,8 +267,8 @@ xData["schemaVersion"] = "1.2.3" p := func(s apid.Services) (pd apid.PluginData, err error) { pd = apid.PluginData{ - Name: "test plugin", - Version: "1.0.0", + Name: "test plugin", + Version: "1.0.0", ExtraData: xData, } return @@ -287,8 +296,72 @@ apid.InitializePlugins() }) + It("shutdown event should be emitted and listened successfully", func(done Done) { + h := func(event apid.Event) { + defer GinkgoRecover() + + if pie, ok := event.(apid.ShutdownEvent); ok { + apid.Events().Close() + Expect(pie.Description).Should(Equal("apid is going to shutdown")) + fmt.Println(pie.Description) + close(done) + } else { + Fail("Received wrong event") + } + } + apid.Events().ListenFunc(apid.ShutdownEventSelector, h) + shutdownHandler := func(event apid.Event) {} + apid.Events().EmitWithCallback(apid.ShutdownEventSelector, apid.ShutdownEvent{"apid is going to shutdown"}, shutdownHandler) + }) + + It("handlers registered by plugins should execute before apid shutdown", func(done Done) { + pluginNum := 10 + count := int32(0) + + // create and register plugins, listen to shutdown event + for i:=0; i<pluginNum; i++ { + apid.RegisterPlugin(createDummyPlugin(i)) + h := func(event apid.Event) { + if pie, ok := event.(apid.ShutdownEvent); ok { + Expect(pie.Description).Should(Equal("apid is going to shutdown")) + atomic.AddInt32(&count, 1) + } else { + Fail("Received wrong event") + } + } + apid.Events().ListenFunc(apid.ShutdownEventSelector, h) + } + + + apid.InitializePlugins() + + apid.ShutdownPluginsAndWait() + + defer GinkgoRecover() + apid.Events().Close() + + // handlers of all registered plugins have executed + Expect(count).Should(Equal(int32(pluginNum))) + + close(done) + }) }) +func createDummyPlugin(id int) apid.PluginInitFunc{ + xData := make(map[string]interface{}) + xData["schemaVersion"] = "1.2.3" + p := func(s apid.Services) (pd apid.PluginData, err error) { + pd = apid.PluginData{ + Name: "test plugin " + strconv.Itoa(id), + Version: "1.0.0", + ExtraData: xData, + } + return + } + return p + +} + type test_handler struct { description string f func(event apid.Event)
diff --git a/events_service.go b/events_service.go index 0e249c9..c11b358 100644 --- a/events_service.go +++ b/events_service.go
@@ -11,8 +11,10 @@ type EventHandlerFunc func(event Event) type EventsService interface { - // publish an event to the selector - Emit(selector EventSelector, event Event) + // Publish an event to the selector. + // It will send a copy of the delivered event to the returned channel, after all listeners have responded to the event. + // Call "Emit()" for non-blocking, "<-Emit()" for blocking. + Emit(selector EventSelector, event Event) chan Event // publish an event to the selector, call the passed handler when all listeners have responded to the event EmitWithCallback(selector EventSelector, event Event, handler EventHandlerFunc) @@ -42,13 +44,15 @@ Count int } +// use reflect.DeepEqual to compare this type type PluginsInitializedEvent struct { Description string + // using slice member will make the type "PluginsInitializedEvent" uncomparable Plugins []PluginData } type PluginData struct { - Name string - Version string + Name string + Version string ExtraData map[string]interface{} }