merging in master
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index 876ec9c..9e6ed94 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -1,7 +1,6 @@ package apidApigeeSync import ( - "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -12,17 +11,25 @@ "time" "github.com/30x/apid-core" + "github.com/30x/apid-core/factory" ) var ( - tmpDir string - testServer *httptest.Server - testRouter apid.Router - testMock *MockServer + tmpDir string + testServer *httptest.Server + testRouter apid.Router + testMock *MockServer + wipeDBAferTest bool ) -var _ = BeforeSuite(func(done Done) { +const dummyConfigValue string = "placeholder" + +var _ = BeforeSuite(func() { + wipeDBAferTest = true +}) + +var _ = BeforeEach(func(done Done) { apid.Initialize(factory.DefaultServicesFactory()) config = apid.Config() @@ -32,12 +39,9 @@ Expect(err).NotTo(HaveOccurred()) config.Set("local_storage_path", tmpDir) - testRouter = apid.API().Router() - testServer = httptest.NewServer(testRouter) - - config.Set(configProxyServerBaseURI, testServer.URL) - config.Set(configSnapServerBaseURI, testServer.URL) - config.Set(configChangeServerBaseURI, testServer.URL) + config.Set(configProxyServerBaseURI, dummyConfigValue) + config.Set(configSnapServerBaseURI, dummyConfigValue) + config.Set(configChangeServerBaseURI, dummyConfigValue) config.Set(configSnapshotProtocol, "json") config.Set(configPollInterval, 10*time.Millisecond) @@ -49,140 +53,27 @@ block = "0" log = apid.Log() - // set up mock server - mockParms := MockParms{ - ReliableAPI: false, - ClusterID: config.GetString(configApidClusterId), - TokenKey: config.GetString(configConsumerKey), - TokenSecret: config.GetString(configConsumerSecret), - Scope: "ert452", - Organization: "att", - Environment: "prod", - } - testMock = Mock(mockParms, testRouter) + _initPlugin(apid.AllServices()) + close(done) +}, 3) - // This is actually the first test :) - // Tests that entire bootstrap and set of sync operations work - var lastSnapshot *common.Snapshot - - expectedSnapshotTables := make(map[string]bool) - 
expectedSnapshotTables["kms.company"] = true - expectedSnapshotTables["edgex.apid_cluster"] = true - expectedSnapshotTables["edgex.data_scope"] = true - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() - - if s, ok := event.(*common.Snapshot); ok { - - //verify that during downloadDataSnapshot, knownTables was correctly populated - Expect(mapIsSubset(knownTables, expectedSnapshotTables)).To(BeTrue()) - - /* After this, we will mock changes for tables not present in the initial snapshot - * until that is changed in the mock server, we have to spoof the known tables - */ - - //add apid_cluster and data_scope since those would present if this were a real scenario - knownTables["kms.app_credential"] = true - knownTables["kms.app_credential_apiproduct_mapper"] = true - knownTables["kms.developer"] = true - knownTables["kms.company_developer"] = true - knownTables["kms.api_product"] = true - knownTables["kms.app"] = true - - lastSnapshot = s - - for _, t := range s.Tables { - switch t.Name { - - case "edgex.apid_cluster": - Expect(t.Rows).To(HaveLen(1)) - r := t.Rows[0] - var id string - r.Get("id", &id) - Expect(id).To(Equal("bootstrap")) - - case "edgex.data_scope": - Expect(t.Rows).To(HaveLen(2)) - r := t.Rows[1] // get the non-cluster row - - var id, clusterID, env, org, scope string - r.Get("id", &id) - r.Get("apid_cluster_id", &clusterID) - r.Get("env", &env) - r.Get("org", &org) - r.Get("scope", &scope) - - Expect(id).To(Equal("ert452")) - Expect(scope).To(Equal("ert452")) - Expect(clusterID).To(Equal("bootstrap")) - Expect(env).To(Equal("prod")) - Expect(org).To(Equal("att")) - } - } - - } else if cl, ok := event.(*common.ChangeList); ok { - - // ensure that snapshot switched DB versions - Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) - expectedDB, err := data.DBVersion(lastSnapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) - Expect(getDB() == expectedDB).Should(BeTrue()) - - 
Expect(cl.Changes).To(HaveLen(6)) - - var tables []string - for _, c := range cl.Changes { - tables = append(tables, c.Table) - Expect(c.NewRow).ToNot(BeNil()) - - var tenantID string - c.NewRow.Get("tenant_id", &tenantID) - Expect(tenantID).To(Equal("ert452")) - } - - Expect(tables).To(ContainElement("kms.app_credential")) - Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) - Expect(tables).To(ContainElement("kms.developer")) - Expect(tables).To(ContainElement("kms.company_developer")) - Expect(tables).To(ContainElement("kms.api_product")) - Expect(tables).To(ContainElement("kms.app")) - - events.ListenFunc(apid.EventDeliveredSelector, func(e apid.Event) { - defer GinkgoRecover() - - // allow other handler to execute to insert last_sequence - time.Sleep(50 * time.Millisecond) - var seq string - err = getDB(). - QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). - Scan(&seq) - - Expect(err).NotTo(HaveOccurred()) - Expect(seq).To(Equal(cl.LastSequence)) - - close(done) - }) - } - }) - - apid.InitializePlugins() -}) - -var _ = BeforeEach(func() { +var _ = AfterEach(func() { apid.Events().Close() lastSequence = "" - _, err := getDB().Exec("DELETE FROM APID_CLUSTER") - Expect(err).NotTo(HaveOccurred()) - _, err = getDB().Exec("DELETE FROM DATA_SCOPE") - Expect(err).NotTo(HaveOccurred()) + if wipeDBAferTest { + _, err := getDB().Exec("DELETE FROM APID_CLUSTER") + Expect(err).NotTo(HaveOccurred()) + _, err = getDB().Exec("DELETE FROM DATA_SCOPE") + Expect(err).NotTo(HaveOccurred()) - db, err := data.DB() - Expect(err).NotTo(HaveOccurred()) - _, err = db.Exec("DELETE FROM APID") - Expect(err).NotTo(HaveOccurred()) + db, err := dataService.DB() + Expect(err).NotTo(HaveOccurred()) + _, err = db.Exec("DELETE FROM APID") + Expect(err).NotTo(HaveOccurred()) + } + wipeDBAferTest = true }) var _ = AfterSuite(func() {
diff --git a/apigee_sync.go b/apigee_sync.go index be3fc8c..612e891 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -1,514 +1,99 @@ package apidApigeeSync import ( - "encoding/json" - "net/http" - "net/url" - "path" - "time" - - "sync/atomic" - - "io/ioutil" - "github.com/30x/apid-core" - "github.com/apigee-labs/transicator/common" + "net/http" + "time" ) const ( - httpTimeout = time.Minute - pluginTimeout = time.Minute - maxBackoffTimeout = time.Minute + httpTimeout = time.Minute + pluginTimeout = time.Minute ) -var ( - block string = "45" - lastSequence string - polling uint32 - knownTables = make(map[string]bool) -) +var knownTables = make(map[string]bool) /* - * Polls change agent for changes. In event of errors, uses a doubling - * backoff from 200ms up to a max delay of the configPollInterval value. + * Start from existing snapshot if possible + * If an existing snapshot does not exist, use the apid scope to fetch + * all data scopes, then get a snapshot for those data scopes + * + * Then, poll for changes */ -func pollForChanges() { +func bootstrap() { - if atomic.SwapUint32(&polling, 1) == 1 { + if apidInfo.LastSnapshot != "" { + snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot) + + events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { + changeManager.pollChangeWithBackoff() + }) + + log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo) return } - var backOffFunc func() - pollInterval := config.GetDuration(configPollInterval) - for { - start := time.Now() - err := pollChangeAgent() - end := time.Now() - if err != nil { - if _, ok := err.(changeServerError); ok { - downloadDataSnapshot() - continue - } - log.Debugf("Error connecting to changeserver: %v", err) - } - if end.After(start.Add(time.Second)) { - backOffFunc = nil - continue - } - if backOffFunc == nil { - backOffFunc = createBackOff(200*time.Millisecond, pollInterval) - } - backOffFunc() - } + downloadBootSnapshot(nil) + downloadDataSnapshot(quitPollingSnapshotServer) - atomic.SwapUint32(&polling, 0) + changeManager.pollChangeWithBackoff() + } /* - * Long polls 
the change agent with a 45 second block. Parses the response from - * change agent and raises an event. Called by pollForChanges(). + * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy + * for retrying on errors */ -func pollChangeAgent() error { +func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) { - changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI)) - if err != nil { - log.Errorf("bad url value for config %s: %s", changesUri, err) - return err - } - changesUri.Path = path.Join(changesUri.Path, "changes") + backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2) - /* - * Check to see if we have lastSequence already saved in the DB, - * in which case, it has to be used to prevent re-reading same data - */ - lastSequence = getLastSequence() + //inintialize the retry channel to start first attempt immediately + retry := time.After(0 * time.Millisecond) + for { - log.Debug("polling...") + select { + case <-quit: + log.Info("Quit signal recieved. 
Returning") + return + case <-retry: + start := time.Now() - /* Find the scopes associated with the config id */ - scopes := scopeCache.readAllScope() - v := url.Values{} - - /* Sequence added to the query if available */ - if lastSequence != "" { - v.Add("since", lastSequence) - } - v.Add("block", block) - - /* - * Include all the scopes associated with the config Id - * The Config Id is included as well, as it acts as the - * Bootstrap scope - */ - for _, scope := range scopes { - v.Add("scope", scope) - } - v.Add("scope", apidInfo.ClusterID) - v.Add("snapshot", apidInfo.LastSnapshot) - changesUri.RawQuery = v.Encode() - uri := changesUri.String() - log.Debugf("Fetching changes: %s", uri) - - /* If error, break the loop, and retry after interval */ - client := &http.Client{Timeout: httpTimeout} // must be greater than block value - req, err := http.NewRequest("GET", uri, nil) - addHeaders(req) - - r, err := client.Do(req) - if err != nil { - log.Errorf("change agent comm error: %s", err) - return err - } - - if r.StatusCode != http.StatusOK { - log.Errorf("Get changes request failed with status code: %d", r.StatusCode) - switch r.StatusCode { - case http.StatusUnauthorized: - tokenManager.invalidateToken() - - case http.StatusNotModified: - r.Body.Close() - continue - - case http.StatusBadRequest: - var apiErr changeServerError - var b []byte - b, err = ioutil.ReadAll(r.Body) - if err != nil { - log.Errorf("Unable to read response body: %v", err) - break - } - err = json.Unmarshal(b, &apiErr) - if err != nil { - log.Errorf("JSON Response Data not parsable: %s", string(b)) - break - } - if apiErr.Code == "SNAPSHOT_TOO_OLD" { - log.Debug("Received SNAPSHOT_TOO_OLD message from change server.") - err = apiErr - } + err := toExecute(quit) + if err == nil { + return } - r.Body.Close() - return err - } + if _, ok := err.(quitSignalError); ok { + return + } - var resp common.ChangeList - err = json.NewDecoder(r.Body).Decode(&resp) - r.Body.Close() - if err != nil { - 
log.Errorf("JSON Response Data not parsable: %v", err) - return err - } + end := time.Now() + //error encountered, since we would have returned above otherwise + handleError(err) - /* - * If the lastSequence is already newer or the same than what we got via - * resp.LastSequence, Ignore it. - */ - if lastSequence != "" && - getChangeStatus(lastSequence, resp.LastSequence) != 1 { - log.Errorf("Ignore change, already have newer changes") - continue - } - - if changesRequireDDLSync(resp) { - log.Info("Detected DDL changes, going to fetch a new snapshot to sync...") - return changeServerError{ - Code: "DDL changes detected; must get new snapshot", + /* TODO keep this around? Imagine an immediately erroring service, + * causing many sequential requests which could pollute logs + */ + //only backoff if the request took less than one second + if end.After(start.Add(time.Second)) { + backoff.Reset() + retry = time.After(0 * time.Millisecond) + } else { + retry = time.After(backoff.Duration()) } } - - /* If valid data present, Emit to plugins */ - if len(resp.Changes) > 0 { - done := make(chan bool) - events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) { - done <- true - }) - - select { - case <-time.After(httpTimeout): - log.Panic("Timeout. Plugins failed to respond to changes.") - case <-done: - } - } else { - log.Debugf("No Changes detected for Scopes: %s", scopes) - } - - updateSequence(resp.LastSequence) } } -/* - * seqCurr.Compare() will return 1, if its newer than seqPrev, - * else will return 0, if same, or -1 if older. 
- */ -func getChangeStatus(lastSeq string, currSeq string) int { - seqPrev, err := common.ParseSequence(lastSeq) - if err != nil { - log.Panic("Unable to parse previous sequence string") - } - seqCurr, err := common.ParseSequence(currSeq) - if err != nil { - log.Panic("Unable to parse current sequence string") - } - return seqCurr.Compare(seqPrev) -} - -func updateSequence(seq string) { - lastSequence = seq - err := updateLastSequence(seq) - if err != nil { - log.Panic("Unable to update Sequence in DB") - } - -} - -func changesRequireDDLSync(changes common.ChangeList) bool { - - return !mapIsSubset(knownTables, extractTablesFromChangelist(changes)) -} - -// simple doubling back-off -func createBackOff(retryIn, maxBackOff time.Duration) func() { - return func() { - if retryIn > maxBackOff { - retryIn = maxBackOff - } - log.Debugf("backoff called. will retry in %s.", retryIn) - time.Sleep(retryIn) - retryIn = retryIn * time.Duration(2) - } -} - -func Redirect(req *http.Request, via []*http.Request) error { +func Redirect(req *http.Request, _ []*http.Request) error { req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken()) req.Header.Add("org", apidInfo.ClusterID) // todo: this is strange.. is it needed? return nil } -// pollForChanges should usually be true, tests use the flag -func bootstrap() { - - if apidInfo.LastSnapshot != "" { - startOnLocalSnapshot(apidInfo.LastSnapshot) - return - } - - downloadBootSnapshot() - downloadDataSnapshot() - go pollForChanges() -} - -// retrieve boot information: apid_config and apid_config_scope -func downloadBootSnapshot() { - log.Debug("download Snapshot for boot data") - - scopes := []string{apidInfo.ClusterID} - snapshot := downloadSnapshot(scopes) - storeBootSnapshot(snapshot) -} - -func storeBootSnapshot(snapshot common.Snapshot) { - // note that for boot snapshot case, we don't touch databases. 
We only update in-mem cache - // This aims to deal with duplicate snapshot version#, see XAPID-869 for details - scopeCache.clearAndInitCache(snapshot.SnapshotInfo) - for _, table := range snapshot.Tables { - if table.Name == LISTENER_TABLE_DATA_SCOPE { - for _, row := range table.Rows { - ds := makeDataScopeFromRow(row) - // cache scopes for this cluster - if ds.ClusterID == apidInfo.ClusterID { - scopeCache.updateCache(&ds) - } - } - } - } - // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot -} - -// use the scope IDs from the boot snapshot to get all the data associated with the scopes -func downloadDataSnapshot() { - log.Debug("download Snapshot for data scopes") - - scopes := scopeCache.readAllScope() - - scopes = append(scopes, apidInfo.ClusterID) - snapshot := downloadSnapshot(scopes) - storeDataSnapshot(snapshot) -} - -func storeDataSnapshot(snapshot common.Snapshot) { - knownTables = extractTablesFromSnapshot(snapshot) - - db, err := data.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - persistKnownTablesToDB(knownTables, db) - - done := make(chan bool) - log.Info("Emitting Snapshot to plugins") - events.EmitWithCallback(ApigeeSyncEventSelector, &snapshot, func(event apid.Event) { - done <- true - }) - - select { - case <-time.After(pluginTimeout): - log.Panic("Timeout. 
Plugins failed to respond to snapshot.") - case <-done: - } - -} - -func extractTablesFromSnapshot(snapshot common.Snapshot) (tables map[string]bool) { - - tables = make(map[string]bool) - - log.Debug("Extracting table names from snapshot") - if snapshot.Tables == nil { - //if this panic ever fires, it's a bug - log.Panicf("Attempt to extract known tables from snapshot without tables failed") - } - - for _, table := range snapshot.Tables { - tables[table.Name] = true - } - - return tables -} - -func extractTablesFromChangelist(changes common.ChangeList) (tables map[string]bool) { - - tables = make(map[string]bool) - - for _, change := range changes.Changes { - tables[change.Table] = true - } - - return tables -} - -func extractTablesFromDB(db apid.DB) (tables map[string]bool) { - - tables = make(map[string]bool) - - log.Debug("Extracting table names from existing DB") - rows, err := db.Query("SELECT name FROM _known_tables;") - defer rows.Close() - - if err != nil { - log.Panicf("Error reading current set of tables: %v", err) - } - - for rows.Next() { - var table string - if err := rows.Scan(&table); err != nil { - log.Panicf("Error reading current set of tables: %v", err) - } - log.Debugf("Table %s found in existing db", table) - - tables[table] = true - } - return tables -} - -func persistKnownTablesToDB(tables map[string]bool, db apid.DB) { - log.Debugf("Inserting table names found in snapshot into db") - - tx, err := db.Begin() - if err != nil { - log.Panicf("Error starting transaction: %v", err) - } - defer tx.Rollback() - - _, err = tx.Exec("CREATE TABLE _known_tables (name text, PRIMARY KEY(name));") - if err != nil { - log.Panicf("Could not create _known_tables table: %s", err) - } - - for name := range tables { - log.Debugf("Inserting %s into _known_tables", name) - _, err := tx.Exec("INSERT INTO _known_tables VALUES(?);", name) - if err != nil { - log.Panicf("Error encountered inserting into known tables ", err) - } - - } - - err = tx.Commit() - if err != 
nil { - log.Panicf("Error committing transaction: %v", err) - - } -} - -// Skip Downloading snapshot if there is already a snapshot available from previous run -func startOnLocalSnapshot(snapshot string) { - log.Infof("Starting on local snapshot: %s", snapshot) - - // ensure DB version will be accessible on behalf of dependant plugins - db, err := data.DBVersion(snapshot) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - - knownTables = extractTablesFromDB(db) - scopeCache.clearAndInitCache(snapshot) - - // allow plugins (including this one) to start immediately on existing database - // Note: this MUST have no tables as that is used as an indicator - snap := &common.Snapshot{ - SnapshotInfo: snapshot, - } - events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) { - go pollForChanges() - }) - - log.Infof("Started on local snapshot: %s", snapshot) -} - -// will keep retrying with backoff until success -func downloadSnapshot(scopes []string) common.Snapshot { - - log.Debug("downloadSnapshot") - - snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI)) - if err != nil { - log.Panicf("bad url value for config %s: %s", snapshotUri, err) - } - - /* Frame and send the snapshot request */ - snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots") - - v := url.Values{} - for _, scope := range scopes { - v.Add("scope", scope) - } - snapshotUri.RawQuery = v.Encode() - uri := snapshotUri.String() - log.Infof("Snapshot Download: %s", uri) - - client := &http.Client{ - CheckRedirect: Redirect, - Timeout: httpTimeout, - } - - retryIn := 5 * time.Millisecond - maxBackOff := maxBackoffTimeout - backOffFunc := createBackOff(retryIn, maxBackOff) - first := true - - for { - if first { - first = false - } else { - backOffFunc() - } - - req, err := http.NewRequest("GET", uri, nil) - if err != nil { - // should never happen, but if it does, it's unrecoverable anyway - log.Panicf("Snapshotserver comm error: %v", err) - } - 
addHeaders(req) - - // Set the transport protocol type based on conf file input - if config.GetString(configSnapshotProtocol) == "json" { - req.Header.Set("Accept", "application/json") - } else { - req.Header.Set("Accept", "application/proto") - } - - // Issue the request to the snapshot server - r, err := client.Do(req) - if err != nil { - log.Errorf("Snapshotserver comm error: %v", err) - continue - } - - if r.StatusCode != 200 { - body, _ := ioutil.ReadAll(r.Body) - log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body)) - r.Body.Close() - continue - } - - // Decode the Snapshot server response - var resp common.Snapshot - err = json.NewDecoder(r.Body).Decode(&resp) - if err != nil { - log.Errorf("JSON Response Data not parsable: %v", err) - r.Body.Close() - continue - } - - r.Body.Close() - return resp - } -} - func addHeaders(req *http.Request) { - req.Header.Add("Authorization", "Bearer "+tokenManager.getBearerToken()) + req.Header.Add("Authorization", "Bearer "+ tokenManager.getBearerToken()) req.Header.Set("apid_instance_id", apidInfo.InstanceID) req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) @@ -518,25 +103,20 @@ Code string `json:"code"` } +type quitSignalError struct { +} + +type expected200Error struct { +} + +func (an expected200Error) Error() string { + return "Did not recieve OK response" +} + +func (a quitSignalError) Error() string { + return "Signal to quit encountered" +} + func (a changeServerError) Error() string { return a.Code -} - -/* - * Determine is map b is a subset of map a - */ -func mapIsSubset(a map[string]bool, b map[string]bool) bool { - - //nil maps should not be passed in. Making the distinction between nil map and empty map - if a == nil || b == nil { - return false - } - - for k := range b { - if !a[k] { - return false - } - } - - return true -} +} \ No newline at end of file
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index e9af4df..0bf56bd 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -5,90 +5,283 @@ "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "net/http/httptest" + //"time" ) -var _ = Describe("listener", func() { +var _ = Describe("Sync", func() { - It("should bootstrap from local DB if present", func(done Done) { + Context("Sync", func() { - expectedTables := make(map[string]bool) - expectedTables["kms.company"] = true - expectedTables["edgex.apid_cluster"] = true - expectedTables["edgex.data_scope"] = true + var initializeContext = func() { + testRouter = apid.API().Router() + testServer = httptest.NewServer(testRouter) - Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) - - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() - - if s, ok := event.(*common.Snapshot); ok { - - //verify that the knownTables array has been properly populated from existing DB - Expect(mapIsSubset(knownTables, expectedTables)).To(BeTrue()) - - Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) - Expect(s.Tables).To(BeNil()) - - close(done) + // set up mock server + mockParms := MockParms{ + ReliableAPI: true, + ClusterID: config.GetString(configApidClusterId), + TokenKey: config.GetString(configConsumerKey), + TokenSecret: config.GetString(configConsumerSecret), + Scope: "ert452", + Organization: "att", + Environment: "prod", } + testMock = Mock(mockParms, testRouter) + + config.Set(configProxyServerBaseURI, testServer.URL) + config.Set(configSnapServerBaseURI, testServer.URL) + config.Set(configChangeServerBaseURI, testServer.URL) + } + + var restoreContext = func() { + + testServer.Close() + + config.Set(configProxyServerBaseURI, dummyConfigValue) + config.Set(configSnapServerBaseURI, dummyConfigValue) + config.Set(configChangeServerBaseURI, dummyConfigValue) + + } + + It("should succesfully bootstrap from clean slate", func(done Done) { + log.Info("Starting sync tests...") + var closeDone <-chan bool + initializeContext() + // do not wipe DB after. 
Lets use it + wipeDBAferTest = false + var lastSnapshot *common.Snapshot + + expectedSnapshotTables := common.ChangeList{ + Changes: []common.Change{common.Change{Table: "kms.company"}, + common.Change{Table: "edgex.apid_cluster"}, + common.Change{Table: "edgex.data_scope"}}, + } + + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + if s, ok := event.(*common.Snapshot); ok { + + Expect(changesRequireDDLSync(expectedSnapshotTables)).To(BeFalse()) + + //add apid_cluster and data_scope since those would present if this were a real scenario + knownTables["kms.app_credential"] = true + knownTables["kms.app_credential_apiproduct_mapper"] = true + knownTables["kms.developer"] = true + knownTables["kms.company_developer"] = true + knownTables["kms.api_product"] = true + knownTables["kms.app"] = true + + lastSnapshot = s + + for _, t := range s.Tables { + switch t.Name { + + case "edgex.apid_cluster": + Expect(t.Rows).To(HaveLen(1)) + r := t.Rows[0] + var id string + r.Get("id", &id) + Expect(id).To(Equal("bootstrap")) + + case "edgex.data_scope": + Expect(t.Rows).To(HaveLen(2)) + r := t.Rows[1] // get the non-cluster row + + var id, clusterID, env, org, scope string + r.Get("id", &id) + r.Get("apid_cluster_id", &clusterID) + r.Get("env", &env) + r.Get("org", &org) + r.Get("scope", &scope) + + Expect(id).To(Equal("ert452")) + Expect(scope).To(Equal("ert452")) + Expect(clusterID).To(Equal("bootstrap")) + Expect(env).To(Equal("prod")) + Expect(org).To(Equal("att")) + } + } + + } else if cl, ok := event.(*common.ChangeList); ok { + closeDone = changeManager.close() + // ensure that snapshot switched DB versions + Expect(apidInfo.LastSnapshot).To(Equal(lastSnapshot.SnapshotInfo)) + expectedDB, err := dataService.DBVersion(lastSnapshot.SnapshotInfo) + Expect(err).NotTo(HaveOccurred()) + Expect(getDB() == expectedDB).Should(BeTrue()) + + Expect(cl.Changes).To(HaveLen(6)) + + var tables []string + for _, c := range cl.Changes { + tables = append(tables, 
c.Table) + Expect(c.NewRow).ToNot(BeNil()) + + var tenantID string + c.NewRow.Get("tenant_id", &tenantID) + Expect(tenantID).To(Equal("ert452")) + } + + Expect(tables).To(ContainElement("kms.app_credential")) + Expect(tables).To(ContainElement("kms.app_credential_apiproduct_mapper")) + Expect(tables).To(ContainElement("kms.developer")) + Expect(tables).To(ContainElement("kms.company_developer")) + Expect(tables).To(ContainElement("kms.api_product")) + Expect(tables).To(ContainElement("kms.app")) + + go func() { + // when close done, all handlers for the first changeList have been executed + <-closeDone + defer GinkgoRecover() + // allow other handler to execute to insert last_sequence + var seq string + //for seq = ""; seq == ""; { + // time.Sleep(50 * time.Millisecond) + err := getDB(). + QueryRow("SELECT last_sequence FROM APID_CLUSTER LIMIT 1;"). + Scan(&seq) + Expect(err).NotTo(HaveOccurred()) + //} + Expect(seq).To(Equal(cl.LastSequence)) + + restoreContext() + close(done) + }() + + } + }) + pie := apid.PluginsInitializedEvent{ + Description: "plugins initialized", + } + pie.Plugins = append(pie.Plugins, pluginData) + postInitPlugins(pie) + }, 3) + + It("should bootstrap from local DB if present", func(done Done) { + + var closeDone <-chan bool + + initializeContext() + expectedTables := common.ChangeList{ + Changes: []common.Change{common.Change{Table: "kms.company"}, + common.Change{Table: "edgex.apid_cluster"}, + common.Change{Table: "edgex.data_scope"}}, + } + Expect(apidInfo.LastSnapshot).NotTo(BeEmpty()) + + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + + if s, ok := event.(*common.Snapshot); ok { + // In this test, the changeManager.pollChangeWithBackoff() has not been launched when changeManager closed + // This is because the changeManager.pollChangeWithBackoff() in bootstrap() happened after this handler + closeDone = changeManager.close() + go func() { + // when close done, all handlers for the first snapshot have been 
executed + <-closeDone + //verify that the knownTables array has been properly populated from existing DB + Expect(changesRequireDDLSync(expectedTables)).To(BeFalse()) + + Expect(s.SnapshotInfo).Should(Equal(apidInfo.LastSnapshot)) + Expect(s.Tables).To(BeNil()) + + restoreContext() + close(done) + }() + + } + }) + pie := apid.PluginsInitializedEvent{ + Description: "plugins initialized", + } + pie.Plugins = append(pie.Plugins, pluginData) + postInitPlugins(pie) + + }, 3) + + It("should correctly identify non-proper subsets with respect to maps", func() { + + //test b proper subset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "b"}}, + )).To(BeFalse()) + + //test a == b + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}}, + )).To(BeFalse()) + + //test b superset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "a"}, common.Change{Table: "b"}, common.Change{Table: "c"}}, + )).To(BeTrue()) + + //test b not subset of a + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{common.Change{Table: "c"}}, + )).To(BeTrue()) + + //test a empty + Expect(changesHaveNewTables(map[string]bool{}, + []common.Change{common.Change{Table: "a"}}, + )).To(BeTrue()) + + //test b empty + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, + []common.Change{}, + )).To(BeFalse()) + + //test b nil + Expect(changesHaveNewTables(map[string]bool{"a": true, "b": true}, nil)).To(BeTrue()) + + //test a nil + Expect(changesHaveNewTables(nil, + []common.Change{common.Change{Table: "a"}}, + )).To(BeTrue()) }) - bootstrap() - }) + // todo: disabled for now - + // there is precondition I haven't been able to track down that breaks this test on occasion + XIt("should process a new snapshot when change server requires it", func(done Done) { 
+ oldSnap := apidInfo.LastSnapshot + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + defer GinkgoRecover() - It("should correctly identify non-proper subsets with respect to maps", func() { - - //test b proper subset of a - Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"b": true})).To(BeTrue()) - - //test a == b - Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true})).To(BeTrue()) - - //test b superset of a - Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"a": true, "b": true, "c": true})).To(BeFalse()) - - //test b not subset of a - Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{"c": true})).To(BeFalse()) - - //test b empty - Expect(mapIsSubset(map[string]bool{"a": true, "b": true}, map[string]bool{})).To(BeTrue()) - - //test a empty - Expect(mapIsSubset(map[string]bool{}, map[string]bool{"b": true})).To(BeFalse()) - }) - - // todo: disabled for now - - // there is precondition I haven't been able to track down that breaks this test on occasion - XIt("should process a new snapshot when change server requires it", func(done Done) { - oldSnap := apidInfo.LastSnapshot - apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { - defer GinkgoRecover() - - if s, ok := event.(*common.Snapshot); ok { - Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) - close(done) - } + if s, ok := event.(*common.Snapshot); ok { + Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) + close(done) + } + }) + testMock.forceNewSnapshot() }) - testMock.forceNewSnapshot() - }) - It("Verify the Sequence Number Logic works as expected", func() { - Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1)) - Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1)) - Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0)) - Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1)) - Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) - 
Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) - }) - /* - * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap - */ - It("Should be able to handle duplicate snapshot during bootstrap", func() { - scopes := []string{apidInfo.ClusterID} - snapshot := downloadSnapshot(scopes) - storeBootSnapshot(snapshot) - storeDataSnapshot(snapshot) - }) + It("Verify the Sequence Number Logic works as expected", func() { + Expect(getChangeStatus("1.1.1", "1.1.2")).To(Equal(1)) + Expect(getChangeStatus("1.1.1", "1.2.1")).To(Equal(1)) + Expect(getChangeStatus("1.2.1", "1.2.1")).To(Equal(0)) + Expect(getChangeStatus("1.2.1", "1.2.2")).To(Equal(1)) + Expect(getChangeStatus("2.2.1", "1.2.2")).To(Equal(-1)) + Expect(getChangeStatus("2.2.1", "2.2.0")).To(Equal(-1)) + }) + /* + * XAPID-869, there should not be any panic if received duplicate snapshots during bootstrap + */ + It("Should be able to handle duplicate snapshot during bootstrap", func() { + initializeContext() + + pie := apid.PluginsInitializedEvent{ + Description: "plugins initialized", + } + pie.Plugins = append(pie.Plugins, pluginData) + postInitPlugins(pie) + + scopes := []string{apidInfo.ClusterID} + snapshot := &common.Snapshot{} + downloadSnapshot(scopes, snapshot, nil) + storeBootSnapshot(snapshot) + storeDataSnapshot(snapshot) + restoreContext() + }) + }) })
diff --git a/backoff.go b/backoff.go new file mode 100644 index 0000000..291a037 --- /dev/null +++ b/backoff.go
@@ -0,0 +1,97 @@
+package apidApigeeSync
+
+import (
+	"math"
+	"math/rand"
+	"time"
+)
+
+const defaultInitial time.Duration = 200 * time.Millisecond
+const defaultMax time.Duration = 10 * time.Second
+const defaultFactor float64 = 2
+
+// Backoff counts retry attempts and delegates the computation of the next
+// wait to a pluggable strategy.
+type Backoff struct {
+	attempt         int
+	initial, max    time.Duration
+	backoffStrategy func() time.Duration
+}
+
+// ExponentialBackoff is a Backoff whose wait grows by factor per attempt.
+type ExponentialBackoff struct {
+	Backoff
+	factor float64
+}
+
+// NewExponentialBackoff returns a Backoff driven by a jittered exponential
+// strategy. Non-positive initial, max or factor fall back to defaultInitial,
+// defaultMax and defaultFactor respectively.
+func NewExponentialBackoff(initial, max time.Duration, factor float64) *Backoff {
+	backoff := &ExponentialBackoff{}
+
+	if initial <= 0 {
+		initial = defaultInitial
+	}
+	if max <= 0 {
+		max = defaultMax
+	}
+
+	if factor <= 0 {
+		factor = defaultFactor
+	}
+
+	backoff.initial = initial
+	backoff.max = max
+	backoff.attempt = 0
+	backoff.factor = factor
+	backoff.backoffStrategy = backoff.exponentialBackoffStrategy
+
+	return &backoff.Backoff
+}
+
+// Duration returns the wait for the current attempt, then advances the
+// attempt counter.
+func (b *Backoff) Duration() time.Duration {
+	d := b.backoffStrategy()
+	b.attempt++
+	return d
+}
+
+// exponentialBackoffStrategy draws a wait uniformly from
+// [initial, initial*factor^attempt] and caps the result at max.
+func (b *ExponentialBackoff) exponentialBackoffStrategy() time.Duration {
+	initial := float64(b.Backoff.initial)
+	attempt := float64(b.Backoff.attempt)
+	duration := initial * math.Pow(b.factor, attempt)
+
+	// Once factor^attempt overflows, duration is +Inf. Jittering it could
+	// yield NaN (0 * Inf), whose conversion to time.Duration is undefined;
+	// the capped result would be max anyway.
+	if math.IsInf(duration, 1) {
+		return b.max
+	}
+
+	//introduce some jitter
+	duration = rand.Float64()*(duration-initial) + initial
+
+	// Compare in float64: converting a value at or above 2^63 to the
+	// int64-backed time.Duration is implementation-defined.
+	if duration >= float64(b.max) {
+		return b.max
+	}
+	dur := time.Duration(duration)
+
+	log.Debugf("Backing off for %d ms", int64(dur/time.Millisecond))
+	return dur
+}
+
+// Reset sets the attempt counter back to zero.
+func (b *Backoff) Reset() {
+	b.attempt = 0
+}
+
+// Attempt returns the number of durations handed out since the last Reset.
+func (b *Backoff) Attempt() int {
+	return b.attempt
+}
diff --git a/changes.go b/changes.go new file mode 100644 index 0000000..d8176c2 --- /dev/null +++ b/changes.go
@@ -0,0 +1,332 @@
+package apidApigeeSync
+
+import (
+	"encoding/json"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+	"sync/atomic"
+	"time"
+
+	"github.com/apigee-labs/transicator/common"
+)
+
+var lastSequence string
+var block string = "45"
+
+// pollChangeManager owns the single change-server polling goroutine and
+// coordinates its shutdown together with the token manager.
+type pollChangeManager struct {
+	// 0 for not closed, 1 for closed
+	isClosed *int32
+	// 0 for pollChangeWithBackoff() not launched, 1 for launched
+	isLaunched *int32
+	quitChan   chan bool
+}
+
+// createChangeManager returns a manager that is neither launched nor closed.
+func createChangeManager() *pollChangeManager {
+	isClosedInt := int32(0)
+	isLaunchedInt := int32(0)
+	return &pollChangeManager{
+		isClosed:   &isClosedInt,
+		quitChan:   make(chan bool),
+		isLaunched: &isLaunchedInt,
+	}
+}
+
+/*
+ * thread-safe close of pollChangeManager
+ * It marks status as closed immediately, quits backoff polling agent, and closes tokenManager
+ * use <- close() for blocking close
+ */
+func (c *pollChangeManager) close() <-chan bool {
+	finishChan := make(chan bool, 1)
+	//has been closed
+	if atomic.SwapInt32(c.isClosed, 1) == int32(1) {
+		log.Error("pollChangeManager: close() called on a closed pollChangeManager!")
+		go func() {
+			finishChan <- false
+			log.Debug("change manager closed")
+		}()
+		return finishChan
+	}
+	// not launched
+	if atomic.LoadInt32(c.isLaunched) == int32(0) {
+		log.Error("pollChangeManager: close() called when pollChangeWithBackoff unlaunched! close tokenManager!")
+		go func() {
+			tokenManager.close()
+			finishChan <- false
+			log.Debug("change manager closed")
+		}()
+		return finishChan
+	}
+	// launched
+	log.Debug("pollChangeManager: close pollChangeWithBackoff and token manager")
+	go func() {
+		c.quitChan <- true
+		tokenManager.close()
+		finishChan <- true
+		log.Debug("change manager closed")
+	}()
+	return finishChan
+}
+
+/*
+ * thread-safe pollChangeWithBackoff(), guaranteed: only one polling thread
+ */
+func (c *pollChangeManager) pollChangeWithBackoff() {
+	// closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		log.Error("pollChangeManager: pollChangeWithBackoff() called after closed")
+		return
+	}
+	// has been launched before
+	if atomic.SwapInt32(c.isLaunched, 1) == int32(1) {
+		log.Error("pollChangeManager: pollChangeWithBackoff() has been launched before")
+		return
+	}
+
+	go pollWithBackoff(c.quitChan, c.pollChangeAgent, c.handleChangeServerError)
+	log.Debug("pollChangeManager: pollChangeWithBackoff() started pollWithBackoff")
+}
+
+/*
+ * Long polls the change agent with a 45 second block. Parses the response from
+ * change agent and raises an event. Called by pollWithBackoff().
+ */
+func (c *pollChangeManager) pollChangeAgent(dummyQuit chan bool) error {
+	changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
+	if err != nil {
+		// changesUri is nil here: name the config key; err already carries the bad value
+		log.Errorf("bad url value for config %s: %s", configChangeServerBaseURI, err)
+		return err
+	}
+	changesUri.Path = path.Join(changesUri.Path, "changes")
+
+	/*
+	 * Check to see if we have lastSequence already saved in the DB,
+	 * in which case, it has to be used to prevent re-reading same data
+	 */
+	lastSequence = getLastSequence()
+
+	for {
+		select {
+		case <-c.quitChan:
+			log.Info("pollChangeAgent; Recevied quit signal to stop polling change server, close token manager")
+			return quitSignalError{}
+		default:
+			err := c.getChanges(changesUri)
+			if err != nil {
+				if _, ok := err.(quitSignalError); ok {
+					log.Debug("pollChangeAgent: consuming the quit signal")
+					<-c.quitChan
+				}
+				return err
+			}
+		}
+	}
+}
+
+//TODO refactor this method more, split it up
+/* Make a single request to the changeserver to get a changelist */
+func (c *pollChangeManager) getChanges(changesUri *url.URL) error {
+	// if closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		return quitSignalError{}
+	}
+	log.Debug("polling...")
+
+	/* Find the scopes associated with the config id */
+	scopes := scopeCache.readAllScope()
+	v := url.Values{}
+
+	/* Sequence added to the query if available */
+	if lastSequence != "" {
+		v.Add("since", lastSequence)
+	}
+	v.Add("block", block)
+
+	/*
+	 * Include all the scopes associated with the config Id
+	 * The Config Id is included as well, as it acts as the
+	 * Bootstrap scope
+	 */
+	for _, scope := range scopes {
+		v.Add("scope", scope)
+	}
+	v.Add("scope", apidInfo.ClusterID)
+	v.Add("snapshot", apidInfo.LastSnapshot)
+	changesUri.RawQuery = v.Encode()
+	uri := changesUri.String()
+	log.Debugf("Fetching changes: %s", uri)
+
+	/* If error, break the loop, and retry after interval */
+	client := &http.Client{Timeout: httpTimeout} // must be greater than block value
+	req, err := http.NewRequest("GET", uri, nil)
+	if err != nil {
+		// req is nil on error; addHeaders would dereference it
+		log.Errorf("change agent request error: %v", err)
+		return err
+	}
+	addHeaders(req)
+
+	r, err := client.Do(req)
+	if err != nil {
+		log.Errorf("change agent comm error: %s", err)
+		// if closed
+		if atomic.LoadInt32(c.isClosed) == int32(1) {
+			return quitSignalError{}
+		}
+		return err
+	}
+	defer r.Body.Close()
+
+	// has been closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		log.Debugf("getChanges: changeManager has been closed")
+		return quitSignalError{}
+	}
+
+	if r.StatusCode != http.StatusOK {
+		log.Errorf("Get changes request failed with status code: %d", r.StatusCode)
+		switch r.StatusCode {
+		case http.StatusUnauthorized:
+			tokenManager.invalidateToken()
+			return nil
+
+		case http.StatusNotModified:
+			return nil
+
+		case http.StatusBadRequest:
+			var apiErr changeServerError
+			var b []byte
+			b, err = ioutil.ReadAll(r.Body)
+			if err != nil {
+				log.Errorf("Unable to read response body: %v", err)
+				return err
+			}
+			err = json.Unmarshal(b, &apiErr)
+			if err != nil {
+				log.Errorf("JSON Response Data not parsable: %s", string(b))
+				return err
+			}
+			if apiErr.Code == "SNAPSHOT_TOO_OLD" {
+				log.Debug("Received SNAPSHOT_TOO_OLD message from change server.")
+				// handleChangeServerError answers a changeServerError by
+				// downloading a fresh snapshot, which is what the server asks for.
+				return apiErr
+			}
+		}
+		// Report every other failure so pollWithBackoff backs off instead of
+		// re-polling a failing change server in a tight loop.
+		return expected200Error{}
+	}
+
+	var resp common.ChangeList
+	err = json.NewDecoder(r.Body).Decode(&resp)
+	if err != nil {
+		log.Errorf("JSON Response Data not parsable: %v", err)
+		return err
+	}
+
+	/*
+	 * If the lastSequence is already newer or the same than what we got via
+	 * resp.LastSequence, Ignore it.
+	 */
+	if lastSequence != "" &&
+		getChangeStatus(lastSequence, resp.LastSequence) != 1 {
+		return changeServerError{
+			Code: "Ignore change, already have newer changes",
+		}
+	}
+
+	if changesRequireDDLSync(resp) {
+		return changeServerError{
+			Code: "DDL changes detected; must get new snapshot",
+		}
+	}
+
+	/* If valid data present, Emit to plugins */
+	if len(resp.Changes) > 0 {
+		select {
+		case <-time.After(httpTimeout):
+			log.Panic("Timeout. Plugins failed to respond to changes.")
+		case <-events.Emit(ApigeeSyncEventSelector, &resp):
+		}
+	} else {
+		log.Debugf("No Changes detected for Scopes: %s", scopes)
+	}
+
+	updateSequence(resp.LastSequence)
+
+	return nil
+}
+
+// changesRequireDDLSync reports whether the change list touches a table this
+// apid does not know about yet.
+func changesRequireDDLSync(changes common.ChangeList) bool {
+	return changesHaveNewTables(knownTables, changes.Changes)
+}
+
+// handleChangeServerError is pollWithBackoff's error callback: a
+// changeServerError triggers a new data snapshot, anything else is logged.
+func (c *pollChangeManager) handleChangeServerError(err error) {
+	// has been closed
+	if atomic.LoadInt32(c.isClosed) == int32(1) {
+		log.Debugf("handleChangeServerError: changeManager has been closed")
+		return
+	}
+	if _, ok := err.(changeServerError); ok {
+		log.Info("Detected DDL changes, going to fetch a new snapshot to sync...")
+		downloadDataSnapshot(c.quitChan)
+	} else {
+		log.Debugf("Error connecting to changeserver: %v", err)
+	}
+}
+
+/*
+ * Determine if any tables in changes are not present in known tables
+ */
+func changesHaveNewTables(a map[string]bool, changes []common.Change) bool {
+	//nil maps should not be passed in. Making the distinction between nil map and empty map
+	if a == nil || changes == nil {
+		return true
+	}
+
+	for _, change := range changes {
+		if !a[change.Table] {
+			log.Infof("Unable to find %s table in current known tables", change.Table)
+			return true
+		}
+	}
+
+	return false
+}
+
+/*
+ * seqCurr.Compare() will return 1, if its newer than seqPrev,
+ * else will return 0, if same, or -1 if older.
+ */
+func getChangeStatus(lastSeq string, currSeq string) int {
+	seqPrev, err := common.ParseSequence(lastSeq)
+	if err != nil {
+		log.Panic("Unable to parse previous sequence string")
+	}
+	seqCurr, err := common.ParseSequence(currSeq)
+	if err != nil {
+		log.Panic("Unable to parse current sequence string")
+	}
+	return seqCurr.Compare(seqPrev)
+}
+
+// updateSequence records seq both in memory and in the DB; it panics if the
+// DB update fails.
+func updateSequence(seq string) {
+	lastSequence = seq
+	err := updateLastSequence(seq)
+	if err != nil {
+		log.Panic("Unable to update Sequence in DB")
+	}
+}
diff --git a/data.go b/data.go index 08ff3ba..4e5a62c 100644 --- a/data.go +++ b/data.go
@@ -237,7 +237,7 @@ // always use default database for this var db apid.DB - db, err = data.DB() + db, err = dataService.DB() if err != nil { return } @@ -254,6 +254,7 @@ newInstanceID = true info.InstanceID = generateUUID() + log.Debugf("Inserting new apid instance id %s", info.InstanceID) db.Exec("INSERT INTO APID (instance_id, last_snapshot_info) VALUES (?,?)", info.InstanceID, "") } @@ -264,7 +265,7 @@ func updateApidInstanceInfo() error { // always use default database for this - db, err := data.DB() + db, err := dataService.DB() if err != nil { return err }
diff --git a/init.go b/init.go index d40a9e7..98b4770 100644 --- a/init.go +++ b/init.go
@@ -29,14 +29,21 @@ ) var ( - log apid.LogService - config apid.ConfigService - data apid.DataService - events apid.EventsService - apidInfo apidInstanceInfo - apidPluginDetails string - newInstanceID bool - tokenManager *tokenMan + /* All set during plugin initialization */ + log apid.LogService + config apid.ConfigService + dataService apid.DataService + events apid.EventsService + apidInfo apidInstanceInfo + newInstanceID bool + tokenManager *tokenMan + changeManager *pollChangeManager + quitPollingSnapshotServer chan bool + + /* Set during post plugin initialization + * set this as a default, so that it's guaranteed to be valid even if postInitPlugins isn't called + */ + apidPluginDetails string = `[{"name":"apidApigeeSync","schemaVer":"1.0"}]` ) type apidInstanceInfo struct { @@ -52,7 +59,7 @@ apid.RegisterPlugin(initPlugin) } -func initDefaults() { +func initConfigDefaults() { config.SetDefault(configPollInterval, 120*time.Second) config.SetDefault(configSnapshotProtocol, "json") name, errh := os.Hostname() @@ -64,57 +71,28 @@ log.Debugf("Using %s as display name", config.GetString(configName)) } -func SetLogger(logger apid.LogService) { - log = logger -} - -func initPlugin(services apid.Services) (apid.PluginData, error) { - SetLogger(services.Log().ForModule("apigeeSync")) - log.Debug("start init") - - config = services.Config() - initDefaults() - - data = services.Data() +func initVariables(services apid.Services) error { + dataService = services.Data() events = services.Events() - - scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} - - go scopeCache.datascopeCacheManager() - - /* This callback function will get called, once all the plugins are - * initialized (not just this plugin). 
This is needed because, - * downloadSnapshots/changes etc have to begin to be processed only - * after all the plugins are initialized - */ - events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) - - // check for required values - for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, - configSnapServerBaseURI, configChangeServerBaseURI} { - if !config.IsSet(key) { - return pluginData, fmt.Errorf("Missing required config value: %s", key) - } - } - proto := config.GetString(configSnapshotProtocol) - if proto != "json" && proto != "proto" { - return pluginData, fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol) - } + //TODO listen for arbitrary commands, these channels can be used to kill polling goroutines + //also useful for testing + quitPollingSnapshotServer = make(chan bool) + changeManager = createChangeManager() // set up default database - db, err := data.DB() + db, err := dataService.DB() if err != nil { - return pluginData, fmt.Errorf("Unable to access DB: %v", err) + return fmt.Errorf("Unable to access DB: %v", err) } err = initDB(db) if err != nil { - return pluginData, fmt.Errorf("Unable to access DB: %v", err) + return fmt.Errorf("Unable to access DB: %v", err) } setDB(db) apidInfo, err = getApidInstanceInfo() if err != nil { - return pluginData, fmt.Errorf("Unable to get apid instance info: %v", err) + return fmt.Errorf("Unable to get apid instance info: %v", err) } if config.IsSet(configApidInstanceID) { @@ -122,6 +100,66 @@ } config.Set(configApidInstanceID, apidInfo.InstanceID) + scopeCache = &DatascopeCache{requestChan: make(chan *cacheOperationRequest), readDoneChan: make(chan []string)} + go scopeCache.datascopeCacheManager() + return nil +} + +func checkForRequiredValues() error { + // check for required values + for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret, + configSnapServerBaseURI, 
configChangeServerBaseURI} { + if !config.IsSet(key) { + return fmt.Errorf("Missing required config value: %s", key) + } + } + proto := config.GetString(configSnapshotProtocol) + if proto != "json" && proto != "proto" { + return fmt.Errorf("Illegal value for %s. Must be: 'json' or 'proto'", configSnapshotProtocol) + } + + return nil +} + +func SetLogger(logger apid.LogService) { + log = logger +} + +/* Idempotent state initialization */ +func _initPlugin(services apid.Services) error { + SetLogger(services.Log().ForModule("apigeeSync")) + log.Debug("start init") + + config = services.Config() + err := checkForRequiredValues() + if err != nil { + return err + } + + initConfigDefaults() + + err = initVariables(services) + if err != nil { + return err + } + + return nil +} + +func initPlugin(services apid.Services) (apid.PluginData, error) { + + err := _initPlugin(services) + if err != nil { + return pluginData, err + } + + /* This callback function will get called once all the plugins are + * initialized (not just this plugin). This is needed because, + * downloadSnapshots/changes etc have to begin to be processed only + * after all the plugins are initialized + */ + events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) + log.Debug("end init") return pluginData, nil
diff --git a/init_test.go b/init_test.go index 7bcee56..6135875 100644 --- a/init_test.go +++ b/init_test.go
@@ -10,13 +10,15 @@ Context("Apid Instance display name", func() { It("should be hostname by default", func() { - initDefaults() + log.Info("Starting init tests...") + + initConfigDefaults() Expect(apidInfo.InstanceName).To(Equal("testhost")) }) It("accept display name from config", func() { config.Set(configName, "aa01") - initDefaults() + initConfigDefaults() var apidInfoLatest apidInstanceInfo apidInfoLatest, _ = getApidInstanceInfo() Expect(apidInfoLatest.InstanceName).To(Equal("aa01"))
diff --git a/listener.go b/listener.go index 1fbd82e..0090326 100644 --- a/listener.go +++ b/listener.go
@@ -29,15 +29,38 @@ } func processSnapshot(snapshot *common.Snapshot) { - log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) - db, err := data.DBVersion(snapshot.SnapshotInfo) + db, err := dataService.DBVersion(snapshot.SnapshotInfo) if err != nil { log.Panicf("Unable to access database: %v", err) } - err = initDB(db) + if config.GetString(configSnapshotProtocol) == "json" { + processJsonSnapshot(snapshot, db) + } else if config.GetString(configSnapshotProtocol) == "sqlite" { + processSqliteSnapshot(snapshot, db) + } + + //update apid instance info + apidInfo.LastSnapshot = snapshot.SnapshotInfo + err = updateApidInstanceInfo() + if err != nil { + log.Panicf("Unable to update instance info: %v", err) + } + + setDB(db) + log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo) + +} + +func processSqliteSnapshot(snapshot *common.Snapshot, db apid.DB) { + //nothing to do as of now, here as a placeholder +} + +func processJsonSnapshot(snapshot *common.Snapshot, db apid.DB) { + + err := initDB(db) if err != nil { log.Panicf("Unable to initialize database: %v", err) } @@ -85,15 +108,6 @@ if err != nil { log.Panicf("Error committing Snapshot change: %v", err) } - - apidInfo.LastSnapshot = snapshot.SnapshotInfo - err = updateApidInstanceInfo() - if err != nil { - log.Panicf("Unable to update instance info: %v", err) - } - - setDB(db) - log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo) } func processChangeList(changes *common.ChangeList) {
diff --git a/listener_test.go b/listener_test.go index 2b060de..e426d39 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -15,6 +15,7 @@ Context("ApigeeSync snapshot event", func() { It("should set DB to appropriate version", func() { + log.Info("Starting listener tests...") //save the last snapshot, so we can restore it at the end of this context saveLastSnapshot = apidInfo.LastSnapshot @@ -28,7 +29,7 @@ Expect(apidInfo.LastSnapshot).To(Equal(event.SnapshotInfo)) - expectedDB, err := data.DBVersion(event.SnapshotInfo) + expectedDB, err := dataService.DBVersion(event.SnapshotInfo) Expect(err).NotTo(HaveOccurred()) Expect(getDB() == expectedDB).Should(BeTrue())
diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 0000000..aa5eada --- /dev/null +++ b/snapshot.go
@@ -0,0 +1,294 @@
+package apidApigeeSync
+
+import (
+	"encoding/json"
+	"errors"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"os"
+	"path"
+	"time"
+
+	"github.com/30x/apid-core"
+	"github.com/30x/apid-core/data"
+	"github.com/apigee-labs/transicator/common"
+)
+
+// retrieve boot information: apid_config and apid_config_scope
+func downloadBootSnapshot(quitPolling chan bool) {
+	log.Debug("download Snapshot for boot data")
+
+	scopes := []string{apidInfo.ClusterID}
+	snapshot := &common.Snapshot{}
+	downloadSnapshot(scopes, snapshot, quitPolling)
+	storeBootSnapshot(snapshot)
+}
+
+func storeBootSnapshot(snapshot *common.Snapshot) {
+	// note that for boot snapshot case, we don't touch databases. We only update in-mem cache
+	// This aims to deal with duplicate snapshot version#, see XAPID-869 for details
+	scopeCache.clearAndInitCache(snapshot.SnapshotInfo)
+	for _, table := range snapshot.Tables {
+		if table.Name == LISTENER_TABLE_DATA_SCOPE {
+			for _, row := range table.Rows {
+				ds := makeDataScopeFromRow(row)
+				// cache scopes for this cluster
+				if ds.ClusterID == apidInfo.ClusterID {
+					scopeCache.updateCache(&ds)
+				}
+			}
+		}
+	}
+	// note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot
+}
+
+// use the scope IDs from the boot snapshot to get all the data associated with the scopes
+func downloadDataSnapshot(quitPolling chan bool) {
+	log.Debug("download Snapshot for data scopes")
+
+	scopes := scopeCache.readAllScope()
+	scopes = append(scopes, apidInfo.ClusterID)
+	snapshot := &common.Snapshot{}
+	downloadSnapshot(scopes, snapshot, quitPolling)
+	storeDataSnapshot(snapshot)
+}
+
+// storeDataSnapshot records the snapshot's table names in its versioned DB
+// and emits the snapshot to plugins, panicking on DB errors or plugin timeout.
+func storeDataSnapshot(snapshot *common.Snapshot) {
+	knownTables = extractTablesFromSnapshot(snapshot)
+
+	db, err := dataService.DBVersion(snapshot.SnapshotInfo)
+	if err != nil {
+		log.Panicf("Database inaccessible: %v", err)
+	}
+	persistKnownTablesToDB(knownTables, db)
+
+	log.Info("Emitting Snapshot to plugins")
+
+	select {
+	case <-time.After(pluginTimeout):
+		log.Panic("Timeout. Plugins failed to respond to snapshot.")
+	case <-events.Emit(ApigeeSyncEventSelector, snapshot):
+	}
+}
+
+// extractTablesFromSnapshot returns the set of table names in snapshot.
+func extractTablesFromSnapshot(snapshot *common.Snapshot) (tables map[string]bool) {
+	tables = make(map[string]bool)
+
+	log.Debug("Extracting table names from snapshot")
+	if snapshot.Tables == nil {
+		//if this panic ever fires, it's a bug
+		log.Panicf("Attempt to extract known tables from snapshot without tables failed")
+	}
+
+	for _, table := range snapshot.Tables {
+		tables[table.Name] = true
+	}
+
+	return tables
+}
+
+// extractTablesFromDB reads the table names previously stored in
+// _known_tables.
+func extractTablesFromDB(db apid.DB) (tables map[string]bool) {
+	tables = make(map[string]bool)
+
+	log.Debug("Extracting table names from existing DB")
+	rows, err := db.Query("SELECT name FROM _known_tables;")
+	if err != nil {
+		log.Panicf("Error reading current set of tables: %v", err)
+	}
+	// Defer Close only after the error check: rows is nil when Query fails.
+	defer rows.Close()
+
+	for rows.Next() {
+		var table string
+		if err := rows.Scan(&table); err != nil {
+			log.Panicf("Error reading current set of tables: %v", err)
+		}
+		log.Debugf("Table %s found in existing db", table)
+
+		tables[table] = true
+	}
+	// Next returns false on iteration errors too; surface them.
+	if err := rows.Err(); err != nil {
+		log.Panicf("Error reading current set of tables: %v", err)
+	}
+	return tables
+}
+
+// Skip Downloading snapshot if there is already a snapshot available from previous run
+func startOnLocalSnapshot(snapshot string) *common.Snapshot {
+	log.Infof("Starting on local snapshot: %s", snapshot)
+
+	// ensure DB version will be accessible on behalf of dependant plugins
+	db, err := dataService.DBVersion(snapshot)
+	if err != nil {
+		log.Panicf("Database inaccessible: %v", err)
+	}
+
+	knownTables = extractTablesFromDB(db)
+
+	// allow plugins (including this one) to start immediately on existing database
+	// Note: this MUST have no tables as that is used as an indicator
+	return &common.Snapshot{
+		SnapshotInfo: snapshot,
+	}
+}
+
+// will keep retrying with backoff until success
+func downloadSnapshot(scopes []string, snapshot *common.Snapshot, quitPolling chan bool) {
+	log.Debug("downloadSnapshot")
+
+	snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
+	if err != nil {
+		// snapshotUri is nil here: name the config key; err already carries the bad value
+		log.Panicf("bad url value for config %s: %s", configSnapServerBaseURI, err)
+	}
+
+	snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")
+
+	v := url.Values{}
+	for _, scope := range scopes {
+		v.Add("scope", scope)
+	}
+	snapshotUri.RawQuery = v.Encode()
+	uri := snapshotUri.String()
+	log.Infof("Snapshot Download: %s", uri)
+
+	client := &http.Client{
+		CheckRedirect: Redirect,
+		Timeout:       httpTimeout,
+	}
+
+	//pollWithBackoff only accepts function that accept a single quit channel
+	//to accommodate functions which need more parameters, wrap them in closures
+	attemptDownload := getAttemptDownloadClosure(client, snapshot, uri)
+
+	pollWithBackoff(quitPolling, attemptDownload, handleSnapshotServerError)
+}
+
+// getAttemptDownloadClosure returns one download attempt for pollWithBackoff:
+// it fetches uri and decodes the response into snapshot.
+func getAttemptDownloadClosure(client *http.Client, snapshot *common.Snapshot, uri string) func(chan bool) error {
+	return func(_ chan bool) error {
+		req, err := http.NewRequest("GET", uri, nil)
+		if err != nil {
+			// should never happen, but if it does, it's unrecoverable anyway
+			log.Panicf("Snapshotserver comm error: %v", err)
+		}
+		addHeaders(req)
+
+		var processSnapshotResponse func(*http.Response, *common.Snapshot) error
+
+		// Set the transport protocol type based on conf file input
+		switch protocol := config.GetString(configSnapshotProtocol); protocol {
+		case "json":
+			req.Header.Set("Accept", "application/json")
+			processSnapshotResponse = processSnapshotServerJsonResponse
+		case "sqlite":
+			req.Header.Set("Accept", "application/transicator+sqlite")
+			processSnapshotResponse = processSnapshotServerFileResponse
+		default:
+			// No decoder exists for other values (e.g. "proto", which the
+			// startup check accepts), and calling the nil handler below would
+			// panic anyway; fail loudly before issuing the request.
+			log.Panicf("Unsupported snapshot protocol: %s", protocol)
+		}
+
+		// Issue the request to the snapshot server
+		r, err := client.Do(req)
+		if err != nil {
+			log.Errorf("Snapshotserver comm error: %v", err)
+			return err
+		}
+
+		defer r.Body.Close()
+
+		if r.StatusCode != 200 {
+			body, _ := ioutil.ReadAll(r.Body)
+			log.Errorf("Snapshot server conn failed with resp code %d, body: %s", r.StatusCode, string(body))
+			return expected200Error{}
+		}
+
+		// Decode the Snapshot server response
+		err = processSnapshotResponse(r, snapshot)
+		if err != nil {
+			log.Errorf("Response Data not parsable: %v", err)
+			return err
+		}
+
+		return nil
+	}
+}
+
+// persistKnownTablesToDB stores the given table names in _known_tables of db
+// inside a single transaction.
+func persistKnownTablesToDB(tables map[string]bool, db apid.DB) {
+	log.Debugf("Inserting table names found in snapshot into db")
+
+	tx, err := db.Begin()
+	if err != nil {
+		log.Panicf("Error starting transaction: %v", err)
+	}
+	defer tx.Rollback()
+
+	// IF NOT EXISTS / OR IGNORE keep this idempotent when the same snapshot
+	// version is stored twice (see XAPID-869).
+	_, err = tx.Exec("CREATE TABLE IF NOT EXISTS _known_tables (name text, PRIMARY KEY(name));")
+	if err != nil {
+		log.Panicf("Could not create _known_tables table: %s", err)
+	}
+
+	for name := range tables {
+		log.Debugf("Inserting %s into _known_tables", name)
+		_, err := tx.Exec("INSERT OR IGNORE INTO _known_tables VALUES(?);", name)
+		if err != nil {
+			log.Panicf("Error encountered inserting into known tables: %v", err)
+		}
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		log.Panicf("Error committing transaction: %v", err)
+	}
+}
+
+func processSnapshotServerJsonResponse(r *http.Response, snapshot *common.Snapshot) error {
+	return json.NewDecoder(r.Body).Decode(snapshot)
+}
+
+// processSnapshotServerFileResponse streams a sqlite snapshot to the DB path
+// of the TXID announced in the response header.
+func processSnapshotServerFileResponse(r *http.Response, snapshot *common.Snapshot) error {
+	dbId := r.Header.Get("Transicator-Snapshot-TXID")
+	if dbId == "" {
+		// without the TXID there is no snapshot version to name the DB after
+		return errors.New("snapshot response missing Transicator-Snapshot-TXID header")
+	}
+	out, err := os.Create(data.DBPath(dbId))
+	if err != nil {
+		return err
+	}
+	defer out.Close()
+
+	//stream response to DB
+	_, err = io.Copy(out, r.Body)
+
+	if err != nil {
+		return err
+	}
+
+	snapshot.SnapshotInfo = dbId
+	//TODO get timestamp from transicator. Not currently in response
+
+	return nil
+}
+
+func handleSnapshotServerError(err error) {
+	log.Debugf("Error connecting to snapshot server: %v", err)
+}
diff --git a/token.go b/token.go index 60097c1..a6c118e 100644 --- a/token.go +++ b/token.go
@@ -7,13 +7,12 @@ "net/http" "net/url" "path" - "sync" + "sync/atomic" "time" ) var ( refreshFloatTime = time.Minute - getTokenLock sync.Mutex ) /* @@ -26,15 +25,34 @@ */ func createTokenManager() *tokenMan { - t := &tokenMan{} - t.doRefresh = make(chan bool, 1) - t.maintainToken() + isClosedInt := int32(0) + + t := &tokenMan{ + quitPollingForToken: make(chan bool, 1), + closed: make(chan bool), + getTokenChan: make(chan bool), + invalidateTokenChan: make(chan bool), + returnTokenChan: make(chan *oauthToken), + invalidateDone: make(chan bool), + isClosed: &isClosedInt, + } + + t.retrieveNewToken() + t.refreshTimer = time.After(t.token.refreshIn()) + go t.maintainToken() return t } type tokenMan struct { - token *oauthToken - doRefresh chan bool + token *oauthToken + isClosed *int32 + quitPollingForToken chan bool + closed chan bool + getTokenChan chan bool + invalidateTokenChan chan bool + refreshTimer <-chan time.Time + returnTokenChan chan *oauthToken + invalidateDone chan bool } func (t *tokenMan) getBearerToken() string { @@ -42,50 +60,63 @@ } func (t *tokenMan) maintainToken() { - go func() { - for { - if t.token.needsRefresh() { - getTokenLock.Lock() - t.retrieveNewToken() - getTokenLock.Unlock() - } - select { - case _, ok := <-t.doRefresh: - if !ok { - log.Debug("closed tokenMan") - return - } - log.Debug("force token refresh") - case <-time.After(t.token.refreshIn()): - log.Debug("auto refresh token") - } + for { + select { + case <-t.closed: + return + case <-t.refreshTimer: + log.Debug("auto refresh token") + t.retrieveNewToken() + t.refreshTimer = time.After(t.token.refreshIn()) + case <-t.getTokenChan: + token := t.token + t.returnTokenChan <- token + case <-t.invalidateTokenChan: + t.retrieveNewToken() + t.refreshTimer = time.After(t.token.refreshIn()) + t.invalidateDone <- true } - }() -} - -func (t *tokenMan) invalidateToken() { - log.Debug("invalidating token") - t.token = nil - t.doRefresh <- true + } } // will block until valid -func (t 
*tokenMan) getToken() *oauthToken { - getTokenLock.Lock() - defer getTokenLock.Unlock() - - if t.token.isValid() { - log.Debugf("returning existing token: %v", t.token) - return t.token +func (t *tokenMan) invalidateToken() { + //has been closed + if atomic.LoadInt32(t.isClosed) == int32(1) { + log.Debug("TokenManager: invalidateToken() called on closed tokenManager") + return } - - t.retrieveNewToken() - return t.token + log.Debug("invalidating token") + t.invalidateTokenChan <- true + <-t.invalidateDone } +func (t *tokenMan) getToken() *oauthToken { + //has been closed + if atomic.LoadInt32(t.isClosed) == int32(1) { + log.Debug("TokenManager: getToken() called on closed tokenManager") + return nil + } + t.getTokenChan <- true + return <-t.returnTokenChan +} + +/* + * blocking close() of tokenMan + */ + func (t *tokenMan) close() { + //has been closed + if atomic.SwapInt32(t.isClosed, 1) == int32(1) { + log.Panic("TokenManager: close() has been called before!") + return + } log.Debug("close token manager") - close(t.doRefresh) + t.quitPollingForToken <- true + // sending instead of closing, to make sure it enters the t.doRefresh branch + t.closed <- true + close(t.closed) + log.Debug("token manager closed") } // don't call externally. will block until success. 
@@ -99,18 +130,11 @@ } uri.Path = path.Join(uri.Path, "/accesstoken") - retryIn := 5 * time.Millisecond - maxBackOff := maxBackoffTimeout - backOffFunc := createBackOff(retryIn, maxBackOff) - first := true + pollWithBackoff(t.quitPollingForToken, t.getRetrieveNewTokenClosure(uri), func(err error) { log.Errorf("Error getting new token : ", err) }) +} - for { - if first { - first = false - } else { - backOffFunc() - } - +func (t *tokenMan) getRetrieveNewTokenClosure(uri *url.URL) func(chan bool) error { + return func(_ chan bool) error { form := url.Values{} form.Set("grant_type", "client_credentials") form.Add("client_id", config.GetString(configConsumerKey)) @@ -133,26 +157,26 @@ resp, err := client.Do(req) if err != nil { log.Errorf("Unable to Connect to Edge Proxy Server: %v", err) - continue + return err } body, err := ioutil.ReadAll(resp.Body) resp.Body.Close() if err != nil { log.Errorf("Unable to read EdgeProxy Sever response: %v", err) - continue + return err } if resp.StatusCode != 200 { log.Errorf("Oauth Request Failed with Resp Code: %d. Body: %s", resp.StatusCode, string(body)) - continue + return expected200Error{} } var token oauthToken err = json.Unmarshal(body, &token) if err != nil { log.Errorf("unable to unmarshal JSON response '%s': %v", string(body), err) - continue + return err } if token.ExpiresIn > 0 { @@ -166,12 +190,17 @@ if newInstanceID { newInstanceID = false - updateApidInstanceInfo() - } + err = updateApidInstanceInfo() + if err != nil { + log.Errorf("unable to unmarshal update apid instance info : %v", string(body), err) + return err + } + } t.token = &token config.Set(configBearerToken, token.AccessToken) - return + + return nil } }
diff --git a/token_test.go b/token_test.go index 5deec1c..49d1eb6 100644 --- a/token_test.go +++ b/token_test.go
@@ -1,5 +1,8 @@ package apidApigeeSync +/* + * Unit test of token manager + */ import ( "time" @@ -17,6 +20,8 @@ Context("oauthToken", func() { It("should calculate valid token", func() { + log.Info("Starting token tests...") + t := &oauthToken{ AccessToken: "x", ExpiresIn: 120000, @@ -28,6 +33,7 @@ }) It("should calculate expired token", func() { + t := &oauthToken{ AccessToken: "x", ExpiresIn: 0, @@ -39,6 +45,7 @@ }) It("should calculate token needing refresh", func() { + t := &oauthToken{ AccessToken: "x", ExpiresIn: 59000, @@ -50,6 +57,7 @@ }) It("should calculate on empty token", func() { + t := &oauthToken{} Expect(t.refreshIn().Seconds()).To(BeNumerically("<=", 0)) Expect(t.needsRefresh()).To(BeTrue()) @@ -60,31 +68,62 @@ Context("tokenMan", func() { It("should get a valid token", func() { - token := tokenManager.getToken() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + + res := oauthToken{ + AccessToken: "ABCD", + ExpiresIn: 1000, + } + body, err := json.Marshal(res) + Expect(err).NotTo(HaveOccurred()) + w.Write(body) + })) + config.Set(configProxyServerBaseURI, ts.URL) + testedTokenManager := createTokenManager() + token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) Expect(token.ExpiresIn > 0).To(BeTrue()) Expect(token.ExpiresAt).To(BeTemporally(">", time.Now())) - bToken := tokenManager.getBearerToken() + bToken := testedTokenManager.getBearerToken() Expect(bToken).To(Equal(token.AccessToken)) + testedTokenManager.close() + ts.Close() }) It("should refresh when forced to", func() { - token := tokenManager.getToken() + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + + res := oauthToken{ + AccessToken: generateUUID(), + ExpiresIn: 1000, + } + body, err := json.Marshal(res) + Expect(err).NotTo(HaveOccurred()) + w.Write(body) + })) + config.Set(configProxyServerBaseURI, ts.URL) + + 
testedTokenManager := createTokenManager() + token := testedTokenManager.getToken() Expect(token.AccessToken).ToNot(BeEmpty()) - tokenManager.invalidateToken() + testedTokenManager.invalidateToken() - token2 := tokenManager.getToken() + token2 := testedTokenManager.getToken() Expect(token).ToNot(Equal(token2)) Expect(token.AccessToken).ToNot(Equal(token2.AccessToken)) + testedTokenManager.close() + ts.Close() }) It("should refresh in refresh interval", func(done Done) { - finished := make(chan bool) - var tm *tokenMan + finished := make(chan bool, 1) start := time.Now() count := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -106,33 +145,27 @@ Expect(err).NotTo(HaveOccurred()) w.Write(body) })) - defer ts.Close() - tokenManager.getToken() - tokenManager.close() - oldBase := config.Get(configProxyServerBaseURI) config.Set(configProxyServerBaseURI, ts.URL) - oldFloat := refreshFloatTime - refreshFloatTime = 950 * time.Millisecond - defer func() { - tm.close() - config.Set(configProxyServerBaseURI, oldBase) - tokenManager = createTokenManager() - refreshFloatTime = oldFloat - }() + testedTokenManager := createTokenManager() - tm = createTokenManager() + testedTokenManager.getToken() + <-finished + + testedTokenManager.close() + ts.Close() + close(done) }) It("should have created_at_apid first time, update_at_apid after", func(done Done) { - - finished := make(chan bool) - var tm *tokenMan + finished := make(chan bool, 1) count := 0 + + newInstanceID = true + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - defer GinkgoRecover() count++ if count == 1 { @@ -147,28 +180,22 @@ } res := oauthToken{ AccessToken: string(count), - ExpiresIn: 2000, + ExpiresIn: 200000, } body, err := json.Marshal(res) Expect(err).NotTo(HaveOccurred()) w.Write(body) })) - defer ts.Close() - tokenManager.getToken() - tokenManager.close() - oldBase := config.Get(configProxyServerBaseURI) 
config.Set(configProxyServerBaseURI, ts.URL) - defer func() { - tm.close() - config.Set(configProxyServerBaseURI, oldBase) - tokenManager = createTokenManager() - }() + testedTokenManager := createTokenManager() - newInstanceID = true - tm = createTokenManager() - tm.invalidateToken() + testedTokenManager.getToken() + testedTokenManager.invalidateToken() + testedTokenManager.getToken() <-finished + testedTokenManager.close() + ts.Close() close(done) }) })