| package apidApigeeSync | 
 |  | 
 | import ( | 
 | 	"github.com/30x/apid-core" | 
 | 	"net/http" | 
 | 	"time" | 
 | ) | 
 |  | 
 | const ( | 
 | 	httpTimeout         = time.Minute | 
 | 	pluginTimeout       = time.Minute | 
 | 	maxIdleConnsPerHost = 10 | 
 | ) | 
 |  | 
 | var knownTables = make(map[string]bool) | 
 |  | 
 | /* | 
 |  *  Start from existing snapshot if possible | 
 |  *  If an existing snapshot does not exist, use the apid scope to fetch | 
 |  *  all data scopes, then get a snapshot for those data scopes | 
 |  * | 
 |  *  Then, poll for changes | 
 |  */ | 
 | func bootstrap() { | 
 |  | 
 | 	if apidInfo.LastSnapshot != "" { | 
 | 		snapshot := startOnLocalSnapshot(apidInfo.LastSnapshot) | 
 |  | 
 | 		events.EmitWithCallback(ApigeeSyncEventSelector, snapshot, func(event apid.Event) { | 
 | 			changeManager.pollChangeWithBackoff() | 
 | 		}) | 
 |  | 
 | 		log.Infof("Started on local snapshot: %s", snapshot.SnapshotInfo) | 
 | 		return | 
 | 	} | 
 |  | 
 | 	snapManager.downloadBootSnapshot() | 
 | 	snapManager.downloadDataSnapshot() | 
 |  | 
 | 	changeManager.pollChangeWithBackoff() | 
 |  | 
 | } | 
 |  | 
 | /* | 
 |  * Call toExecute repeatedly until it does not return an error, with an exponential backoff policy | 
 |  * for retrying on errors | 
 |  */ | 
 | func pollWithBackoff(quit chan bool, toExecute func(chan bool) error, handleError func(error)) { | 
 |  | 
 | 	backoff := NewExponentialBackoff(200*time.Millisecond, config.GetDuration(configPollInterval), 2, true) | 
 |  | 
 | 	//inintialize the retry channel to start first attempt immediately | 
 | 	retry := time.After(0 * time.Millisecond) | 
 |  | 
 | 	for { | 
 | 		select { | 
 | 		case <-quit: | 
 | 			log.Info("Quit signal recieved.  Returning") | 
 | 			return | 
 | 		case <-retry: | 
 | 			start := time.Now() | 
 |  | 
 | 			err := toExecute(quit) | 
 | 			if err == nil { | 
 | 				return | 
 | 			} | 
 |  | 
 | 			if _, ok := err.(quitSignalError); ok { | 
 | 				return | 
 | 			} | 
 |  | 
 | 			end := time.Now() | 
 | 			//error encountered, since we would have returned above otherwise | 
 | 			handleError(err) | 
 |  | 
 | 			/* TODO keep this around? Imagine an immediately erroring service, | 
 | 			 *  causing many sequential requests which could pollute logs | 
 | 			 */ | 
 | 			//only backoff if the request took less than one second | 
 | 			if end.After(start.Add(time.Second)) { | 
 | 				backoff.Reset() | 
 | 				retry = time.After(0 * time.Millisecond) | 
 | 			} else { | 
 | 				retry = time.After(backoff.Duration()) | 
 | 			} | 
 | 		} | 
 | 	} | 
 | } | 
 |  | 
 | func addHeaders(req *http.Request) { | 
 | 	req.Header.Set("Authorization", "Bearer "+tokenManager.getBearerToken()) | 
 | 	req.Header.Set("apid_instance_id", apidInfo.InstanceID) | 
 | 	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID) | 
 | 	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339)) | 
 | } | 
 |  | 
 | type changeServerError struct { | 
 | 	Code string `json:"code"` | 
 | } | 
 |  | 
 | type quitSignalError struct { | 
 | } | 
 |  | 
 | type expected200Error struct { | 
 | } | 
 |  | 
 | func (an expected200Error) Error() string { | 
 | 	return "Did not recieve OK response" | 
 | } | 
 |  | 
 | func (a quitSignalError) Error() string { | 
 | 	return "Signal to quit encountered" | 
 | } | 
 |  | 
 | func (a changeServerError) Error() string { | 
 | 	return a.Code | 
 | } |