(1) Ensure that apid waits till all the plugins respond back before
proceeding to look for next set of changes.
(2) Ensure the Sequence is persisted in DB, once all plugins have acked
for a change. And reuse this if apid is restarted.
diff --git a/apigee_sync.go b/apigee_sync.go
index 1c18cea..8184353 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -4,6 +4,7 @@
"bytes"
"encoding/json"
"errors"
+ "github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
"io/ioutil"
"net/http"
@@ -16,9 +17,33 @@
var latestSequence int64
var token string
-var tokenActive, downloadSnapshot, downloadBootSnapshot, gotSequence bool
+var tokenActive, downloadDataSnapshot, downloadBootSnapshot, chfin bool
var lastSequence string
-var snapshotInfo string
+var gsnapshotInfo string
+
+func donehandler(e apid.Event) {
+ if rsp, ok := e.(apid.EventDeliveryEvent); ok {
+ if rsp.Description == "event complete" {
+ if ev, ok := rsp.Event.(*common.Snapshot); ok {
+ if downloadBootSnapshot == false {
+ downloadBootSnapshot = true
+ log.Debug("Updated bootstrap SnapshotInfo")
+ } else {
+ gsnapshotInfo = ev.SnapshotInfo
+ downloadDataSnapshot = true
+ log.Debug("Updated data SnapshotInfo")
+ }
+ } else if ev, ok := rsp.Event.(*common.ChangeList); ok {
+ lastSequence = ev.LastSequence
+ status := persistChange(lastSequence)
+ if status == false {
+ log.Fatal("Unable to update Sequence in DB")
+ }
+ chfin = true
+ }
+ }
+ }
+}
/*
* Helper function that sleeps for N seconds, if comm. with change agent
@@ -57,7 +82,7 @@
*/
func pollChangeAgent() error {
- if downloadSnapshot != true {
+ if downloadDataSnapshot != true {
log.Warning("Waiting for snapshot download to complete")
return errors.New("Snapshot download in progress...")
}
@@ -68,8 +93,11 @@
}
changesUri.Path = path.Join(changesUri.Path, "/changes")
- configId := config.GetString(configScopeId)
-
+ /*
+ * Check to see if we have lastSequence already saved in the DB,
+ * in which case, it has to be used to prevent re-reading same data
+ */
+ lastSequence = findLastSeqInfo(gapidConfigId)
for {
log.Debug("polling...")
if tokenActive == false {
@@ -81,11 +109,11 @@
}
/* Find the scopes associated with the config id */
- scopes := findScopesforId(configId)
+ scopes := findScopesforId(gapidConfigId)
v := url.Values{}
/* Sequence added to the query if available */
- if gotSequence == true {
+ if lastSequence != "" {
v.Add("since", lastSequence)
}
v.Add("block", "45")
@@ -98,8 +126,8 @@
for _, scope := range scopes {
v.Add("scope", scope)
}
- v.Add("scope", configId)
- v.Add("snapshot", snapshotInfo)
+ v.Add("scope", gapidConfigId)
+ v.Add("snapshot", gsnapshotInfo)
changesUri.RawQuery = v.Encode()
uri := changesUri.String()
log.Info("Fetching changes: ", uri)
@@ -136,12 +164,29 @@
/* If valid data present, Emit to plugins */
if len(resp.Changes) > 0 {
+ chfin = false
+ events.ListenFunc(apid.EventDeliveredSelector, donehandler)
events.Emit(ApigeeSyncEventSelector, &resp)
+ /*
+ * The plugins should have finished what they are doing.
+ * Wait till they are done.
+ * If they take longer than expected - abort apid(?)
+ * (Should there be a configurable Fudge factor?) FIXME
+ */
+ for count := 0; count < 1000; count++ {
+ if chfin == false {
+ log.Info("Waiting for plugins to complete...")
+ time.Sleep(time.Duration(count) * 100 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+ if chfin == false {
+ log.Fatal("Never got ack from plugins. Investigate..")
+ }
} else {
log.Info("No Changes detected for Scopes ", scopes)
}
- lastSequence = resp.LastSequence
- gotSequence = true
}
}
@@ -212,11 +257,12 @@
func Redirect(req *http.Request, via []*http.Request) error {
req.Header.Add("Authorization", "Bearer "+token)
- req.Header.Add("org", config.GetString(configScopeId))
+ req.Header.Add("org", gapidConfigId)
return nil
}
-/* Method downloads the snapshot in a two phased manner.
+/*
+ * Method downloads the snapshot in a two phased manner.
* Phase 1: Use the apidConfigId as the bootstrap scope, and
* get the apid_config and apid_config_scope from the snapshot
* server.
@@ -227,17 +273,49 @@
* If there is already previous data in sqlite, donot fetch
* again from snapshot server.
*/
-func DownloadSnapshot() error {
+func DownloadSnapshots() {
- var scopes []string
- snapshotInfo = findSnapshotInfo(config.GetString(configScopeId))
- if snapshotInfo != "" {
- log.Debug("Proceeding with local Sqlite data")
- downloadSnapshot = true
- return nil
+ /*
+ * Skip Downloading snapshot, if there is already a snapshot
+ * available from previous run of APID
+ */
+ gsnapshotInfo = findSnapshotInfo(gapidConfigId)
+ if gsnapshotInfo != "" {
+ downloadDataSnapshot = true
+ downloadBootSnapshot = true
+ return
}
-PHASE_2:
+ /* Phase 1 */
+ DownloadSnapshot()
+
+ /*
+ * Give some time for all the plugins to process the Downloaded
+ * Snapshot
+ */
+ for count := 0; count < 60; count++ {
+ if downloadBootSnapshot == false {
+ log.Debug("Waiting for bootscope snapshot download...")
+ time.Sleep(time.Duration(count) * 100 * time.Millisecond)
+ } else {
+ break
+ }
+ }
+
+ /* Phase 2 */
+ if downloadBootSnapshot == true && downloadDataSnapshot == true {
+ log.Debug("Proceeding with existing Sqlite data")
+ } else if downloadBootSnapshot == true {
+ log.Debug("Proceed to download Snapshot for data scopes")
+ DownloadSnapshot()
+ } else {
+ log.Fatal("Snapshot for bootscope failed")
+ }
+}
+
+func DownloadSnapshot() {
+
+ var scopes []string
/* Get the bearer token */
status := getBearerToken()
@@ -250,26 +328,12 @@
}
if downloadBootSnapshot == false {
- scopes = append(scopes, (config.GetString(configScopeId)))
+ scopes = append(scopes, (gapidConfigId))
} else {
-
- /*
- * With WAL there is a chance, commits happening might take
- * a bit, give some time if another session is in Committing
- * the scopes
- */
- for count := 0; count < 60; count++ {
- scopes = findScopesforId(config.GetString(configScopeId))
- if scopes == nil {
- log.Info("User Scopes not found in DB, retry in a bit.")
- time.Sleep(time.Duration(count) * 100 * time.Millisecond)
- } else {
- break
- }
- }
- if scopes == nil {
- log.Fatal("Scope cannot be found to download snapshot")
- }
+ scopes = findScopesforId(gapidConfigId)
+ }
+ if scopes == nil {
+ log.Fatal("Scope cannot be found to download snapshot")
}
/* Frame and send the snapshot request */
snapshotUri.Path = path.Join(snapshotUri.Path, "/snapshots")
@@ -307,38 +371,29 @@
err = json.NewDecoder(r.Body).Decode(&resp)
if err != nil {
- /*
- *If the data set is empty, allow it to proceed, as changeserver
- * will feed data. Since Bootstrapping has passed, it has the
- * Bootstrap config id to function.
- */
if downloadBootSnapshot == false {
log.Fatal("JSON Response Data not parsable: ", err)
} else {
- downloadSnapshot = true
- return nil
+
+ /*
+ * If the data set is empty, allow it to proceed, as changeserver
+ * will feed data. Since Bootstrapping has passed, it has the
+ * Bootstrap config id to function.
+ */
+ downloadDataSnapshot = true
+ return
}
}
- /*
- * The idea here is that you download snapshot for the scopes
- * associated with the apidconfig Id, and then download the
- * data based on the scopes retrieved in the first phase
- */
+
if r.StatusCode == 200 {
log.Info("Emit Snapshot response to plugins")
+ events.ListenFunc(apid.EventDeliveredSelector, donehandler)
events.Emit(ApigeeSyncEventSelector, &resp)
- snapshotInfo = resp.SnapshotInfo
- if downloadBootSnapshot == false {
- downloadBootSnapshot = true
- goto PHASE_2
- } else if downloadBootSnapshot == true {
- downloadSnapshot = true
- }
+
} else {
log.Fatalf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode)
}
- return err
}
/*
@@ -370,7 +425,30 @@
/*
* Retrieve SnapshotInfo for the given apidConfigId from apid_config table
*/
-func findSnapshotInfo(configId string) (snapshotInfo string) {
+func findLastSeqInfo(configId string) (info string) {
+
+ db, err := data.DB()
+ if err != nil {
+ log.Errorf("DB open Error: %s", err)
+ return ""
+ }
+
+ rows, err := db.Query("select lastSequence from APID_CONFIG where id = $1", configId)
+ if err != nil {
+ log.Errorf("Failed to query APID_CONFIG. Err: %s", err)
+ return ""
+ }
+ defer rows.Close()
+ for rows.Next() {
+ rows.Scan(&info)
+ }
+ return info
+}
+
+/*
+ * Retrieve LastSequence for the given apidConfigId from apid_config table
+ */
+func findSnapshotInfo(configId string) (info string) {
db, err := data.DB()
if err != nil {
@@ -385,7 +463,43 @@
}
defer rows.Close()
for rows.Next() {
- rows.Scan(&snapshotInfo)
+ rows.Scan(&info)
}
- return snapshotInfo
+ return info
+}
+
+/*
+ * Persist the last change Id each time a change has been successfully
+ * processed by the plugin(s)
+ */
+func persistChange(lastChange string) bool {
+ db, err := data.DB()
+ if err != nil {
+ log.Errorf("DB open Error: %s", err)
+ return false
+ }
+ txn, err := db.Begin()
+ if err != nil {
+ log.Error("Unable to create Sqlite transaction")
+ return false
+ }
+ prep, err := txn.Prepare("UPDATE APID_CONFIG SET lastSequence=$1 WHERE id=$2;")
+ if err != nil {
+ log.Error("INSERT APID_CONFIG Failed: ", err)
+ return false
+ }
+ defer prep.Close()
+ s := txn.Stmt(prep)
+ _, err = s.Exec(lastChange, gapidConfigId)
+ s.Close()
+ if err != nil {
+ log.Error("UPDATE APID_CONFIG_SCOPE Failed: ", err)
+ txn.Rollback()
+ return false
+ } else {
+ log.Info("UPDATE APID_CONFIG_SCOPE Success: (", lastChange, ")")
+ txn.Commit()
+ return true
+ }
+
}
diff --git a/init.go b/init.go
index e37ba96..3edc842 100644
--- a/init.go
+++ b/init.go
@@ -19,10 +19,11 @@
)
var (
- log apid.LogService
- config apid.ConfigService
- data apid.DataService
- events apid.EventsService
+ log apid.LogService
+ config apid.ConfigService
+ data apid.DataService
+ events apid.EventsService
+ gapidConfigId string
)
func init() {
@@ -34,7 +35,7 @@
if event == apid.PluginsInitializedEvent {
log.Debug("start post plugin init")
/* call to Download Snapshot info */
- go DownloadSnapshot()
+ go DownloadSnapshots()
/* Begin Looking for changes periodically */
log.Debug("starting update goroutine")
@@ -61,7 +62,7 @@
events.ListenFunc(apid.SystemEventsSelector, postInitPlugins)
config.SetDefault(configPollInterval, 120)
-
+ gapidConfigId = config.GetString(configScopeId)
db, err := data.DB()
if err != nil {
log.Panic("Unable to access DB", err)
diff --git a/listener.go b/listener.go
index c8d9418..83b87dd 100644
--- a/listener.go
+++ b/listener.go
@@ -21,6 +21,7 @@
if err != nil {
panic("Unable to access Sqlite DB")
}
+
txn, err := db.Begin()
if err != nil {
log.Error("Unable to create Sqlite transaction")