handle “snapshot too old” case from change server, a bunch of refactoring
diff --git a/apigeeSync_suite_test.go b/apigeeSync_suite_test.go index 960f0a4..4a9de78 100644 --- a/apigeeSync_suite_test.go +++ b/apigeeSync_suite_test.go
@@ -19,6 +19,7 @@ tmpDir string testServer *httptest.Server testRouter apid.Router + testMock *MockServer ) var _ = BeforeSuite(func(done Done) { @@ -45,6 +46,7 @@ config.Set(configConsumerKey, "XXXXXXX") config.Set(configConsumerSecret, "YYYYYYY") + block = "0" log = apid.Log() // set up mock server @@ -57,7 +59,7 @@ Organization: "att", Environment: "prod", } - Mock(mockParms, testRouter) + testMock = Mock(mockParms, testRouter) // This is actually the first test :) // Tests that entire bootstrap and set of sync operations work @@ -150,9 +152,6 @@ apid.Events().Close() token = "" - downloadDataSnapshot = false - downloadBootSnapshot = false - changeFinished = false lastSequence = "" _, err := getDB().Exec("DELETE FROM APID_CLUSTER")
diff --git a/apigee_sync.go b/apigee_sync.go index ec3031e..20eb9a5 100644 --- a/apigee_sync.go +++ b/apigee_sync.go
@@ -3,7 +3,6 @@ import ( "bytes" "encoding/json" - "errors" "io/ioutil" "net/http" "net/url" @@ -14,79 +13,63 @@ "github.com/apigee-labs/transicator/common" ) -var token string -var downloadDataSnapshot, downloadBootSnapshot, changeFinished bool -var lastSequence string +const ( + httpTimeout = time.Minute + pluginTimeout = time.Minute + maxBackoffTimeout = time.Minute +) -func addHeaders(req *http.Request) { - req.Header.Add("Authorization", "Bearer "+token) - req.Header.Set("apid_instance_id", apidInfo.InstanceID) - req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) - req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) -} - -func postPluginDataDelivery(e apid.Event) { - - if ede, ok := e.(apid.EventDeliveryEvent); ok { - - if ev, ok := ede.Event.(*common.ChangeList); ok { - if lastSequence != ev.LastSequence { - lastSequence = ev.LastSequence - err := updateLastSequence(lastSequence) - if err != nil { - log.Panic("Unable to update Sequence in DB") - } - } - changeFinished = true - - } else if _, ok := ede.Event.(*common.Snapshot); ok { - if downloadBootSnapshot == false { - downloadBootSnapshot = true - log.Debug("Updated bootstrap SnapshotInfo") - } else { - downloadDataSnapshot = true - log.Debug("Updated data SnapshotInfo") - } - } - } -} +var ( + block string = "45" + token string + lastSequence string + polling bool +) /* * Polls change agent for changes. In event of errors, uses a doubling * backoff from 200ms up to a max delay of the configPollInterval value. */ -func updatePeriodicChanges() { +func pollForChanges() { + + // ensure there's just one polling thread + if polling { + return + } + polling = true var backOffFunc func() pollInterval := config.GetDuration(configPollInterval) for { - start := time.Now().Second() + start := time.Now() err := pollChangeAgent() - end := time.Now().Second() + end := time.Now() if err != nil { + if _, ok := err.(apiError); ok { + downloadDataSnapshot() + continue + } log.Debugf("Error connecting to changeserver: %v", err) } - if end-start <= 1 { - if backOffFunc == nil { - backOffFunc = createBackOff(200*time.Millisecond, pollInterval) - } - backOffFunc() - } else { + if end.After(start.Add(time.Second)) { backOffFunc = nil + continue } + if backOffFunc == nil { + backOffFunc = createBackOff(200*time.Millisecond, pollInterval) + } + backOffFunc() } + + polling = false } /* * Long polls the change agent with a 45 second block. Parses the response from - * change agent and raises an event. + * change agent and raises an event. Called by pollForChanges(). */ func pollChangeAgent() error { - if downloadDataSnapshot != true { - log.Debug("Waiting for snapshot download to complete") - return errors.New("Snapshot download in progress...") - } changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI)) if err != nil { log.Errorf("bad url value for config %s: %s", changesUri, err) @@ -114,7 +97,7 @@ if lastSequence != "" { v.Add("since", lastSequence) } - v.Add("block", "45") + v.Add("block", block) /* * Include all the scopes associated with the config Id @@ -131,7 +114,7 @@ log.Debugf("Fetching changes: %s", uri) /* If error, break the loop, and retry after interval */ - client := &http.Client{Timeout: time.Minute} // must be greater than block value + client := &http.Client{Timeout: httpTimeout} // must be greater than block value req, err := http.NewRequest("GET", uri, nil) addHeaders(req) r, err := client.Do(req) @@ -140,20 +123,29 @@ return err } - /* If the call is not Authorized, update flag */ if r.StatusCode != http.StatusOK { - if r.StatusCode == http.StatusUnauthorized { + log.Errorf("Get changes request failed with status code: %d", r.StatusCode) + switch r.StatusCode { + case http.StatusUnauthorized: token = "" - log.Errorf("Token expired? Unauthorized request.") + case http.StatusNotModified: + continue + + case http.StatusBadRequest: + var apiErr apiError + err = json.NewDecoder(r.Body).Decode(&apiErr) + if err != nil { + log.Errorf("JSON Response Data not parsable: %v", err) + break + } + if apiErr.Code == "SNAPSHOT_TOO_OLD" { + log.Debug("Received SNAPSHOT_TOO_OLD message from change server.") + err = apiErr + } } + r.Body.Close() - if r.StatusCode != http.StatusNotModified { - err = errors.New("force backoff") - log.Errorf("Get changes request failed with status code: %d", r.StatusCode) - } else { - log.Infof("Get changes request timed out with %d", http.StatusNotModified) - } return err } @@ -167,34 +159,26 @@ /* If valid data present, Emit to plugins */ if len(resp.Changes) > 0 { - changeFinished = false - events.Emit(ApigeeSyncEventSelector, &resp) - /* - * The plugins should have finished what they are doing. - * Wait till they are done. - * If they take longer than expected - abort apid(?) - * (Should there be a configurable Fudge factor?) FIXME - */ - for count := 0; count < 1000; count++ { - if changeFinished == false { - log.Debug("Waiting for plugins to complete...") - time.Sleep(time.Duration(count) * 100 * time.Millisecond) - } else { - break - } - } - if changeFinished == false { - log.Panic("Never got ack from plugins. Investigate.") + done := make(chan bool) + events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) { + done <- true + }) + + select { + case <-time.After(httpTimeout): + log.Panic("Timeout. Plugins failed to respond to changes.") + case <-done: + close(done) } } else { log.Debugf("No Changes detected for Scopes: %s", scopes) + } - if lastSequence != resp.LastSequence { - lastSequence = resp.LastSequence - err := updateLastSequence(lastSequence) - if err != nil { - log.Panic("Unable to update Sequence in DB") - } + if lastSequence != resp.LastSequence { + lastSequence = resp.LastSequence + err := updateLastSequence(lastSequence) + if err != nil { + log.Panic("Unable to update Sequence in DB") } } } @@ -227,7 +211,7 @@ uri.Path = path.Join(uri.Path, "/accesstoken") retryIn := 5 * time.Millisecond - maxBackOff := 1 * time.Minute + maxBackOff := maxBackoffTimeout backOffFunc := createBackOff(retryIn, maxBackOff) first := true @@ -257,7 +241,7 @@ req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) } - client := &http.Client{Timeout: time.Minute} + client := &http.Client{Timeout: httpTimeout} resp, err := client.Do(req) if err != nil { log.Errorf("Unable to Connect to Edge Proxy Server: %v", err) @@ -323,77 +307,76 @@ return nil } -/* - * Method downloads the snapshot in a two phased manner. - * Phase 1: Use the apidConfigId as the bootstrap scope, and - * get the apid_config and apid_config_scope from the snapshot - * server. - * Phase 2: Get all the scopes fetches from phase 1, and issue - * the second call to the snapshot server to get all the data - * associated with the scope(s). - * Emit the data for the necessary plugins to process. - * If there is already previous data in sqlite, don't fetch - * again from snapshot server. - */ +// pollForChanges should usually be true, tests use the flag func bootstrap() { - // Skip Downloading snapshot if there is already a snapshot available from previous run of APID if apidInfo.LastSnapshot != "" { - - log.Infof("Starting on downloaded snapshot: %s", apidInfo.LastSnapshot) - - // ensure DB version will be accessible on behalf of dependant plugins - _, err := data.DBVersion(apidInfo.LastSnapshot) - if err != nil { - log.Panicf("Database inaccessible: %v", err) - } - - // allow plugins (including this one) to start immediately on existing database - snap := &common.Snapshot{ - SnapshotInfo: apidInfo.LastSnapshot, - } - events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) { - downloadBootSnapshot = true - downloadDataSnapshot = true - - go updatePeriodicChanges() - }) - + startOnLocalSnapshot(apidInfo.LastSnapshot) return } - /* Phase 1 */ - downloadSnapshot() - - /* - * Give some time for all the plugins to process the Downloaded - * Snapshot - */ - for count := 0; count < 60; count++ { - if !downloadBootSnapshot { - log.Debug("Waiting for bootscope snapshot download...") - time.Sleep(time.Duration(count) * 100 * time.Millisecond) - } else { - break - } - } - - /* Phase 2 */ - if downloadBootSnapshot && downloadDataSnapshot { - log.Debug("Proceeding with existing Sqlite data") - } else if downloadBootSnapshot == true { - log.Debug("Proceed to download Snapshot for data scopes") - downloadSnapshot() - } else { - log.Panic("Snapshot for bootscope failed") - } - - go updatePeriodicChanges() + downloadBootSnapshot() + downloadDataSnapshot() + go pollForChanges() } -func downloadSnapshot() { +// retrieve boot information: apid_config and apid_config_scope +func downloadBootSnapshot() { + log.Debug("download Snapshot for boot data") - log.Debugf("downloadSnapshot") + scopes := []string{apidInfo.ClusterID} + downloadSnapshot(scopes) + // note that for boot snapshot case, we don't need to inform plugins as they'll get the data snapshot +} + +// use the scope IDs from the boot snapshot to get all the data associated with the scopes +func downloadDataSnapshot() { + log.Debug("download Snapshot for data scopes") + + var scopes = findScopesForId(apidInfo.ClusterID) + scopes = append(scopes, apidInfo.ClusterID) + resp := downloadSnapshot(scopes) + + done := make(chan bool) + log.Info("Emitting Snapshot to plugins") + events.EmitWithCallback(ApigeeSyncEventSelector, &resp, func(event apid.Event) { + done <- true + }) + + select { + case <-time.After(pluginTimeout): + log.Panic("Timeout. Plugins failed to respond to snapshot.") + case <-done: + close(done) + } +} + +// Skip Downloading snapshot if there is already a snapshot available from previous run +func startOnLocalSnapshot(snapshot string) { + log.Infof("Starting on local snapshot: %s", snapshot) + + // ensure DB version will be accessible on behalf of dependant plugins + _, err := data.DBVersion(snapshot) + if err != nil { + log.Panicf("Database inaccessible: %v", err) + } + + // allow plugins (including this one) to start immediately on existing database + // Note: this MUST have no tables as that is used as an indicator + snap := &common.Snapshot{ + SnapshotInfo: apidInfo.LastSnapshot, + } + events.EmitWithCallback(ApigeeSyncEventSelector, snap, func(event apid.Event) { + go pollForChanges() + }) + + log.Infof("Started on local snapshot: %s", snapshot) +} + +// will keep retrying with backoff until success +func downloadSnapshot(scopes []string) common.Snapshot { + + log.Debug("downloadSnapshot") snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI)) if err != nil { @@ -404,14 +387,6 @@ getBearerToken() // todo: this could expire... ensure it's called again as needed - var scopes []string - if downloadBootSnapshot { - scopes = findScopesForId(apidInfo.ClusterID) - } - - // always include boot cluster - scopes = append(scopes, apidInfo.ClusterID) - /* Frame and send the snapshot request */ snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots") @@ -425,11 +400,11 @@ client := &http.Client{ CheckRedirect: Redirect, - Timeout: time.Minute, + Timeout: httpTimeout, } retryIn := 5 * time.Millisecond - maxBackOff := 1 * time.Minute + maxBackOff := maxBackoffTimeout backOffFunc := createBackOff(retryIn, maxBackOff) first := true @@ -462,32 +437,36 @@ } if r.StatusCode != 200 { - log.Errorf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode) + log.Errorf("Snapshot server conn failed with resp code %d", r.StatusCode) + r.Body.Close() continue } // Decode the Snapshot server response var resp common.Snapshot err = json.NewDecoder(r.Body).Decode(&resp) - r.Body.Close() if err != nil { - if downloadBootSnapshot { - /* - * If the data set is empty, allow it to proceed, as change server - * will feed data. Since Bootstrapping has passed, it has the - * Bootstrap config id to function. - */ - downloadDataSnapshot = true - return - } else { - log.Errorf("JSON Response Data not parsable: %v", err) - continue - } + log.Errorf("JSON Response Data not parsable: %v", err) + r.Body.Close() + continue } - log.Info("Emitting Snapshot to plugins") - events.Emit(ApigeeSyncEventSelector, &resp) - - break + r.Body.Close() + return resp } } + +func addHeaders(req *http.Request) { + req.Header.Add("Authorization", "Bearer "+token) + req.Header.Set("apid_instance_id", apidInfo.InstanceID) + req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) + req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) +} + +type apiError struct { + Code string `json:"code"` +} + +func (a apiError) Error() string { + return a.Code +}
diff --git a/apigee_sync_test.go b/apigee_sync_test.go index d6520be..2150b54 100644 --- a/apigee_sync_test.go +++ b/apigee_sync_test.go
@@ -26,4 +26,17 @@ bootstrap() }) + + It("should process a new snapshot when change server requires it", func(done Done) { + oldSnap := apidInfo.LastSnapshot + apid.Events().ListenFunc(ApigeeSyncEventSelector, func(event apid.Event) { + defer GinkgoRecover() + + if s, ok := event.(*common.Snapshot); ok { + Expect(s.SnapshotInfo).NotTo(Equal(oldSnap)) + close(done) + } + }) + testMock.forceNewSnapshot() + }) })
diff --git a/init.go b/init.go index 5f3c6cb..6078147 100644 --- a/init.go +++ b/init.go
@@ -5,8 +5,9 @@ "fmt" "os" - "github.com/30x/apid-core" "time" + + "github.com/30x/apid-core" ) const ( @@ -23,8 +24,7 @@ // special value - set by ApigeeSync, not taken from configuration configApidInstanceID = "apigeesync_apid_instance_id" - // This will not be needed once we have plugin - // handling tokens. + // This will not be needed once we have plugin handling tokens. bearerToken = "apigeesync_bearer_token" ) @@ -52,7 +52,7 @@ } func initDefaults() { - config.SetDefault(configPollInterval, 120 * time.Second) + config.SetDefault(configPollInterval, 120*time.Second) config.SetDefault(configSnapshotProtocol, "json") name, errh := os.Hostname() if (errh != nil) && (len(config.GetString(configName)) == 0) { @@ -82,10 +82,7 @@ * downloadSnapshots/changes etc have to begin to be processed only * after all the plugins are initialized */ - events.ListenFunc(apid.SystemEventsSelector, postInitPlugins) - - // This callback function will get called after each data event delivery. - events.ListenFunc(apid.EventDeliveredSelector, postPluginDataDelivery) + events.ListenOnceFunc(apid.SystemEventsSelector, postInitPlugins) // check for required values for _, key := range []string{configProxyServerBaseURI, configConsumerKey, configConsumerSecret,
diff --git a/listener_test.go b/listener_test.go index 93a9588..1d1e896 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -69,15 +69,15 @@ Name: LISTENER_TABLE_DATA_SCOPE, Rows: []common.Row{ { - "id": &common.ColumnVal{Value: "i"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, + "id": &common.ColumnVal{Value: "i"}, + "apid_cluster_id": &common.ColumnVal{Value: "a"}, + "scope": &common.ColumnVal{Value: "s"}, + "org": &common.ColumnVal{Value: "o"}, + "env": &common.ColumnVal{Value: "e"}, + "created": &common.ColumnVal{Value: "c"}, + "created_by": &common.ColumnVal{Value: "c"}, + "updated": &common.ColumnVal{Value: "u"}, + "updated_by": &common.ColumnVal{Value: "u"}, }, }, }, @@ -202,15 +202,15 @@ Operation: common.Insert, Table: LISTENER_TABLE_DATA_SCOPE, NewRow: common.Row{ - "id": &common.ColumnVal{Value: "i"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, + "id": &common.ColumnVal{Value: "i"}, + "apid_cluster_id": &common.ColumnVal{Value: "a"}, + "scope": &common.ColumnVal{Value: "s"}, + "org": &common.ColumnVal{Value: "o"}, + "env": &common.ColumnVal{Value: "e"}, + "created": &common.ColumnVal{Value: "c"}, + "created_by": &common.ColumnVal{Value: "c"}, + "updated": &common.ColumnVal{Value: "u"}, + "updated_by": &common.ColumnVal{Value: "u"}, }, }, }, @@ -257,15 +257,15 @@ Operation: common.Insert, Table: LISTENER_TABLE_DATA_SCOPE, NewRow: common.Row{ - "id": &common.ColumnVal{Value: "i"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, + "id": &common.ColumnVal{Value: "i"}, + "apid_cluster_id": &common.ColumnVal{Value: "a"}, + "scope": &common.ColumnVal{Value: "s"}, + "org": &common.ColumnVal{Value: "o"}, + "env": &common.ColumnVal{Value: "e"}, + "created": &common.ColumnVal{Value: "c"}, + "created_by": &common.ColumnVal{Value: "c"}, + "updated": &common.ColumnVal{Value: "u"}, + "updated_by": &common.ColumnVal{Value: "u"}, }, }, },
diff --git a/mock_server.go b/mock_server.go index 53f6c2c..23648e0 100644 --- a/mock_server.go +++ b/mock_server.go
@@ -75,6 +75,11 @@ deployIDMutex sync.RWMutex minDeploymentID *int64 maxDeploymentID *int64 + newSnap *int32 +} + +func (m *MockServer) forceNewSnapshot() { + atomic.SwapInt32(m.newSnap, 1) } func (m *MockServer) lastSequenceID() string { @@ -115,6 +120,7 @@ m.minDeploymentID = new(int64) *m.minDeploymentID = 1 m.maxDeploymentID = new(int64) + m.newSnap = new(int32) go m.developerGenerator() go m.developerUpdater() @@ -288,7 +294,7 @@ body, err := json.Marshal(snapshot) Expect(err).NotTo(HaveOccurred()) - log.Info("sending snapshot") + log.Infof("sending snapshot: %s", m.snapshotID) if len(body) < 10000 { log.Debugf("snapshot: %#v", string(body)) } @@ -300,14 +306,27 @@ defer GinkgoRecover() m.registerFailHandler(w) + val := atomic.SwapInt32(m.newSnap, 0) + if val > 0 { + w.WriteHeader(http.StatusBadRequest) + apiErr := apiError{ + Code: "SNAPSHOT_TOO_OLD", + } + bytes, err := json.Marshal(apiErr) + Expect(err).NotTo(HaveOccurred()) + w.Write(bytes) + return + } + q := req.URL.Query() + scopes := q["scope"] - block, err := strconv.Atoi(req.URL.Query().Get("block")) + block, err := strconv.Atoi(q.Get("block")) Expect(err).NotTo(HaveOccurred()) - since := req.URL.Query().Get("since") + since := q.Get("since") Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID)) - Expect(q.Get("snapshot")).To(Equal(m.snapshotID)) + //Expect(q.Get("snapshot")).To(Equal(m.snapshotID)) Expect(scopes).To(ContainElement(m.params.ClusterID)) //Expect(scopes).To(ContainElement(m.params.Scope))