Merge pull request #29 from 30x/refactor
Refactor
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go
index 876ec9c..9e6ed94 100644
--- a/apigeeSync_suite_test.go
+++ b/apigeeSync_suite_test.go
@@ -1,7 +1,6 @@
package apidApigeeSync
import (
- "github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -12,17 +11,25 @@
"time"
"github.com/30x/apid-core"
+
"github.com/30x/apid-core/factory"
)
var (
- tmpDir string
- testServer *httptest.Server
- testRouter apid.Router
- testMock *MockServer
+ tmpDir string
+ testServer *httptest.Server
+ testRouter apid.Router
+ testMock *MockServer
+ wipeDBAferTest bool
)
-var _ = BeforeSuite(func(done Done) {
+const dummyConfigValue string = "placeholder"
+
+var _ = BeforeSuite(func() {
+ wipeDBAferTest = true
+})
+
+var _ = BeforeEach(func(done Done) {
apid.Initialize(factory.DefaultServicesFactory())
config = apid.Config()
@@ -32,12 +39,9 @@
Expect(err).NotTo(HaveOccurred())
config.Set("local_storage_path", tmpDir)
- testRouter = apid.API().Router()
- testServer = httptest.NewServer(testRouter)
-
- config.Set(configProxyServerBaseURI, testServer.URL)
- config.Set(configSnapServerBaseURI, testServer.URL)
- config.Set(configChangeServerBaseURI, testServer.URL)
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
config.Set(configSnapshotProtocol, "json")
config.Set(configPollInterval, 10*time.Millisecond)
@@ -49,140 +53,27 @@
block = "0"
log = apid.Log()
- // set up mock server
- mockParms := MockParms{
- ReliableAPI: false,
- ClusterID: config.GetString(configApidClusterId),
- TokenKey: config.GetString(configConsumerKey),
- TokenSecret: config.GetString(configConsumerSecret),
- Scope: "ert452",
- Organization: "att",
- Environment: "prod",
- }
- testMock = Mock(mockParms, testRouter)
+ _initPlugin(apid.AllServices())
+ close(done)
+}, 3)
- // This is actually the first test :)
- // Tests that entire bootstrap and set of sync operations work
- var lastSnapshot *common.Snapshot
-
- expectedSnapshotTables := make(map[string]bool)
- expectedSnapshotTables["kms.company"] = true
- expectedSnapshotTables["edgex.apid_cluster"] = true
- expectedSnapshotTables["edgex.data_scope"] = true
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- defer GinkgoRecover()
-
- if s, ok := event.(*common.Snapshot); ok {
-
- //verify that during downloadDataSnapshot, knownTables was correctly populated
- Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue())
-
- /* After this, we will mock changes for tables not present in the initial snapshot
- * until that is changed in the mock server, we have to spoof the known tables
- */
-
- //add apid_cluster and data_scope since those would present if this were a real scenario
- knownTables["kms.app_credential"] = true
- knownTables["kms.app_credential_apiproduct_mapper"] = true
- knownTables["kms.developer"] = true
- knownTables["kms.company_developer"] = true
- knownTables["kms.api_product"] = true
- knownTables["kms.app"] = true
-
- lastSnapshot = s
-
- for _, t := range s.Tables {
- switch t.Name {
-
- case "edgex.apid_cluster":
- Expect(t.Rows).To(HaveLen(1))
- r := t.Rows[0]
- var id string
- r.Get("id", &id)
- Expect(id).To(Equal("bootstrap"))
-
- case "edgex.data_scope":
- Expect(t.Rows).To(HaveLen(2))
- r := t.Rows[1] // get the non-cluster row
-
- var id, clusterID, env, org, scope string
- r.Get("id", &id)
- r.Get("apid_cluster_id", &clusterID)
- r.Get("env", &env)
- r.Get("org", &org)
- r.Get("scope", &scope)
-
- Expect(id).To(Equal("ert452"))
- Expect(scope).To(Equal("ert452"))
- Expect(clusterID).To(Equal("bootstrap"))
- Expect(env).To(Equal("prod"))
- Expect(org).To(Equal("att"))
- }
- }
-
- } else if cl, ok := event.(*common.ChangeList); ok {
-
- // ensure that snapshot switched DB versions
- Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
- expectedDB, err := data.DBVersion(lastSnapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
- Expect(getDB() == expectedDB).Should(BeTrue())
-
- Expect(cl.Changes).To(HaveLen(6))
-
- var tables []string
- for _, c := range cl.Changes {
- tables = append(tables, c.Table)
- Expect(c.NewRow).ToNot(BeNil())
-
- var tenantID string
- c.NewRow.Get("tenant_id", &tenantID)
- Expect(tenantID).To(Equal("ert452"))
- }
-
- Expect(tables).To(ContainElement("kms.app_credential"))
- Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
- Expect(tables).To(ContainElement("kms.developer"))
- Expect(tables).To(ContainElement("kms.company_developer"))
- Expect(tables).To(ContainElement("kms.api_product"))
- Expect(tables).To(ContainElement("kms.app"))
-
- events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) {
- defer GinkgoRecover()
-
- // allow other handler to execute to insert last_sequence
- time.Sleep(50 * time.Millisecond)
- var seq string
- err = getDB().
- QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
- Scan(&seq)
-
- Expect(err).NotTo(HaveOccurred())
- Expect(seq).To(Equal(cl.LastSequence))
-
- close(done)
- })
- }
- })
-
- apid.InitializePlugins()
-})
-
-var _ = BeforeEach(func() {
+var _ = AfterEach(func() {
apid.Events().Close()
lastSequence = ""
- _, err := getDB().Exec("DELETE FROM APID_CLUSTER")
- Expect(err).NotTo(HaveOccurred())
- _, err = getDB().Exec("DELETE FROM DATA_SCOPE")
- Expect(err).NotTo(HaveOccurred())
+ if wipeDBAferTest {
+ _, err := getDB().Exec("DELETE FROM APID_CLUSTER")
+ Expect(err).NotTo(HaveOccurred())
+ _, err = getDB().Exec("DELETE FROM DATA_SCOPE")
+ Expect(err).NotTo(HaveOccurred())
- db, err := data.DB()
- Expect(err).NotTo(HaveOccurred())
- _, err = db.Exec("DELETE FROM APID")
- Expect(err).NotTo(HaveOccurred())
+ db, err := dataService.DB()
+ Expect(err).NotTo(HaveOccurred())
+ _, err = db.Exec("DELETE FROM APID")
+ Expect(err).NotTo(HaveOccurred())
+ }
+ wipeDBAferTest = true
})
var _ = AfterSuite(func() {
diff --git a/apigee_sync.go b/apigee_sync.go
index be3fc8c..d6b3a19 100644
--- a/apigee_sync.go
+++ b/apigee_sync.go
@@ -1,514 +1,99 @@
package apidApigeeSync
import (
- "encoding/json"
- "net/http"
- "net/url"
- "path"
- "time"
-
- "sync/atomic"
-
- "io/ioutil"
-
"github.com/30x/apid-core"
- "github.com/apigee-labs/transicator/common"
+ "net/http"
+ "time"
)
const (
- httpTimeout = time.Minute
- pluginTimeout = time.Minute
- maxBackoffTimeout = time.Minute
+ httpTimeout = time.Minute
+ pluginTimeout = time.Minute
)
-var (
- block string = "45"
- lastSequence string
- polling uint32
- knownTables = make(map[string]bool)
-)
+var knownTables = make(map[string]bool)
/*
- * Polls change agent for changes. In event of errors, uses a doubling
- * backoff from 200ms up to a max delay of the configPollInterval value.
+ * Start from existing snapshot if possible
+ * If an existing snapshot does not exist, use the apid scope to fetch
+ * all data scopes, then get a snapshot for those data scopes
+ *
+ * Then, poll for changes
*/
-func pollForChanges() {
+func bootstrap() {
- if atomic.SwapUint32(&polling, 1) == 1 {
+ if apidInfo.LastSnapshot != "" {
+ snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot)
+
+ events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) {
+ changeManager.pollChangeWithBackoff()
+ })
+
+ log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo)
return
}
- var backOffFunc func()
- pollInterval := config.GetDuration(configPollInterval)
- for {
- start := time.Now()
- err := pollChangeAgent()
- end := time.Now()
- if err != nil {
- if _, ok := err.(changeServerError); ok {
- downloadDataSnapshot()
- continue
- }
- log.Debugf("Error connecting to changeserver: %v", err)
- }
- if end.After(start.Add(time.Second)) {
- backOffFunc = nil
- continue
- }
- if backOffFunc == nil {
- backOffFunc = createBackOff(200*time.Millisecond, pollInterval)
- }
- backOffFunc()
- }
+ downloadBootSnapshot(nil)
+ downloadDataSnapshot(quitPollingSnapshotServer)
- atomic.SwapUint32(&polling, 0)
+ changeManager.pollChangeWithBackoff()
+
}
/*
- * Long polls the change agent with a 45 second block. Parses the response from
- * change agent and raises an event. Called by pollForChanges().
+ * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy
+ * for retrying on errors
*/
-func pollChangeAgent() error {
+func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) {
- changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
- if err != nil {
- log.Errorf("bad url value for config %s: %s", changesUri, err)
- return err
- }
- changesUri.Path = path.Join(changesUri.Path, "changes")
+ backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true)
- /*
- * Check to see if we have lastSequence already saved in the DB,
- * in which case, it has to be used to prevent re-reading same data
- */
- lastSequence = getLastSequence()
+ //inintialize the retry channel to start first attempt immediately
+ retry := time.After(0 * time.Millisecond)
+
for {
- log.Debug("polling...")
+ select {
+ case <-quit:
+ log.Info("Quit signal recieved. Returning")
+ return
+ case <-retry:
+ start := time.Now()
- /* Find the scopes associated with the config id */
- scopes := scopeCache.readAllScope()
- v := url.Values{}
-
- /* Sequence added to the query if available */
- if lastSequence != "" {
- v.Add("since", lastSequence)
- }
- v.Add("block", block)
-
- /*
- * Include all the scopes associated with the config Id
- * The Config Id is included as well, as it acts as the
- * Bootstrap scope
- */
- for _, scope := range scopes {
- v.Add("scope", scope)
- }
- v.Add("scope", apidInfo.ClusterID)
- v.Add("snapshot", apidInfo.LastSnapshot)
- changesUri.RawQuery = v.Encode()
- uri := changesUri.String()
- log.Debugf("Fetching changes: %s", uri)
-
- /* If error, break the loop, and retry after interval */
- client := &http.Client{Timeout: httpTimeout} // must be greater than block value
- req, err := http.NewRequest("GET", uri, nil)
- addHeaders(req)
-
- r, err := client.Do(req)
- if err != nil {
- log.Errorf("change agent comm error: %s", err)
- return err
- }
-
- if r.StatusCode != http.StatusOK {
- log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
- switch r.StatusCode {
- case http.StatusUnauthorized:
- tokenManager.invalidateToken()
-
- case http.StatusNotModified:
- r.Body.Close()
- continue
-
- case http.StatusBadRequest:
- var apiErr changeServerError
- var b []byte
- b, err = ioutil.ReadAll(r.Body)
- if err != nil {
- log.Errorf("Unable to read response body: %v", err)
- break
- }
- err = json.Unmarshal(b, &apiErr)
- if err != nil {
- log.Errorf("JSON Response Data not parsable: %s", string(b))
- break
- }
- if apiErr.Code == "SNAPSHOT_TOO_OLD" {
- log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
- err = apiErr
- }
+ err := toExecute(quit)
+ if err == nil {
+ return
}
- r.Body.Close()
- return err
- }
+ if _, ok := err.(quitSignalError); ok {
+ return
+ }
- var resp common.ChangeList
- err = json.NewDecoder(r.Body).Decode(&resp)
- r.Body.Close()
- if err != nil {
- log.Errorf("JSON Response Data not parsable: %v", err)
- return err
- }
+ end := time.Now()
+ //error encountered, since we would have returned above otherwise
+ handleError(err)
- /*
- * If the lastSequence is already newer or the same than what we got via
- * resp.LastSequence, Ignore it.
- */
- if lastSequence != "" &&
- getChangeStatus(lastSequence, resp.LastSequence) != 1 {
- log.Errorf("Ignore change, already have newer changes")
- continue
- }
-
- if changesRequireDDLSync(resp) {
- log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
- return changeServerError{
- Code: "DDL changes detected; must get new snapshot",
+ /* TODO keep this around? Imagine an immediately erroring service,
+ * causing many sequential requests which could pollute logs
+ */
+ //only backoff if the request took less than one second
+ if end.After(start.Add(time.Second)) {
+ backoff.Reset()
+ retry = time.After(0 * time.Millisecond)
+ } else {
+ retry = time.After(backoff.Duration())
}
}
-
- /* If valid data present, Emit to plugins */
- if len(resp.Changes) > 0 {
- done := make(chan bool)
- events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) {
- done <- true
- })
-
- select {
- case <-time.After(httpTimeout):
- log.Panic("Timeout. Plugins failed to respond to changes.")
- case <-done:
- }
- } else {
- log.Debugf("No Changes detected for Scopes: %s", scopes)
- }
-
- updateSequence(resp.LastSequence)
}
}
-/*
- * seqCurr.Compare() will return 1, if its newer than seqPrev,
- * else will return 0, if same, or -1 if older.
- */
-func getChangeStatus(lastSeq string, currSeq string) int {
- seqPrev, err := common.ParseSequence(lastSeq)
- if err != nil {
- log.Panic("Unable to parse previous sequence string")
- }
- seqCurr, err := common.ParseSequence(currSeq)
- if err != nil {
- log.Panic("Unable to parse current sequence string")
- }
- return seqCurr.Compare(seqPrev)
-}
-
-func updateSequence(seq string) {
- lastSequence = seq
- err := updateLastSequence(seq)
- if err != nil {
- log.Panic("Unable to update Sequence in DB")
- }
-
-}
-
-func changesRequireDDLSync(changes common.ChangeList) bool {
-
- return !mapIsSubset(knownTables, extractTablesFromChangelist(changes))
-}
-
-// simple doubling back-off
-func createBackOff(retryIn, maxBackOff time.Duration) func() {
- return func() {
- if retryIn > maxBackOff {
- retryIn = maxBackOff
- }
- log.Debugf("backoff called. will retry in %s.", retryIn)
- time.Sleep(retryIn)
- retryIn = retryIn * time.Duration(2)
- }
-}
-
-func Redirect(req *http.Request, via []*http.Request) error {
+func Redirect(req *http.Request, _ []*http.Request) error {
req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
req.Header.Add("org", apidInfo.ClusterID) // todo: this is strange.. is it needed?
return nil
}
-// pollForChanges should usually be true, tests use the flag
-func bootstrap() {
-
- if apidInfo.LastSnapshot != "" {
- startOnLocalSnapshot(apidInfo.LastSnapshot)
- return
- }
-
- downloadBootSnapshot()
- downloadDataSnapshot()
- go pollForChanges()
-}
-
-// retrieve boot information: apid_config and apid_config_scope
-func downloadBootSnapshot() {
- log.Debug("download Snapshot for boot data")
-
- scopes := []string{apidInfo.ClusterID}
- snapshot := downloadSnapshot(scopes)
- storeBootSnapshot(snapshot)
-}
-
-func storeBootSnapshot(snapshot common.Snapshot) {
- // note that for boot snapshot case, we don't touch databases. We only update in-mem cache
- // This aims to deal with duplicate snapshot version#, see XAPID-869 for details
- scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
- for _, table := range snapshot.Tables {
- if table.Name == LISTENER_TABLE_DATA_SCOPE {
- for _, row := range table.Rows {
- ds := makeDataScopeFromRow(row)
- // cache scopes for this cluster
- if ds.ClusterID == apidInfo.ClusterID {
- scopeCache.updateCache(&ds)
- }
- }
- }
- }
- // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
-}
-
-// use the scope IDs from the boot snapshot to get all the data associated with the scopes
-func downloadDataSnapshot() {
- log.Debug("download Snapshot for data scopes")
-
- scopes := scopeCache.readAllScope()
-
- scopes = append(scopes, apidInfo.ClusterID)
- snapshot := downloadSnapshot(scopes)
- storeDataSnapshot(snapshot)
-}
-
-func storeDataSnapshot(snapshot common.Snapshot) {
- knownTables = extractTablesFromSnapshot(snapshot)
-
- db, err := data.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Database inaccessible: %v", err)
- }
- persistKnownTablesToDB(knownTables, db)
-
- done := make(chan bool)
- log.Info("Emitting Snapshot to plugins")
- events.EmitWithCallback(ApigeeSyncEventSelector, &snapshot, func(event apid.Event) {
- done <- true
- })
-
- select {
- case <-time.After(pluginTimeout):
- log.Panic("Timeout. Plugins failed to respond to snapshot.")
- case <-done:
- }
-
-}
-
-func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) {
-
- tables = make(map[string]bool)
-
- log.Debug("Extracting table names from snapshot")
- if snapshot.Tables == nil {
- //if this panic ever fires, it's a bug
- log.Panicf("Attempt to extract known tables from snapshot without tables failed")
- }
-
- for _, table := range snapshot.Tables {
- tables[table.Name] = true
- }
-
- return tables
-}
-
-func extractTablesFromChangelist(changes common.ChangeList) (tables map[string]bool) {
-
- tables = make(map[string]bool)
-
- for _, change := range changes.Changes {
- tables[change.Table] = true
- }
-
- return tables
-}
-
-func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
-
- tables = make(map[string]bool)
-
- log.Debug("Extracting table names from existing DB")
- rows, err := db.Query("SELECT name FROM _known_tables;")
- defer rows.Close()
-
- if err != nil {
- log.Panicf("Error reading current set of tables: %v", err)
- }
-
- for rows.Next() {
- var table string
- if err := rows.Scan(&table); err != nil {
- log.Panicf("Error reading current set of tables: %v", err)
- }
- log.Debugf("Table %s found in existing db", table)
-
- tables[table] = true
- }
- return tables
-}
-
-func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
- log.Debugf("Inserting table names found in snapshot into db")
-
- tx, err := db.Begin()
- if err != nil {
- log.Panicf("Error starting transaction: %v", err)
- }
- defer tx.Rollback()
-
- _, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));")
- if err != nil {
- log.Panicf("Could not create _known_tables table: %s", err)
- }
-
- for name := range tables {
- log.Debugf("Inserting %s into _known_tables", name)
- _, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name)
- if err != nil {
- log.Panicf("Error encountered inserting into known tables ", err)
- }
-
- }
-
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error committing transaction: %v", err)
-
- }
-}
-
-// Skip Downloading snapshot if there is already a snapshot available from previous run
-func startOnLocalSnapshot(snapshot string) {
- log.Infof("Starting on local snapshot: %s", snapshot)
-
- // ensure DB version will be accessible on behalf of dependant plugins
- db, err := data.DBVersion(snapshot)
- if err != nil {
- log.Panicf("Database inaccessible: %v", err)
- }
-
- knownTables = extractTablesFromDB(db)
- scopeCache.clearAndInitCache(snapshot)
-
- // allow plugins (including this one) to start immediately on existing database
- // Note: this MUST have no tables as that is used as an indicator
- snap := &common.Snapshot{
- SnapshotInfo: snapshot,
- }
- events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) {
- go pollForChanges()
- })
-
- log.Infof("Started on local snapshot: %s", snapshot)
-}
-
-// will keep retrying with backoff until success
-func downloadSnapshot(scopes []string) common.Snapshot {
-
- log.Debug("downloadSnapshot")
-
- snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
- if err != nil {
- log.Panicf("bad url value for config %s: %s", snapshotUri, err)
- }
-
- /* Frame and send the snapshot request */
- snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")
-
- v := url.Values{}
- for _, scope := range scopes {
- v.Add("scope", scope)
- }
- snapshotUri.RawQuery = v.Encode()
- uri := snapshotUri.String()
- log.Infof("Snapshot Download: %s", uri)
-
- client := &http.Client{
- CheckRedirect: Redirect,
- Timeout: httpTimeout,
- }
-
- retryIn := 5 * time.Millisecond
- maxBackOff := maxBackoffTimeout
- backOffFunc := createBackOff(retryIn, maxBackOff)
- first := true
-
- for {
- if first {
- first = false
- } else {
- backOffFunc()
- }
-
- req, err := http.NewRequest("GET", uri, nil)
- if err != nil {
- // should never happen, but if it does, it's unrecoverable anyway
- log.Panicf("Snapshotserver comm error: %v", err)
- }
- addHeaders(req)
-
- // Set the transport protocol type based on conf file input
- if config.GetString(configSnapshotProtocol) == "json" {
- req.Header.Set("Accept", "application/json")
- } else {
- req.Header.Set("Accept", "application/proto")
- }
-
- // Issue the request to the snapshot server
- r, err := client.Do(req)
- if err != nil {
- log.Errorf("Snapshotserver comm error: %v", err)
- continue
- }
-
- if r.StatusCode != 200 {
- body, _ := ioutil.ReadAll(r.Body)
- log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
- r.Body.Close()
- continue
- }
-
- // Decode the Snapshot server response
- var resp common.Snapshot
- err = json.NewDecoder(r.Body).Decode(&resp)
- if err != nil {
- log.Errorf("JSON Response Data not parsable: %v", err)
- r.Body.Close()
- continue
- }
-
- r.Body.Close()
- return resp
- }
-}
-
func addHeaders(req *http.Request) {
- req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken())
+ req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken())
req.Header.Set("apid_instance_id", apidInfo.InstanceID)
req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
@@ -518,25 +103,20 @@
Code string `json:"code"`
}
+type quitSignalError struct {
+}
+
+type expected200Error struct {
+}
+
+func (an expected200Error) Error() string {
+ return "Did not recieve OK response"
+}
+
+func (a quitSignalError) Error() string {
+ return "Signal to quit encountered"
+}
+
func (a changeServerError) Error() string {
return a.Code
-}
-
-/*
- * Determine is map b is a subset of map a
- */
-func mapIsSubset(a map[string]bool, b map[string]bool) bool {
-
- //nil maps should not be passed in. Making the distinction between nil map and empty map
- if a == nil || b == nil {
- return false
- }
-
- for k := range b {
- if !a[k] {
- return false
- }
- }
-
- return true
-}
+}
\ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go
index e9af4df..9cdec11 100644
--- a/apigee_sync_test.go
+++ b/apigee_sync_test.go
@@ -5,90 +5,280 @@
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "net/http/httptest"
+ //"time"
)
-var _ = Describe("listener", func() {
+var _ = Describe("Sync", func() {
- It("should bootstrap from local DB if present", func(done Done) {
+ Context("Sync", func() {
- expectedTables := make(map[string]bool)
- expectedTables["kms.company"] = true
- expectedTables["edgex.apid_cluster"] = true
- expectedTables["edgex.data_scope"] = true
+ var initializeContext = func() {
+ testRouter = apid.API().Router()
+ testServer = httptest.NewServer(testRouter)
- Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
-
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- defer GinkgoRecover()
-
- if s, ok := event.(*common.Snapshot); ok {
-
- //verify that the knownTables array has been properly populated from existing DB
- Expect(mapIsSubset(knownTables, expectedTables)).To(BeTrue())
-
- Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
- Expect(s.Tables).To(BeNil())
-
- close(done)
+ // set up mock server
+ mockParms := MockParms{
+ ReliableAPI: false,
+ ClusterID: config.GetString(configApidClusterId),
+ TokenKey: config.GetString(configConsumerKey),
+ TokenSecret: config.GetString(configConsumerSecret),
+ Scope: "ert452",
+ Organization: "att",
+ Environment: "prod",
}
+ testMock = Mock(mockParms, testRouter)
+
+ config.Set(configProxyServerBaseURI, testServer.URL)
+ config.Set(configSnapServerBaseURI, testServer.URL)
+ config.Set(configChangeServerBaseURI, testServer.URL)
+ }
+
+ var restoreContext = func() {
+
+ testServer.Close()
+
+ config.Set(configProxyServerBaseURI, dummyConfigValue)
+ config.Set(configSnapServerBaseURI, dummyConfigValue)
+ config.Set(configChangeServerBaseURI, dummyConfigValue)
+
+ }
+
+ It("should succesfully bootstrap from clean slate", func(done Done) {
+ log.Info("Starting sync tests...")
+ var closeDone <-chan bool
+ initializeContext()
+ // do not wipe DB after. Lets use it
+ wipeDBAferTest = false
+ var lastSnapshot *common.Snapshot
+
+ expectedSnapshotTables := common.ChangeList{
+ Changes: []common.Change{common.Change{Table: "kms.company"},
+ common.Change{Table: "edgex.apid_cluster"},
+ common.Change{Table: "edgex.data_scope"}},
+ }
+
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+ if s, ok := event.(*common.Snapshot); ok {
+
+ Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse())
+
+ //add apid_cluster and data_scope since those would present if this were a real scenario
+ knownTables["kms.app_credential"] = true
+ knownTables["kms.app_credential_apiproduct_mapper"] = true
+ knownTables["kms.developer"] = true
+ knownTables["kms.company_developer"] = true
+ knownTables["kms.api_product"] = true
+ knownTables["kms.app"] = true
+
+ lastSnapshot = s
+
+ for _, t := range s.Tables {
+ switch t.Name {
+
+ case "edgex.apid_cluster":
+ Expect(t.Rows).To(HaveLen(1))
+ r := t.Rows[0]
+ var id string
+ r.Get("id", &id)
+ Expect(id).To(Equal("bootstrap"))
+
+ case "edgex.data_scope":
+ Expect(t.Rows).To(HaveLen(2))
+ r := t.Rows[1] // get the non-cluster row
+
+ var id, clusterID, env, org, scope string
+ r.Get("id", &id)
+ r.Get("apid_cluster_id", &clusterID)
+ r.Get("env", &env)
+ r.Get("org", &org)
+ r.Get("scope", &scope)
+
+ Expect(id).To(Equal("ert452"))
+ Expect(scope).To(Equal("ert452"))
+ Expect(clusterID).To(Equal("bootstrap"))
+ Expect(env).To(Equal("prod"))
+ Expect(org).To(Equal("att"))
+ }
+ }
+
+ } else if cl, ok := event.(*common.ChangeList); ok {
+ closeDone = changeManager.close()
+ // ensure that snapshot switched DB versions
+ Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo))
+ expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo)
+ Expect(err).NotTo(HaveOccurred())
+ Expect(getDB() == expectedDB).Should(BeTrue())
+
+ Expect(cl.Changes).To(HaveLen(6))
+
+ var tables []string
+ for _, c := range cl.Changes {
+ tables = append(tables, c.Table)
+ Expect(c.NewRow).ToNot(BeNil())
+
+ var tenantID string
+ c.NewRow.Get("tenant_id", &tenantID)
+ Expect(tenantID).To(Equal("ert452"))
+ }
+
+ Expect(tables).To(ContainElement("kms.app_credential"))
+ Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper"))
+ Expect(tables).To(ContainElement("kms.developer"))
+ Expect(tables).To(ContainElement("kms.company_developer"))
+ Expect(tables).To(ContainElement("kms.api_product"))
+ Expect(tables).To(ContainElement("kms.app"))
+
+ go func() {
+ // when close done, all handlers for the first changeList have been executed
+ <-closeDone
+ defer GinkgoRecover()
+ // allow other handler to execute to insert last_sequence
+ var seq string
+ //for seq = ""; seq == ""; {
+ // time.Sleep(50 * time.Millisecond)
+ err := getDB().
+ QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;").
+ Scan(&seq)
+ Expect(err).NotTo(HaveOccurred())
+ //}
+ Expect(seq).To(Equal(cl.LastSequence))
+
+ restoreContext()
+ close(done)
+ }()
+
+ }
+ })
+ pie := apid.PluginsInitializedEvent{
+ Description: "plugins initialized",
+ }
+ pie.Plugins = append(pie.Plugins, pluginData)
+ postInitPlugins(pie)
+ }, 3)
+
+ It("should bootstrap from local DB if present", func(done Done) {
+
+ var closeDone <-chan bool
+
+ initializeContext()
+ expectedTables := common.ChangeList{
+ Changes: []common.Change{common.Change{Table: "kms.company"},
+ common.Change{Table: "edgex.apid_cluster"},
+ common.Change{Table: "edgex.data_scope"}},
+ }
+ Expect(apidInfo.LastSnapshot).NotTo(BeEmpty())
+
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+
+ if s, ok := event.(*common.Snapshot); ok {
+ // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed
+ // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler
+ closeDone = changeManager.close()
+ go func() {
+ // when close done, all handlers for the first snapshot have been executed
+ <-closeDone
+ //verify that the knownTables array has been properly populated from existing DB
+ Expect(changesRequireDDLSync(expectedTables)).To(BeFalse())
+
+ Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot))
+ Expect(s.Tables).To(BeNil())
+
+ restoreContext()
+ close(done)
+ }()
+
+ }
+ })
+ pie := apid.PluginsInitializedEvent{
+ Description: "plugins initialized",
+ }
+ pie.Plugins = append(pie.Plugins, pluginData)
+ postInitPlugins(pie)
+
+ }, 3)
+
+ It("should correctly identify non-proper subsets with respect to maps", func() {
+
+ //test b proper subset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "b"}},
+ )).To(BeFalse())
+
+ //test a == b
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}},
+ )).To(BeFalse())
+
+ //test b superset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}},
+ )).To(BeTrue())
+
+ //test b not subset of a
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{common.Change{Table: "c"}},
+ )).To(BeTrue())
+
+ //test a empty
+ Expect(changesHaveNewTables(map[string]bool{},
+ []common.Change{common.Change{Table: "a"}},
+ )).To(BeTrue())
+
+ //test b empty
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true},
+ []common.Change{},
+ )).To(BeFalse())
+
+ //test b nil
+ Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue())
+
+ //test a nil
+ Expect(changesHaveNewTables(nil,
+ []common.Change{common.Change{Table: "a"}},
+ )).To(BeTrue())
+ }, 3)
+
+ // todo: disabled for now -
+ // there is precondition I haven't been able to track down that breaks this test on occasion
+ XIt("should process a new snapshot when change server requires it", func(done Done) {
+ oldSnap := apidInfo.LastSnapshot
+ apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
+ defer GinkgoRecover()
+
+ if s, ok := event.(*common.Snapshot); ok {
+ Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
+ close(done)
+ }
+ })
+ testMock.forceNewSnapshot()
})
- bootstrap()
+
+ It("Verify the Sequence Number Logic works as expected", func() {
+ Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
+ Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
+ Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
+ Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
+ Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
+ Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
+ }, 3)
+
+ /*
+ * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
+ */
+ It("Should be able to handle duplicate snapshot during bootstrap", func() {
+ initializeContext()
+
+ tokenManager = createTokenManager()
+ events.Listen(ApigeeSyncEventSelector, &handler{})
+
+ scopes := []string{apidInfo.ClusterID}
+ snapshot := &common.Snapshot{}
+ downloadSnapshot(scopes, snapshot, nil)
+ storeBootSnapshot(snapshot)
+ storeDataSnapshot(snapshot)
+ restoreContext()
+ }, 3)
})
-
- It("should correctly identify non-proper subsets with respect to maps", func() {
-
- //test b proper subset of a
- Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"b": true})).To(BeTrue())
-
- //test a == b
- Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true})).To(BeTrue())
-
- //test b superset of a
- Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true, "c": true})).To(BeFalse())
-
- //test b not subset of a
- Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"c": true})).To(BeFalse())
-
- //test b empty
- Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{})).To(BeTrue())
-
- //test a empty
- Expect(mapIsSubset(map[string]bool{}, map[string]bool{"b": true})).To(BeFalse())
- })
-
- // todo: disabled for now -
- // there is precondition I haven't been able to track down that breaks this test on occasion
- XIt("should process a new snapshot when change server requires it", func(done Done) {
- oldSnap := apidInfo.LastSnapshot
- apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) {
- defer GinkgoRecover()
-
- if s, ok := event.(*common.Snapshot); ok {
- Expect(s.SnapshotInfo).NotTo(Equal(oldSnap))
- close(done)
- }
- })
- testMock.forceNewSnapshot()
- })
-
- It("Verify the Sequence Number Logic works as expected", func() {
- Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1))
- Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1))
- Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0))
- Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1))
- Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1))
- Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1))
- })
-
- /*
- * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap
- */
- It("Should be able to handle duplicate snapshot during bootstrap", func() {
- scopes := []string{apidInfo.ClusterID}
- snapshot := downloadSnapshot(scopes)
- storeBootSnapshot(snapshot)
- storeDataSnapshot(snapshot)
- })
-
})
diff --git a/backoff.go b/backoff.go
new file mode 100644
index 0000000..e3a7403
--- /dev/null
+++ b/backoff.go
@@ -0,0 +1,84 @@
+package apidApigeeSync
+
+import (
+ "math"
+ "time"
+ "math/rand"
+)
+
+const defaultInitial time.Duration = 200 * time.Millisecond
+const defaultMax time.Duration = 10 * time.Second
+const defaultFactor float64 = 2
+
+type Backoff struct {
+ attempt int
+ initial, max time.Duration
+ jitter bool
+ backoffStrategy func() time.Duration
+}
+
+type ExponentialBackoff struct {
+ Backoff
+ factor float64
+}
+
+func NewExponentialBackoff(initial, max time.Duration, factor float64, jitter bool) *ExponentialBackoff {
+ backoff := &ExponentialBackoff{}
+
+ if initial <= 0 {
+ initial = defaultInitial
+ }
+ if max <= 0 {
+ max = defaultMax
+ }
+
+ if factor <= 0 {
+ factor = defaultFactor
+ }
+
+ backoff.initial = initial
+ backoff.max = max
+ backoff.attempt = 0
+ backoff.factor = factor
+ backoff.jitter = jitter
+ backoff.backoffStrategy = backoff.exponentialBackoffStrategy
+
+ return backoff
+}
+
+func (b *Backoff) Duration() time.Duration {
+ d := b.backoffStrategy()
+ b.attempt++
+ return d
+}
+
+func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
+
+ initial := float64(b.Backoff.initial)
+ attempt := float64(b.Backoff.attempt)
+ duration := initial * math.Pow(b.factor, attempt)
+
+ if duration > math.MaxInt64 {
+ return b.max
+ }
+ dur := time.Duration(duration)
+
+ if b.jitter {
+ duration = (rand.Float64()*(duration-initial) + initial)
+ }
+
+ if dur > b.max {
+ return b.max
+ }
+
+ log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
+ return dur
+}
+
+func (b *Backoff) Reset() {
+ b.attempt = 0
+}
+
+func (b *Backoff) Attempt() int {
+ return b.attempt
+}
diff --git a/backoff_test.go b/backoff_test.go
new file mode 100644
index 0000000..ae85909
--- /dev/null
+++ b/backoff_test.go
@@ -0,0 +1,56 @@
+package apidApigeeSync
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "time"
+)
+
+var _ = Describe("backoff", func() {
+
+ Context("Backoff timeout calculations", func() {
+
+ It("Should properly apply defaults", func() {
+ log.Info("Starting backoff tests...")
+ b := NewExponentialBackoff(0, 0, 0, true)
+ Expect(defaultInitial).To(Equal(b.initial))
+ Expect(defaultMax).To(Equal(b.max))
+ Expect(defaultFactor).To(Equal(b.factor))
+
+ b = NewExponentialBackoff(-1, -1, -1, true)
+ Expect(defaultInitial).To(Equal(b.initial))
+ Expect(defaultMax).To(Equal(b.max))
+ Expect(defaultFactor).To(Equal(b.factor))
+ })
+
+ It("should properly apply exponential backoff strategy", func() {
+ b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+ Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(1).To(Equal(b.Attempt()))
+ Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(2).To(Equal(b.Attempt()))
+ Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(3).To(Equal(b.Attempt()))
+ Expect(1600 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(4).To(Equal(b.Attempt()))
+ })
+
+ It("should reset properly", func() {
+ b := NewExponentialBackoff(200 * time.Millisecond, 2 * time.Second, 2, false)
+ Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(1).To(Equal(b.Attempt()))
+ Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(2).To(Equal(b.Attempt()))
+ Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(3).To(Equal(b.Attempt()))
+ b.Reset()
+ Expect(200 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(1).To(Equal(b.Attempt()))
+ Expect(400 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(2).To(Equal(b.Attempt()))
+ Expect(800 * time.Millisecond).To(Equal(b.Duration()))
+ Expect(3).To(Equal(b.Attempt()))
+ })
+ })
+
+})
diff --git a/changes.go b/changes.go
new file mode 100644
index 0000000..aa8d822
--- /dev/null
+++ b/changes.go
@@ -0,0 +1,319 @@
+package apidApigeeSync
+
+import (
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "net/url"
+ "path"
+ "time"
+
+ "github.com/apigee-labs/transicator/common"
+ "sync/atomic"
+)
+
+var lastSequence string
+var block string = "45"
+
+type pollChangeManager struct {
+ // 0 for not closed, 1 for closed
+ isClosed *int32
+ // 0 for pollChangeWithBackoff() not launched, 1 for launched
+ isLaunched *int32
+ quitChan chan bool
+}
+
+func createChangeManager() *pollChangeManager {
+ isClosedInt := int32(0)
+ isLaunchedInt := int32(0)
+ return &pollChangeManager{
+ isClosed: &isClosedInt,
+ quitChan: make(chan bool),
+ isLaunched: &isLaunchedInt,
+ }
+}
+
+/*
+ * thread-safe close of pollChangeManager
+ * It marks status as closed immediately, quits backoff polling agent, and closes tokenManager
+ * use <- close() for blocking close
+ */
+func (c *pollChangeManager) close() <-chan bool {
+ finishChan := make(chan bool, 1)
+ //has been closed
+ if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
+ log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
+ go func() {
+ finishChan <- false
+ log.Debug("change manager closed")
+ }()
+ return finishChan
+ }
+ // not launched
+ if atomic.LoadInt32(c.isLaunched) == int32(0) {
+ log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! close tokenManager!")
+ go func() {
+ tokenManager.close()
+ finishChan <- false
+ log.Debug("change manager closed")
+ }()
+ return finishChan
+ }
+ // launched
+ log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
+ go func() {
+ c.quitChan <- true
+ tokenManager.close()
+ finishChan <- true
+ log.Debug("change manager closed")
+ }()
+ return finishChan
+}
+
+/*
+ * thread-safe pollChangeWithBackoff(), guaranteed: only one polling thread
+ */
+
+func (c *pollChangeManager) pollChangeWithBackoff() {
+ // closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ log.Error("pollChangeManager: pollChangeWithBackoff() called after closed")
+ return
+ }
+ // has been launched before
+ if atomic.SwapInt32(c.isLaunched, 1) == int32(1) {
+ log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before")
+ return
+ }
+
+ go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
+ log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff")
+
+}
+
+/*
+ * Long polls the change agent with a 45 second block. Parses the response from
+ * change agent and raises an event. Called by pollWithBackoff().
+ */
+func (c *pollChangeManager) pollChangeAgent(dummyQuit chan bool) error {
+
+ changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
+ if err != nil {
+ log.Errorf("bad url value for config %s: %s", changesUri, err)
+ return err
+ }
+ changesUri.Path = path.Join(changesUri.Path, "changes")
+
+ /*
+ * Check to see if we have lastSequence already saved in the DB,
+ * in which case, it has to be used to prevent re-reading same data
+ */
+ lastSequence = getLastSequence()
+
+ for {
+ select {
+ case <-c.quitChan:
+ log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager")
+ return quitSignalError{}
+ default:
+ err := c.getChanges(changesUri)
+ if err != nil {
+ if _, ok := err.(quitSignalError); ok {
+ log.Debug("pollChangeAgent: consuming the quit signal")
+ <-c.quitChan
+ }
+ return err
+ }
+ }
+ }
+}
+
+//TODO refactor this method more, split it up
+/* Make a single request to the changeserver to get a changelist */
+func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
+ // if closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ return quitSignalError{}
+ }
+ log.Debug("polling...")
+
+ /* Find the scopes associated with the config id */
+ scopes := findScopesForId(apidInfo.ClusterID)
+ v := url.Values{}
+
+ /* Sequence added to the query if available */
+ if lastSequence != "" {
+ v.Add("since", lastSequence)
+ }
+ v.Add("block", block)
+
+ /*
+ * Include all the scopes associated with the config Id
+ * The Config Id is included as well, as it acts as the
+ * Bootstrap scope
+ */
+ for _, scope := range scopes {
+ v.Add("scope", scope)
+ }
+ v.Add("scope", apidInfo.ClusterID)
+ v.Add("snapshot", apidInfo.LastSnapshot)
+ changesUri.RawQuery = v.Encode()
+ uri := changesUri.String()
+ log.Debugf("Fetching changes: %s", uri)
+
+ /* If error, break the loop, and retry after interval */
+ client := &http.Client{Timeout: httpTimeout} // must be greater than block value
+ req, err := http.NewRequest("GET", uri, nil)
+ addHeaders(req)
+
+ r, err := client.Do(req)
+ if err != nil {
+ log.Errorf("change agent comm error: %s", err)
+ // if closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ return quitSignalError{}
+ }
+ return err
+ }
+ defer r.Body.Close()
+
+ // has been closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ log.Debugf("getChanges: changeManager has been closed")
+ return quitSignalError{}
+ }
+
+ if r.StatusCode != http.StatusOK {
+ log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
+ switch r.StatusCode {
+ case http.StatusUnauthorized:
+ tokenManager.invalidateToken()
+ return nil
+
+ case http.StatusNotModified:
+ return nil
+
+ case http.StatusBadRequest:
+ var apiErr changeServerError
+ var b []byte
+ b, err = ioutil.ReadAll(r.Body)
+ if err != nil {
+ log.Errorf("Unable to read response body: %v", err)
+ return err
+ }
+ err = json.Unmarshal(b, &apiErr)
+ if err != nil {
+ log.Errorf("JSON Response Data not parsable: %s", string(b))
+ return err
+ }
+ if apiErr.Code == "SNAPSHOT_TOO_OLD" {
+ log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
+ err = apiErr
+ }
+ return nil
+ }
+ return nil
+ }
+
+ var resp common.ChangeList
+ err = json.NewDecoder(r.Body).Decode(&resp)
+ if err != nil {
+ log.Errorf("JSON Response Data not parsable: %v", err)
+ return err
+ }
+
+ /*
+ * If the lastSequence is already newer or the same than what we got via
+ * resp.LastSequence, Ignore it.
+ */
+ if lastSequence != "" &&
+ getChangeStatus(lastSequence, resp.LastSequence) != 1 {
+ return changeServerError{
+ Code: "Ignore change, already have newer changes",
+ }
+ }
+
+ if changesRequireDDLSync(resp) {
+ return changeServerError{
+ Code: "DDL changes detected; must get new snapshot",
+ }
+ }
+
+ /* If valid data present, Emit to plugins */
+ if len(resp.Changes) > 0 {
+ select {
+ case <-time.After(httpTimeout):
+ log.Panic("Timeout. Plugins failed to respond to changes.")
+ case <-events.Emit(ApigeeSyncEventSelector, &resp):
+ }
+ } else {
+ log.Debugf("No Changes detected for Scopes: %s", scopes)
+ }
+
+ updateSequence(resp.LastSequence)
+
+ return nil
+}
+
+func changesRequireDDLSync(changes common.ChangeList) bool {
+ return changesHaveNewTables(knownTables, changes.Changes)
+}
+
+func (c *pollChangeManager) handleChangeServerError(err error) {
+ // has been closed
+ if atomic.LoadInt32(c.isClosed) == int32(1) {
+ log.Debugf("handleChangeServerError: changeManager has been closed")
+ return
+ }
+ if _, ok := err.(changeServerError); ok {
+ log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
+ downloadDataSnapshot(c.quitChan)
+ } else {
+ log.Debugf("Error connecting to changeserver: %v", err)
+ }
+}
+
+/*
+ * Determine if any tables in changes are not present in known tables
+ */
+func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
+
+ //nil maps should not be passed in. Making the distinction between nil map and empty map
+ if a == nil || changes == nil {
+ return true
+ }
+
+ for _, change := range changes {
+ if !a[change.Table] {
+ log.Infof("Unable to find %s table in current known tables", change.Table)
+ return true
+ }
+ }
+
+ return false
+}
+
+/*
+ * seqCurr.Compare() will return 1, if its newer than seqPrev,
+ * else will return 0, if same, or -1 if older.
+ */
+func getChangeStatus(lastSeq string, currSeq string) int {
+ seqPrev, err := common.ParseSequence(lastSeq)
+ if err != nil {
+ log.Panic("Unable to parse previous sequence string")
+ }
+ seqCurr, err := common.ParseSequence(currSeq)
+ if err != nil {
+ log.Panic("Unable to parse current sequence string")
+ }
+ return seqCurr.Compare(seqPrev)
+}
+
+func updateSequence(seq string) {
+ lastSequence = seq
+ err := updateLastSequence(seq)
+ if err != nil {
+ log.Panic("Unable to update Sequence in DB")
+ }
+
+}
diff --git a/data.go b/data.go
index 08ff3ba..48f1ff7 100644
--- a/data.go
+++ b/data.go
@@ -88,8 +88,9 @@
log.Debugf("inserting into APID_CLUSTER: %v", dac)
+ //replace to accomodate same snapshot txid
stmt, err := txn.Prepare(`
- INSERT INTO APID_CLUSTER
+ REPLACE INTO APID_CLUSTER
(id, description, name, umbrella_org_app_name,
created, created_by, updated, updated_by,
last_sequence)
@@ -117,8 +118,9 @@
log.Debugf("insert DATA_SCOPE: %v", ds)
+ //replace to accomodate same snapshot txid
stmt, err := txn.Prepare(`
- INSERT INTO DATA_SCOPE
+ REPLACE INTO DATA_SCOPE
(id, apid_cluster_id, scope, org,
env, created, created_by, updated,
updated_by)
@@ -237,7 +239,7 @@
// always use default database for this
var db apid.DB
- db, err = data.DB()
+ db, err = dataService.DB()
if err != nil {
return
}
@@ -254,6 +256,7 @@
newInstanceID = true
info.InstanceID = generateUUID()
+ log.Debugf("Inserting new apid instance id %s", info.InstanceID)
db.Exec("INSERT INTO APID (instance_id, last_snapshot_info) VALUES (?,?)",
info.InstanceID, "")
}
@@ -264,7 +267,7 @@
func updateApidInstanceInfo() error {
// always use default database for this
- db, err := data.DB()
+ db, err := dataService.DB()
if err != nil {
return err
}
diff --git a/datascope_cache.go b/datascope_cache.go
deleted file mode 100644
index 3d19710..0000000
--- a/datascope_cache.go
+++ /dev/null
@@ -1,93 +0,0 @@
-package apidApigeeSync
-
-const (
- readCache int = iota
- updateCache
- removeCache
- clearAndInit
-)
-
-/*
- * structs for DatascopeCache
- */
-
-type cacheOperationRequest struct {
- Operation int
- Scope *dataDataScope
- version string
-}
-
-// maintain an in-mem cache of datascope
-type DatascopeCache struct {
- requestChan chan *cacheOperationRequest
- readDoneChan chan []string
- scopeMap map[string]*dataDataScope
- version string
-}
-
-var scopeCache *DatascopeCache
-
-func (cache *DatascopeCache) datascopeCacheManager() {
- for request := range cache.requestChan {
- switch request.Operation {
- case readCache:
- log.Debug("datascopeCacheManager: readCache")
- scopes := make([]string, 0, len(cache.scopeMap))
- for _, ds := range cache.scopeMap {
- scopes = append(scopes, ds.Scope)
- }
- cache.readDoneChan <- scopes
- case updateCache:
- log.Debug("datascopeCacheManager: updateCache")
- cache.scopeMap[request.Scope.ID] = request.Scope
- case removeCache:
- log.Debug("datascopeCacheManager: removeCache")
- delete(cache.scopeMap, request.Scope.ID)
- case clearAndInit:
- log.Debug("datascopeCacheManager: clearAndInit")
- if cache.version != request.version {
- cache.scopeMap = make(map[string]*dataDataScope)
- cache.version = request.version
- }
- }
- }
-
- //chan closed
- cache.scopeMap = nil
- close(cache.readDoneChan)
-}
-
-/*
- * The output of readAllScope() should be identical to findScopesForId(apidInfo.ClusterID)
- */
-
-func (cache *DatascopeCache) readAllScope() []string {
- cache.requestChan <- &cacheOperationRequest{readCache, nil, ""}
- scopes := <-cache.readDoneChan
- // eliminate duplicates
- tmpMap := make(map[string]bool)
- for _, scope := range scopes {
- tmpMap[scope] = true
- }
- scopes = make([]string, 0)
- for scope := range tmpMap {
- scopes = append(scopes, scope)
- }
- return scopes
-}
-
-func (cache *DatascopeCache) removeCache(scope *dataDataScope) {
- cache.requestChan <- &cacheOperationRequest{removeCache, scope, ""}
-}
-
-func (cache *DatascopeCache) updateCache(scope *dataDataScope) {
- cache.requestChan <- &cacheOperationRequest{updateCache, scope, ""}
-}
-
-func (cache *DatascopeCache) clearAndInitCache(version string) {
- cache.requestChan <- &cacheOperationRequest{clearAndInit, nil, version}
-}
-
-func (cache *DatascopeCache) closeCache() {
- close(cache.requestChan)
-}
diff --git a/datascope_cache_test.go b/datascope_cache_test.go
deleted file mode 100644
index bc67f53..0000000
--- a/datascope_cache_test.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package apidApigeeSync
-
-import (
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "math/rand"
- "strconv"
- "time"
-)
-
-var _ = Describe("datascope cache", func() {
- /*
- * in-mem cache test
- */
- It("Test In-mem cache", func() {
- testCache := &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
- go testCache.datascopeCacheManager()
- testCache.clearAndInitCache("test-version")
- countChan := make(chan int)
- base := 10
- rand.Seed(time.Now().Unix())
- num := base + rand.Intn(base)
- scopeMap := make(map[string]bool)
- // async update
- for i := 0; i < num; i++ {
- id := strconv.Itoa(i)
- scopeStr := strconv.Itoa(i % base)
- scope := &dataDataScope{ID: id, Scope: scopeStr}
- scopeMap[scope.Scope] = true
- go func(scope *dataDataScope) {
- testCache.updateCache(scope)
- countChan <- 1
- }(scope)
- }
-
- // wait until update done
- for i := 0; i < num; i++ {
- <-countChan
- }
-
- // verify update
- retrievedScopes := testCache.readAllScope()
- Expect(len(scopeMap)).To(Equal(len(retrievedScopes)))
- for _, s := range retrievedScopes {
- // verify each retrieved scope is valid
- Expect(scopeMap[s]).To(BeTrue())
- // no duplicate scopes
- scopeMap[s] = true
- }
-
- // remove all the datascopes with odd scope
- count := 0
- for i := 0; i < num; i++ {
- if (i%base)%2 == 1 {
- count += 1
- id := strconv.Itoa(i)
- scopeStr := strconv.Itoa(i % base)
- scope := &dataDataScope{ID: id, Scope: scopeStr}
- go func(scope *dataDataScope) {
- testCache.removeCache(scope)
- countChan <- 1
- }(scope)
- }
- }
-
- for i := 0; i < count; i++ {
- <-countChan
- }
-
- // all retrieved scopes should be even
- retrievedScopes = testCache.readAllScope()
- for _, s := range retrievedScopes {
- scopeNum, _ := strconv.Atoi(s)
- Expect(scopeNum % 2).To(BeZero())
- }
-
- // async remove all datascopes
- for i := 0; i < num; i++ {
- id := strconv.Itoa(i)
- scopeStr := strconv.Itoa(i % base)
- scope := &dataDataScope{ID: id, Scope: scopeStr}
- go func(scope *dataDataScope) {
- testCache.removeCache(scope)
- countChan <- 1
- }(scope)
- }
-
- for i := 0; i < num; i++ {
- <-countChan
- }
- retrievedScopes = testCache.readAllScope()
- Expect(len(retrievedScopes)).To(Equal(0))
-
- testCache.closeCache()
- })
-
-})
diff --git a/init.go b/init.go
index d40a9e7..e1b31c5 100644
--- a/init.go
+++ b/init.go
@@ -29,14 +29,21 @@
)
var (
- log apid.LogService
- config apid.ConfigService
- data apid.DataService
- events apid.EventsService
- apidInfo apidInstanceInfo
- apidPluginDetails string
- newInstanceID bool
- tokenManager *tokenMan
+ /* All set during plugin initialization */
+ log apid.LogService
+ config apid.ConfigService
+ dataService apid.DataService
+ events apid.EventsService
+ apidInfo apidInstanceInfo
+ newInstanceID bool
+ tokenManager *tokenMan
+ changeManager *pollChangeManager
+ quitPollingSnapshotServer chan bool
+
+ /* Set during post plugin initialization
+ * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called
+ */
+ apidPluginDetails string = `[{"name":"apidApigeeSync","schemaVer":"1.0"}]`
)
type apidInstanceInfo struct {
@@ -52,7 +59,7 @@
apid.RegisterPlugin(initPlugin)
}
-func initDefaults() {
+func initConfigDefaults() {
config.SetDefault(configPollInterval, 120*time.Second)
config.SetDefault(configSnapshotProtocol, "json")
name, errh := os.Hostname()
@@ -64,57 +71,28 @@
log.Debugf("Using %s as display name", config.GetString(configName))
}
-func SetLogger(logger apid.LogService) {
- log = logger
-}
-
-func initPlugin(services apid.Services) (apid.PluginData, error) {
- SetLogger(services.Log().ForModule("apigeeSync"))
- log.Debug("start init")
-
- config = services.Config()
- initDefaults()
-
- data = services.Data()
+func initVariables(services apid.Services) error {
+ dataService = services.Data()
events = services.Events()
-
- scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)}
-
- go scopeCache.datascopeCacheManager()
-
- /* This callback function will get called, once all the plugins are
- * initialized (not just this plugin). This is needed because,
- * downloadSnapshots/changes etc have to begin to be processed only
- * after all the plugins are initialized
- */
- events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
-
- // check for required values
- for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
- configSnapServerBaseURI, configChangeServerBaseURI} {
- if !config.IsSet(key) {
- return pluginData, fmt.Errorf("Missing required config value: %s", key)
- }
- }
- proto := config.GetString(configSnapshotProtocol)
- if proto != "json" && proto != "proto" {
- return pluginData, fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol)
- }
+ //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines
+ //also useful for testing
+ quitPollingSnapshotServer = make(chan bool)
+ changeManager = createChangeManager()
// set up default database
- db, err := data.DB()
+ db, err := dataService.DB()
if err != nil {
- return pluginData, fmt.Errorf("Unable to access DB: %v", err)
+ return fmt.Errorf("Unable to access DB: %v", err)
}
err = initDB(db)
if err != nil {
- return pluginData, fmt.Errorf("Unable to access DB: %v", err)
+ return fmt.Errorf("Unable to access DB: %v", err)
}
setDB(db)
apidInfo, err = getApidInstanceInfo()
if err != nil {
- return pluginData, fmt.Errorf("Unable to get apid instance info: %v", err)
+ return fmt.Errorf("Unable to get apid instance info: %v", err)
}
if config.IsSet(configApidInstanceID) {
@@ -122,6 +100,64 @@
}
config.Set(configApidInstanceID, apidInfo.InstanceID)
+ return nil
+}
+
+func checkForRequiredValues() error {
+ // check for required values
+ for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
+ configSnapServerBaseURI, configChangeServerBaseURI} {
+ if !config.IsSet(key) {
+ return fmt.Errorf("Missing required config value: %s", key)
+ }
+ }
+ proto := config.GetString(configSnapshotProtocol)
+ if proto != "json" && proto != "proto" {
+ return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol)
+ }
+
+ return nil
+}
+
+func SetLogger(logger apid.LogService) {
+ log = logger
+}
+
+/* Idempotent state initialization */
+func _initPlugin(services apid.Services) error {
+ SetLogger(services.Log().ForModule("apigeeSync"))
+ log.Debug("start init")
+
+ config = services.Config()
+ err := checkForRequiredValues()
+ if err != nil {
+ return err
+ }
+
+ initConfigDefaults()
+
+ err = initVariables(services)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func initPlugin(services apid.Services) (apid.PluginData, error) {
+
+ err := _initPlugin(services)
+ if err != nil {
+ return pluginData, err
+ }
+
+ /* This callback function will get called once all the plugins are
+ * initialized (not just this plugin). This is needed because,
+ * downloadSnapshots/changes etc have to begin to be processed only
+ * after all the plugins are initialized
+ */
+ events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins)
+
log.Debug("end init")
return pluginData, nil
diff --git a/init_test.go b/init_test.go
index 7bcee56..479b4b6 100644
--- a/init_test.go
+++ b/init_test.go
@@ -10,18 +10,20 @@
Context("Apid Instance display name", func() {
It("should be hostname by default", func() {
- initDefaults()
+ log.Info("Starting init tests...")
+
+ initConfigDefaults()
Expect(apidInfo.InstanceName).To(Equal("testhost"))
- })
+ }, 3)
It("accept display name from config", func() {
config.Set(configName, "aa01")
- initDefaults()
+ initConfigDefaults()
var apidInfoLatest apidInstanceInfo
apidInfoLatest, _ = getApidInstanceInfo()
Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
Expect(apidInfoLatest.LastSnapshot).To(Equal(""))
- })
+ }, 3)
})
diff --git a/listener.go b/listener.go
index 1fbd82e..6c4b1ef 100644
--- a/listener.go
+++ b/listener.go
@@ -29,22 +29,42 @@
}
func processSnapshot(snapshot *common.Snapshot) {
-
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
- db, err := data.DBVersion(snapshot.SnapshotInfo)
+ db, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Unable to access database: %v", err)
}
- err = initDB(db)
+ if config.GetString(configSnapshotProtocol) == "json" {
+ processJsonSnapshot(snapshot, db)
+ } else if config.GetString(configSnapshotProtocol) == "sqlite" {
+ processSqliteSnapshot(snapshot, db)
+ }
+
+ //update apid instance info
+ apidInfo.LastSnapshot = snapshot.SnapshotInfo
+ err = updateApidInstanceInfo()
+ if err != nil {
+ log.Panicf("Unable to update instance info: %v", err)
+ }
+
+ setDB(db)
+ log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
+
+}
+
+func processSqliteSnapshot(snapshot *common.Snapshot, db apid.DB) {
+ //nothing to do as of now, here as a placeholder
+}
+
+func processJsonSnapshot(snapshot *common.Snapshot, db apid.DB) {
+
+ err := initDB(db)
if err != nil {
log.Panicf("Unable to initialize database: %v", err)
}
- // clear cache
- scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
-
tx, err := db.Begin()
if err != nil {
log.Panicf("Error starting transaction: %v", err)
@@ -73,10 +93,6 @@
if err != nil {
log.Panicf("Snapshot update failed: %v", err)
}
- // cache scopes for this cluster
- if ds.ClusterID == apidInfo.ClusterID {
- scopeCache.updateCache(&ds)
- }
}
}
}
@@ -85,15 +101,6 @@
if err != nil {
log.Panicf("Error committing Snapshot change: %v", err)
}
-
- apidInfo.LastSnapshot = snapshot.SnapshotInfo
- err = updateApidInstanceInfo()
- if err != nil {
- log.Panicf("Unable to update instance info: %v", err)
- }
-
- setDB(db)
- log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
}
func processChangeList(changes *common.ChangeList) {
@@ -121,19 +128,9 @@
case common.Insert:
ds := makeDataScopeFromRow(change.NewRow)
err = insertDataScope(ds, tx)
-
- // cache scopes for this cluster
- if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
- scopeCache.updateCache(&ds)
- }
case common.Delete:
ds := makeDataScopeFromRow(change.OldRow)
err = deleteDataScope(ds, tx)
-
- // cache scopes for this cluster
- if (ds.ClusterID == apidInfo.ClusterID) && (err == nil) {
- scopeCache.removeCache(&ds)
- }
default:
// common.Update is not allowed
log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
diff --git a/listener_test.go b/listener_test.go
index 2b060de..ee0b4cc 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -15,6 +15,7 @@
Context("ApigeeSync snapshot event", func() {
It("should set DB to appropriate version", func() {
+ log.Info("Starting listener tests...")
//save the last snapshot, so we can restore it at the end of this context
saveLastSnapshot = apidInfo.LastSnapshot
@@ -28,7 +29,7 @@
Expect(apidInfo.LastSnapshot).To(Equal(event.SnapshotInfo))
- expectedDB, err := data.DBVersion(event.SnapshotInfo)
+ expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
Expect(err).NotTo(HaveOccurred())
Expect(getDB() == expectedDB).Should(BeTrue())
@@ -47,7 +48,7 @@
}
Expect(func() { handler.Handle(&event) }).To(Panic())
- })
+ }, 3)
It("should process a valid Snapshot", func() {
@@ -203,7 +204,7 @@
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
- })
+ }, 3)
})
Context("ApigeeSync change event", func() {
@@ -225,7 +226,7 @@
}
Expect(func() { handler.Handle(&event) }).To(Panic())
- })
+ }, 3)
It("update event should panic", func() {
@@ -242,7 +243,7 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
- })
+ }, 3)
PIt("delete event should kill all the things!")
})
@@ -328,7 +329,7 @@
Expect(len(scopes)).To(Equal(2))
Expect(scopes[0]).To(Equal("s1"))
Expect(scopes[1]).To(Equal("s2"))
- })
+ }, 3)
It("delete event should delete", func() {
insert := common.ChangeList{
@@ -372,7 +373,7 @@
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(0))
- })
+ }, 3)
It("update event should panic", func() {
@@ -389,7 +390,7 @@
Expect(func() { handler.Handle(&event) }).To(Panic())
//restore the last snapshot
apidInfo.LastSnapshot = saveLastSnapshot
- })
+ }, 3)
})
diff --git a/snapshot.go b/snapshot.go
new file mode 100644
index 0000000..ae667bf
--- /dev/null
+++ b/snapshot.go
@@ -0,0 +1,257 @@
+package apidApigeeSync
+
+import (
+ "encoding/json"
+ "github.com/30x/apid-core"
+ "github.com/30x/apid-core/data"
+ "github.com/apigee-labs/transicator/common"
+ "net/http"
+ "os"
+
+ "io"
+ "io/ioutil"
+ "net/url"
+ "path"
+ "time"
+)
+
+// retrieve boot information: apid_config and apid_config_scope
+func downloadBootSnapshot(quitPolling chan bool) {
+ log.Debug("download Snapshot for boot data")
+
+ scopes := []string{apidInfo.ClusterID}
+ snapshot := &common.Snapshot{}
+ downloadSnapshot(scopes, snapshot, quitPolling)
+ storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot *common.Snapshot) {
+ processSnapshot(snapshot)
+}
+
+// use the scope IDs from the boot snapshot to get all the data associated with the scopes
+func downloadDataSnapshot(quitPolling chan bool) {
+ log.Debug("download Snapshot for data scopes")
+
+ scopes := findScopesForId(apidInfo.ClusterID)
+ scopes = append(scopes, apidInfo.ClusterID)
+ snapshot := &common.Snapshot{}
+ downloadSnapshot(scopes, snapshot, quitPolling)
+ storeDataSnapshot(snapshot)
+}
+
+func storeDataSnapshot(snapshot *common.Snapshot) {
+ knownTables = extractTablesFromSnapshot(snapshot)
+
+ db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Database inaccessible: %v", err)
+ }
+ persistKnownTablesToDB(knownTables, db)
+
+ log.Info("Emitting Snapshot to plugins")
+
+ select {
+ case <-time.After(pluginTimeout):
+ log.Panic("Timeout. Plugins failed to respond to snapshot.")
+ case <-events.Emit(ApigeeSyncEventSelector, snapshot):
+ }
+
+}
+
+func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
+
+ tables = make(map[string]bool)
+
+ log.Debug("Extracting table names from snapshot")
+ if snapshot.Tables == nil {
+ //if this panic ever fires, it's a bug
+ log.Panicf("Attempt to extract known tables from snapshot without tables failed")
+ }
+
+ for _, table := range snapshot.Tables {
+ tables[table.Name] = true
+ }
+
+ return tables
+}
+
+func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
+
+ tables = make(map[string]bool)
+
+ log.Debug("Extracting table names from existing DB")
+ rows, err := db.Query("SELECT name FROM _known_tables;")
+ defer rows.Close()
+
+ if err != nil {
+ log.Panicf("Error reading current set of tables: %v", err)
+ }
+
+ for rows.Next() {
+ var table string
+ if err := rows.Scan(&table); err != nil {
+ log.Panicf("Error reading current set of tables: %v", err)
+ }
+ log.Debugf("Table %s found in existing db", table)
+
+ tables[table] = true
+ }
+ return tables
+}
+
+// Skip Downloading snapshot if there is already a snapshot available from previous run
+func startOnLocalSnapshot(snapshot string) *common.Snapshot {
+ log.Infof("Starting on local snapshot: %s", snapshot)
+
+ // ensure DB version will be accessible on behalf of dependant plugins
+ db, err := dataService.DBVersion(snapshot)
+ if err != nil {
+ log.Panicf("Database inaccessible: %v", err)
+ }
+
+ knownTables = extractTablesFromDB(db)
+
+ // allow plugins (including this one) to start immediately on existing database
+ // Note: this MUST have no tables as that is used as an indicator
+ return &common.Snapshot{
+ SnapshotInfo: snapshot,
+ }
+}
+
+// will keep retrying with backoff until success
+func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) {
+
+ log.Debug("downloadSnapshot")
+
+ snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
+ if err != nil {
+ log.Panicf("bad url value for config %s: %s", snapshotUri, err)
+ }
+
+ snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")
+
+ v := url.Values{}
+ for _, scope := range scopes {
+ v.Add("scope", scope)
+ }
+ snapshotUri.RawQuery = v.Encode()
+ uri := snapshotUri.String()
+ log.Infof("Snapshot Download: %s", uri)
+
+ client := &http.Client{
+ CheckRedirect: Redirect,
+ Timeout: httpTimeout,
+ }
+
+ //pollWithBackoff only accepts function that accept a single quit channel
+ //to accomadate functions which need more parameters, wrap them in closures
+ attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
+
+ pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError)
+}
+
+func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {
+ return func(_ chan bool) error {
+ req, err := http.NewRequest("GET", uri, nil)
+ if err != nil {
+ // should never happen, but if it does, it's unrecoverable anyway
+ log.Panicf("Snapshotserver comm error: %v", err)
+ }
+ addHeaders(req)
+
+ var processSnapshotResponse func(*http.Response, *common.Snapshot) error
+
+ // Set the transport protocol type based on conf file input
+ if config.GetString(configSnapshotProtocol) == "json" {
+ req.Header.Set("Accept", "application/json")
+ processSnapshotResponse = processSnapshotServerJsonResponse
+ } else if config.GetString(configSnapshotProtocol) == "sqlite" {
+ req.Header.Set("Accept", "application/transicator+sqlite")
+ processSnapshotResponse = processSnapshotServerFileResponse
+ }
+
+ // Issue the request to the snapshot server
+ r, err := client.Do(req)
+ if err != nil {
+ log.Errorf("Snapshotserver comm error: %v", err)
+ return err
+ }
+
+ defer r.Body.Close()
+
+ if r.StatusCode != 200 {
+ body, _ := ioutil.ReadAll(r.Body)
+ log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
+ return expected200Error{}
+ }
+
+ // Decode the Snapshot server response
+ err = processSnapshotResponse(r, snapshot)
+ if err != nil {
+ log.Errorf("Response Data not parsable: %v", err)
+ return err
+ }
+
+ return nil
+ }
+}
+
+func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
+ log.Debugf("Inserting table names found in snapshot into db")
+
+ tx, err := db.Begin()
+ if err != nil {
+ log.Panicf("Error starting transaction: %v", err)
+ }
+ defer tx.Rollback()
+
+ _, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));")
+ if err != nil {
+ log.Panicf("Could not create _known_tables table: %s", err)
+ }
+
+ for name := range tables {
+ log.Debugf("Inserting %s into _known_tables", name)
+ _, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name)
+ if err != nil {
+ log.Panicf("Error encountered inserting into known tables ", err)
+ }
+
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Error committing transaction: %v", err)
+
+ }
+}
+
+func processSnapshotServerJsonResponse(r *http.Response, snapshot *common.Snapshot) error {
+ return json.NewDecoder(r.Body).Decode(snapshot)
+}
+
+func processSnapshotServerFileResponse(r *http.Response, snapshot *common.Snapshot) error {
+ dbId := r.Header.Get("Transicator-Snapshot-TXID")
+ out, err := os.Create(data.DBPath(dbId))
+ if err != nil {
+ return err
+ }
+ defer out.Close()
+
+ //stream respose to DB
+ _, err = io.Copy(out, r.Body)
+
+ if err != nil {
+ return err
+ }
+
+ snapshot.SnapshotInfo = dbId
+ //TODO get timestamp from transicator. Not currently in response
+
+ return nil
+}
+
+func handleSnapshotServerError(err error) {
+ log.Debugf("Error connecting to snapshot server: %v", err)
+}
diff --git a/token.go b/token.go
index 60097c1..a6c118e 100644
--- a/token.go
+++ b/token.go
@@ -7,13 +7,12 @@
"net/http"
"net/url"
"path"
- "sync"
+ "sync/atomic"
"time"
)
var (
refreshFloatTime = time.Minute
- getTokenLock sync.Mutex
)
/*
@@ -26,15 +25,34 @@
*/
func createTokenManager() *tokenMan {
- t := &tokenMan{}
- t.doRefresh = make(chan bool, 1)
- t.maintainToken()
+ isClosedInt := int32(0)
+
+ t := &tokenMan{
+ quitPollingForToken: make(chan bool, 1),
+ closed: make(chan bool),
+ getTokenChan: make(chan bool),
+ invalidateTokenChan: make(chan bool),
+ returnTokenChan: make(chan *oauthToken),
+ invalidateDone: make(chan bool),
+ isClosed: &isClosedInt,
+ }
+
+ t.retrieveNewToken()
+ t.refreshTimer = time.After(t.token.refreshIn())
+ go t.maintainToken()
return t
}
type tokenMan struct {
- token *oauthToken
- doRefresh chan bool
+ token *oauthToken
+ isClosed *int32
+ quitPollingForToken chan bool
+ closed chan bool
+ getTokenChan chan bool
+ invalidateTokenChan chan bool
+ refreshTimer <-chan time.Time
+ returnTokenChan chan *oauthToken
+ invalidateDone chan bool
}
func (t *tokenMan) getBearerToken() string {
@@ -42,50 +60,63 @@
}
func (t *tokenMan) maintainToken() {
- go func() {
- for {
- if t.token.needsRefresh() {
- getTokenLock.Lock()
- t.retrieveNewToken()
- getTokenLock.Unlock()
- }
- select {
- case _, ok := <-t.doRefresh:
- if !ok {
- log.Debug("closed tokenMan")
- return
- }
- log.Debug("force token refresh")
- case <-time.After(t.token.refreshIn()):
- log.Debug("auto refresh token")
- }
+ for {
+ select {
+ case <-t.closed:
+ return
+ case <-t.refreshTimer:
+ log.Debug("auto refresh token")
+ t.retrieveNewToken()
+ t.refreshTimer = time.After(t.token.refreshIn())
+ case <-t.getTokenChan:
+ token := t.token
+ t.returnTokenChan <- token
+ case <-t.invalidateTokenChan:
+ t.retrieveNewToken()
+ t.refreshTimer = time.After(t.token.refreshIn())
+ t.invalidateDone <- true
}
- }()
-}
-
-func (t *tokenMan) invalidateToken() {
- log.Debug("invalidating token")
- t.token = nil
- t.doRefresh <- true
+ }
}
// will block until valid
-func (t *tokenMan) getToken() *oauthToken {
- getTokenLock.Lock()
- defer getTokenLock.Unlock()
-
- if t.token.isValid() {
- log.Debugf("returning existing token: %v", t.token)
- return t.token
+func (t *tokenMan) invalidateToken() {
+ //has been closed
+ if atomic.LoadInt32(t.isClosed) == int32(1) {
+ log.Debug("TokenManager: invalidateToken() called on closed tokenManager")
+ return
}
-
- t.retrieveNewToken()
- return t.token
+ log.Debug("invalidating token")
+ t.invalidateTokenChan <- true
+ <-t.invalidateDone
}
+func (t *tokenMan) getToken() *oauthToken {
+ //has been closed
+ if atomic.LoadInt32(t.isClosed) == int32(1) {
+ log.Debug("TokenManager: getToken() called on closed tokenManager")
+ return nil
+ }
+ t.getTokenChan <- true
+ return <-t.returnTokenChan
+}
+
+/*
+ * blocking close() of tokenMan
+ */
+
func (t *tokenMan) close() {
+ //has been closed
+ if atomic.SwapInt32(t.isClosed, 1) == int32(1) {
+ log.Panic("TokenManager: close() has been called before!")
+ return
+ }
log.Debug("close token manager")
- close(t.doRefresh)
+ t.quitPollingForToken <- true
+ // sending instead of closing, to make sure it enters the t.doRefresh branch
+ t.closed <- true
+ close(t.closed)
+ log.Debug("token manager closed")
}
// don't call externally. will block until success.
@@ -99,18 +130,11 @@
}
uri.Path = path.Join(uri.Path, "/accesstoken")
- retryIn := 5 * time.Millisecond
- maxBackOff := maxBackoffTimeout
- backOffFunc := createBackOff(retryIn, maxBackOff)
- first := true
+ pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) })
+}
- for {
- if first {
- first = false
- } else {
- backOffFunc()
- }
-
+func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error {
+ return func(_ chan bool) error {
form := url.Values{}
form.Set("grant_type", "client_credentials")
form.Add("client_id", config.GetString(configConsumerKey))
@@ -133,26 +157,26 @@
resp, err := client.Do(req)
if err != nil {
log.Errorf("Unable to Connect to Edge Proxy Server: %v", err)
- continue
+ return err
}
body, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
log.Errorf("Unable to read EdgeProxy Sever response: %v", err)
- continue
+ return err
}
if resp.StatusCode != 200 {
log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body))
- continue
+ return expected200Error{}
}
var token oauthToken
err = json.Unmarshal(body, &token)
if err != nil {
log.Errorf("unable to unmarshal JSON response '%s': %v", string(body), err)
- continue
+ return err
}
if token.ExpiresIn > 0 {
@@ -166,12 +190,17 @@
if newInstanceID {
newInstanceID = false
- updateApidInstanceInfo()
- }
+ err = updateApidInstanceInfo()
+ if err != nil {
+ log.Errorf("unable to unmarshal update apid instance info : %v", string(body), err)
+ return err
+ }
+ }
t.token = &token
config.Set(configBearerToken, token.AccessToken)
- return
+
+ return nil
}
}
diff --git a/token_test.go b/token_test.go
index 5deec1c..045b318 100644
--- a/token_test.go
+++ b/token_test.go
@@ -1,5 +1,8 @@
package apidApigeeSync
+/*
+ * Unit test of token manager
+ */
import (
"time"
@@ -17,6 +20,8 @@
Context("oauthToken", func() {
It("should calculate valid token", func() {
+ log.Info("Starting token tests...")
+
t := &oauthToken{
AccessToken: "x",
ExpiresIn: 120000,
@@ -25,9 +30,10 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically(">", 0))
Expect(t.needsRefresh()).To(BeFalse())
Expect(t.isValid()).To(BeTrue())
- })
+ }, 3)
It("should calculate expired token", func() {
+
t := &oauthToken{
AccessToken: "x",
ExpiresIn: 0,
@@ -36,9 +42,10 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
Expect(t.needsRefresh()).To(BeTrue())
Expect(t.isValid()).To(BeFalse())
- })
+ }, 3)
It("should calculate token needing refresh", func() {
+
t := &oauthToken{
AccessToken: "x",
ExpiresIn: 59000,
@@ -47,44 +54,76 @@
Expect(t.refreshIn().Seconds()).To(BeNumerically("<", 0))
Expect(t.needsRefresh()).To(BeTrue())
Expect(t.isValid()).To(BeTrue())
- })
+ }, 3)
It("should calculate on empty token", func() {
+
t := &oauthToken{}
Expect(t.refreshIn().Seconds()).To(BeNumerically("<=", 0))
Expect(t.needsRefresh()).To(BeTrue())
Expect(t.isValid()).To(BeFalse())
- })
+ }, 3)
})
Context("tokenMan", func() {
It("should get a valid token", func() {
- token := tokenManager.getToken()
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+
+ res := oauthToken{
+ AccessToken: "ABCD",
+ ExpiresIn: 1000,
+ }
+ body, err := json.Marshal(res)
+ Expect(err).NotTo(HaveOccurred())
+ w.Write(body)
+ }))
+ config.Set(configProxyServerBaseURI, ts.URL)
+ testedTokenManager := createTokenManager()
+ token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
Expect(token.ExpiresIn > 0).To(BeTrue())
Expect(token.ExpiresAt).To(BeTemporally(">", time.Now()))
- bToken := tokenManager.getBearerToken()
+ bToken := testedTokenManager.getBearerToken()
Expect(bToken).To(Equal(token.AccessToken))
- })
+ testedTokenManager.close()
+ ts.Close()
+ }, 3)
It("should refresh when forced to", func() {
- token := tokenManager.getToken()
+
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+
+ res := oauthToken{
+ AccessToken: generateUUID(),
+ ExpiresIn: 1000,
+ }
+ body, err := json.Marshal(res)
+ Expect(err).NotTo(HaveOccurred())
+ w.Write(body)
+ }))
+ config.Set(configProxyServerBaseURI, ts.URL)
+
+ testedTokenManager := createTokenManager()
+ token := testedTokenManager.getToken()
Expect(token.AccessToken).ToNot(BeEmpty())
- tokenManager.invalidateToken()
+ testedTokenManager.invalidateToken()
- token2 := tokenManager.getToken()
+ token2 := testedTokenManager.getToken()
Expect(token).ToNot(Equal(token2))
Expect(token.AccessToken).ToNot(Equal(token2.AccessToken))
- })
+ testedTokenManager.close()
+ ts.Close()
+ }, 3)
It("should refresh in refresh interval", func(done Done) {
- finished := make(chan bool)
- var tm *tokenMan
+ finished := make(chan bool, 1)
start := time.Now()
count := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -106,33 +145,27 @@
Expect(err).NotTo(HaveOccurred())
w.Write(body)
}))
- defer ts.Close()
- tokenManager.getToken()
- tokenManager.close()
- oldBase := config.Get(configProxyServerBaseURI)
config.Set(configProxyServerBaseURI, ts.URL)
- oldFloat := refreshFloatTime
- refreshFloatTime = 950 * time.Millisecond
- defer func() {
- tm.close()
- config.Set(configProxyServerBaseURI, oldBase)
- tokenManager = createTokenManager()
- refreshFloatTime = oldFloat
- }()
+ testedTokenManager := createTokenManager()
- tm = createTokenManager()
+ testedTokenManager.getToken()
+
<-finished
+
+ testedTokenManager.close()
+ ts.Close()
+
close(done)
- })
+ }, 3)
It("should have created_at_apid first time, update_at_apid after", func(done Done) {
-
- finished := make(chan bool)
- var tm *tokenMan
+ finished := make(chan bool, 1)
count := 0
+
+ newInstanceID = true
+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- defer GinkgoRecover()
count++
if count == 1 {
@@ -147,29 +180,23 @@
}
res := oauthToken{
AccessToken: string(count),
- ExpiresIn: 2000,
+ ExpiresIn: 200000,
}
body, err := json.Marshal(res)
Expect(err).NotTo(HaveOccurred())
w.Write(body)
}))
- defer ts.Close()
- tokenManager.getToken()
- tokenManager.close()
- oldBase := config.Get(configProxyServerBaseURI)
config.Set(configProxyServerBaseURI, ts.URL)
- defer func() {
- tm.close()
- config.Set(configProxyServerBaseURI, oldBase)
- tokenManager = createTokenManager()
- }()
+ testedTokenManager := createTokenManager()
- newInstanceID = true
- tm = createTokenManager()
- tm.invalidateToken()
+ testedTokenManager.getToken()
+ testedTokenManager.invalidateToken()
+ testedTokenManager.getToken()
<-finished
+ testedTokenManager.close()
+ ts.Close()
close(done)
- })
+ }, 3)
})
})