| package apidApigeeSync |
| |
| import ( |
| "github.com/30x/apid-core" |
| "net/http" |
| "time" |
| ) |
| |
// Package-wide tuning values.
const (
	// httpTimeout is a one-minute duration; by its name it bounds HTTP calls
	// (NOTE(review): the usage is outside this section - confirm).
	httpTimeout = time.Minute

	// pluginTimeout is a one-minute duration; by its name it bounds plugin
	// work (NOTE(review): the usage is outside this section - confirm).
	pluginTimeout = time.Minute

	// maxIdleConnsPerHost is presumably fed to the HTTP transport setting of
	// the same name - confirm against the client construction code.
	maxIdleConnsPerHost = 10
)

// knownTables starts out as an empty, writable set keyed by table name.
var knownTables = map[string]bool{}
| |
| /* |
| * Start from existing snapshot if possible |
| * If an existing snapshot does not exist, use the apid scope to fetch |
| * all data scopes, then get a snapshot for those data scopes |
| * |
| * Then, poll for changes |
| */ |
| func bootstrap() { |
| |
| if apidInfo.LastSnapshot != "" { |
| snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot) |
| |
| events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { |
| changeManager.pollChangeWithBackoff() |
| }) |
| |
| log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo) |
| return |
| } |
| |
| snapManager.downloadBootSnapshot() |
| snapManager.downloadDataSnapshot() |
| |
| changeManager.pollChangeWithBackoff() |
| |
| } |
| |
| /* |
| * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy |
| * for retrying on errors |
| */ |
| func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) { |
| |
| backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true) |
| |
| //inintialize the retry channel to start first attempt immediately |
| retry := time.After(0 * time.Millisecond) |
| |
| for { |
| select { |
| case <-quit: |
| log.Info("Quit signal recieved. Returning") |
| return |
| case <-retry: |
| start := time.Now() |
| |
| err := toExecute(quit) |
| if err == nil { |
| return |
| } |
| |
| if _, ok := err.(quitSignalError); ok { |
| return |
| } |
| |
| end := time.Now() |
| //error encountered, since we would have returned above otherwise |
| handleError(err) |
| |
| /* TODO keep this around? Imagine an immediately erroring service, |
| * causing many sequential requests which could pollute logs |
| */ |
| //only backoff if the request took less than one second |
| if end.After(start.Add(time.Second)) { |
| backoff.Reset() |
| retry = time.After(0 * time.Millisecond) |
| } else { |
| retry = time.After(backoff.Duration()) |
| } |
| } |
| } |
| } |
| |
| func addHeaders(req *http.Request) { |
| req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) |
| req.Header.Set("apid_instance_id", apidInfo.InstanceID) |
| req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) |
| req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) |
| } |
| |
// changeServerError carries the "code" field decoded from a JSON error body.
type changeServerError struct {
	Code string `json:"code"`
}

// Error reports the code as the error text.
func (e changeServerError) Error() string {
	return e.Code
}

// quitSignalError is the error value that marks a quit signal.
type quitSignalError struct{}

// Error returns a fixed description of the quit condition.
func (e quitSignalError) Error() string {
	return "Signal to quit encountered"
}

// expected200Error is the error value for a response that was not OK.
type expected200Error struct{}

// Error returns a fixed description of the unexpected response.
func (e expected200Error) Error() string {
	return "Did not recieve OK response"
}