snapshot db protocol: emit snapshot on existing db start and document protocol in readme, cleanup
diff --git a/README.md b/README.md
index 8532ecb..a274032 100644
--- a/README.md
+++ b/README.md
@@ -1,38 +1,109 @@
# apidApigeeSync
This core plugin for [apid](http://github.com/30x/apid) connects to the Apigee Change Agent and publishes the data
-changes events onto the apid Event service.
+changes events onto the apid Event service. It also coordinates DB initialization for plugins on startup.
### Configuration
-#### apigeesync_poll_interval
+| name | description |
+|------------------------------|--------------------------|
+| apigeesync_poll_interval | int. seconds. default: 5 |
+| apigeesync_organization | string. name. required. |
+| apigeesync_proxy_server_base | string. url. required. |
+| apigeesync_consumer_key | string. required. |
+| apigeesync_consumer_secret | string. required. |
-int. seconds. default: 5
+### Event Generated
-#### apigeesync_organization
+* Selector: "ApigeeSync"
+* Data: [payload.go](payload.go)
-string. name. required.
+### Startup Procedure
-#### apigeesync_proxy_server_base
+#### ApigeeSync
+1. Read DB version (Snapshot.SnapshotInfo) from default DB
+2. If version found, emit Snapshot event (using Snapshot.SnapshotInfo, no data)
+3. Ask server for Snapshot
+4. Each time a Snapshot is received
+ 1. Verify Snapshot.SnapshotInfo is different than current
+ 2. Stop processing change events
+ 3. Remove or clean new DB version if it exists
+ 4. Emit Snapshot event
+ 5. Wait for plugins to finish processing
+ 6. Save Snapshot.SnapshotInfo in default DB
+ 7. Release old DB version
+ 8. Start processing change events
-string. url. required.
+ToDo: ApigeeSync currently only receives a new snapshot during startup, so step #4 only happens once. However, it
+ will eventually receive snapshots over time and the sub-steps should be followed at that time. Plugins
+ depending on ApigeeSync for data should assume that it can happen at any time and follow the heuristic below.
-#### apigeesync_consumer_key
+#### ApigeeSync-dependent plugins
+1. Initialization
+ 1. Until receiving first Snapshot message, ApigeeSync-dependent APIs must either:
+ 1. not register (endpoint will return a 404 by default)
+ 2. return a 503 until DB is initialized
+2. Upon receiving a snapshot notification (this is a HOT DB upgrade)
+ 1. Get DB for version (use Snapshot.SnapshotInfo as version)
+ 2. Create plugin's tables, if needed
+ 3. Insert any snapshot data into plugin's tables
+ 4. Set reference to new DB for all data access
+ 5. If db-dependent services are not exposed yet, expose them
-string. required.
+Example plugin code:
-#### apigeesync_consumer_secret
+ var (
+ log apid.LogService // set in initPlugin
+ data apid.DataService
+ unsafeDB apid.DB
+ dbMux sync.RWMutex
+ )
+
+ func init() {
+ apid.RegisterPlugin(initPlugin)
+ }
+
+ func initPlugin(services apid.Services) error {
+ log = services.Log().ForModule("examplePlugin")
+ log.Debug("start init")
+ data = services.Data()
+ return nil
+ }
+
+ // always use getDB() to safely access current DB
+ func getDB() apid.DB {
+ dbMux.RLock()
+ db := unsafeDB
+ dbMux.RUnlock()
+ return db
+ }
+
+ func setDB(db apid.DB) {
+ dbMux.Lock()
+ if unsafeDB == nil { // init API on DB initialization (optional)
+ go initAPI()
+ }
+ unsafeDB = db
+ dbMux.Unlock()
+ }
-string. required.
+ func processSnapshot(snapshot *common.Snapshot) {
+
+ log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
+
+ db, err := data.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
-
-### Event
-
-Selector:
-
- apidApigeeSync.ApigeeSyncEventSelector
-
-
-Data:
-
-See: [payload.go](payload.go)
+ // init DB as needed (note: DB may exist, use 'CREATE TABLE IF NOT EXISTS' if not explicitly checking)
+ initDB(db)
+
+ for _, table := range snapshot.Tables {
+ // populate tables from snapshot...
+ }
+
+ // switch to new database
+ setDB(db)
+ log.Debug("Snapshot processed")
+ }
diff --git a/apigee_sync.go b/apigee_sync.go
index 8184353..e02a790 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -13,9 +13,6 @@
"time"
)
-// todo: The following was largely copied from old APID - needs review
-
-var latestSequence int64
var token string
var tokenActive, downloadDataSnapshot, downloadBootSnapshot, chfin bool
var lastSequence string
@@ -270,7 +267,7 @@
* the second call to the snapshot server to get all the data
* associated with the scope(s).
* Emit the data for the necessary plugins to process.
- * If there is already previous data in sqlite, donot fetch
+ * If there is already previous data in sqlite, don't fetch
* again from snapshot server.
*/
func DownloadSnapshots() {
@@ -283,6 +280,21 @@
if gsnapshotInfo != "" {
downloadDataSnapshot = true
downloadBootSnapshot = true
+
+ log.Infof("Starting on downloaded snapshot: %s", gsnapshotInfo)
+
+ // verify DB is accessible
+ _, err := data.DBVersion(gsnapshotInfo)
+ if err != nil {
+ log.Panicf("Database inaccessible: %v", err)
+ }
+
+ // allow plugins to start immediately on existing database
+ snap := &common.Snapshot{
+ SnapshotInfo: gsnapshotInfo,
+ }
+ events.Emit(ApigeeSyncEventSelector, snap)
+
return
}
@@ -294,7 +306,7 @@
* Snapshot
*/
for count := 0; count < 60; count++ {
- if downloadBootSnapshot == false {
+ if !downloadBootSnapshot {
log.Debug("Waiting for bootscope snapshot download...")
time.Sleep(time.Duration(count) * 100 * time.Millisecond)
} else {
@@ -303,7 +315,7 @@
}
/* Phase 2 */
- if downloadBootSnapshot == true && downloadDataSnapshot == true {
+ if downloadBootSnapshot && downloadDataSnapshot {
log.Debug("Proceeding with existing Sqlite data")
} else if downloadBootSnapshot == true {
log.Debug("Proceed to download Snapshot for data scopes")
@@ -423,7 +435,7 @@
}
/*
- * Retrieve SnapshotInfo for the given apidConfigId from apid_config table
+ * Retrieve LastSequence for the given apidConfigId from apid_config table
*/
func findLastSeqInfo(configId string) (info string) {
@@ -446,7 +458,7 @@
}
/*
- * Retrieve LastSequence for the given apidConfigId from apid_config table
+ * Retrieve SnapshotInfo for the given apidConfigId from apid_config table
*/
func findSnapshotInfo(configId string) (info string) {
diff --git a/init.go b/init.go
index 3edc842..65b72ad 100644
--- a/init.go
+++ b/init.go
@@ -57,7 +57,7 @@
/* This callback function will get called, once all the plugins are
* initialized (not just this plugin). This is needed because,
* DownloadSnapshots/Changes etc have to begin to be processed only
- * after all the plugins are intialized
+ * after all the plugins are initialized
*/
events.ListenFunc(apid.SystemEventsSelector, postInitPlugins)
diff --git a/listener.go b/listener.go
index 83b87dd..32a1d94 100644
--- a/listener.go
+++ b/listener.go
@@ -36,7 +36,7 @@
if ok {
res = processChange(changeSet, txn)
} else {
- log.Fatal("Received Invalid event. This shouldn't happen!")
+ log.Fatal("Received invalid event: %v", e)
}
}
if res == true {