package apidApigeeSync

import (
	"bytes"
	"encoding/json"
	"errors"
	"io/ioutil"
	"net/http"
	"net/url"
	"path"
	"time"

	"github.com/30x/apid-core"
	"github.com/apigee-labs/transicator/common"
)

// Package-level state shared by the sync routines in this file.
var (
	// token is the bearer token placed in the Authorization header of
	// outgoing requests; getBearerToken assigns it and pollChangeAgent
	// clears it when the change server answers 401.
	token string

	// lastSequence is the most recent change sequence recorded by this file.
	lastSequence string

	// downloadBootSnapshot and downloadDataSnapshot are raised once the
	// corresponding snapshot has been handled; changeFinished is raised by
	// postPluginDataDelivery once a change list has been delivered.
	downloadDataSnapshot, downloadBootSnapshot, changeFinished bool
)

// addHeaders decorates req with the headers sent on sync requests: the bearer
// token, the apid instance id, the apid cluster id and the current time
// formatted as RFC 3339.
//
// Every header is written with Set, so calling addHeaders on a request that
// already carries an Authorization header replaces the value instead of
// appending a second one.
func addHeaders(req *http.Request) {
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("apid_instance_id", apidInfo.InstanceID)
	req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
	req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
}

// postPluginDataDelivery reacts to an apid.EventDeliveryEvent; any other
// event type is ignored.
//
//   - For a delivered *common.ChangeList it persists the list's LastSequence
//     (only when it differs from the one currently held) and then raises
//     changeFinished. A failure to persist panics.
//   - For a delivered *common.Snapshot it raises downloadBootSnapshot the
//     first time and downloadDataSnapshot on every later delivery.
func postPluginDataDelivery(e apid.Event) {
	delivered, isDelivery := e.(apid.EventDeliveryEvent)
	if !isDelivery {
		return
	}

	switch payload := delivered.Event.(type) {
	case *common.ChangeList:
		if payload.LastSequence != lastSequence {
			lastSequence = payload.LastSequence
			if err := updateLastSequence(lastSequence); err != nil {
				log.Panic("Unable to update Sequence in DB")
			}
		}
		changeFinished = true

	case *common.Snapshot:
		if !downloadBootSnapshot {
			downloadBootSnapshot = true
			log.Debug("Updated bootstrap SnapshotInfo")
		} else {
			downloadDataSnapshot = true
			log.Debug("Updated data SnapshotInfo")
		}
	}
}

/*
 * Runs pollChangeAgent in an endless loop and paces the retries.
 *
 * When a poll comes back within one second it is treated as a failed
 * attempt to talk to the change agent: the loop sleeps for
 * times * 200ms, where times grows by one on every such attempt until
 * it reaches the configured poll interval, after which it stays there.
 * When a poll lasted longer than one second the retry interval is
 * reset and the next poll starts immediately.
 *
 * This function never returns; it is meant to run in its own goroutine.
 */
func updatePeriodicChanges() {

	times := 1
	pollInterval := config.GetInt(configPollInterval)
	for {
		// Measure how long the poll actually ran; comparing two constant
		// durations would always yield zero and never reset the interval.
		startTime := time.Now()
		err := pollChangeAgent()
		if err != nil {
			log.Debugf("Error connecting to changeserver: %v", err)
		}
		elapsed := time.Since(startTime)
		// Gradually increase retry interval, and max at some level
		if elapsed <= time.Second {
			if times < pollInterval {
				times++
			} else {
				times = pollInterval
			}
			log.Debugf("Connecting to changeserver...")
			time.Sleep(time.Duration(times) * 200 * time.Millisecond)
		} else {
			// Reset sleep interval
			times = 1
		}

	}
}

/*
 * Long polls the change agent (block=45) in a loop. Each response is
 * decoded as a common.ChangeList; a non-empty list is emitted to the
 * plugins and the loop waits for them to acknowledge it, an empty list
 * only advances the stored sequence.
 *
 * Returns:
 *  - an error when the data snapshot has not been processed yet, when
 *    the change server URL is invalid, when the request cannot be built
 *    or sent, when the server answers with a status other than 200/304,
 *    or when the response is not parsable JSON;
 *  - nil when the server answers 304 (the long poll timed out).
 * It does not return while polls keep succeeding.
 */
func pollChangeAgent() error {

	if !downloadDataSnapshot {
		log.Debug("Waiting for snapshot download to complete")
		return errors.New("Snapshot download in progress...")
	}
	changesUri, err := url.Parse(config.GetString(configChangeServerBaseURI))
	if err != nil {
		// changesUri is nil here, so report the config key instead.
		log.Errorf("bad url value for config %s: %s", configChangeServerBaseURI, err)
		return err
	}
	changesUri.Path = path.Join(changesUri.Path, "changes")

	/*
	 * Check to see if we have lastSequence already saved in the DB,
	 * in which case, it has to be used to prevent re-reading same data
	 */
	lastSequence = getLastSequence()

	// One client is enough for every poll issued by this call.
	client := &http.Client{}

	for {
		log.Debug("polling...")
		if token == "" {
			// invalid token, loop until we get one
			getBearerToken()
		}

		/* Find the scopes associated with the config id */
		scopes := findScopesForId(apidInfo.ClusterID)
		v := url.Values{}

		/* Sequence added to the query if available */
		if lastSequence != "" {
			v.Add("since", lastSequence)
		}
		v.Add("block", "45")

		/*
		 * Include all the scopes associated with the config Id
		 * The Config Id is included as well, as it acts as the
		 * Bootstrap scope
		 */
		for _, scope := range scopes {
			v.Add("scope", scope)
		}
		v.Add("scope", apidInfo.ClusterID)
		v.Add("snapshot", apidInfo.LastSnapshot)
		changesUri.RawQuery = v.Encode()
		uri := changesUri.String()
		log.Debugf("Fetching changes: %s", uri)

		/* If error, break the loop, and retry after interval */
		req, err := http.NewRequest("GET", uri, nil)
		if err != nil {
			// req is nil on error; bail out before addHeaders touches it.
			log.Errorf("unable to create changes request: %v", err)
			return err
		}
		addHeaders(req)
		r, err := client.Do(req)
		if err != nil {
			log.Errorf("change agent comm error: %s", err)
			return err
		}

		/* If the call is not Authorized, update flag */
		if r.StatusCode != http.StatusOK {
			r.Body.Close()
			switch r.StatusCode {
			case http.StatusNotModified:
				// The long poll simply timed out; not a failure.
				log.Infof("Get changes request timed out with %d", http.StatusNotModified)
				return nil
			case http.StatusUnauthorized:
				// Force a fresh token on the next poll.
				token = ""
				log.Errorf("Token expired? Unauthorized request.")
			}
			log.Errorf("Get changes request failed with Resp err: %d", r.StatusCode)
			// err is nil at this point, so build a real error for the caller.
			return errors.New("get changes request failed: " + r.Status)
		}

		var resp common.ChangeList
		err = json.NewDecoder(r.Body).Decode(&resp)
		r.Body.Close()
		if err != nil {
			log.Errorf("JSON Response Data not parsable: %v", err)
			return err
		}

		/* If valid data present, Emit to plugins */
		if len(resp.Changes) > 0 {
			changeFinished = false
			events.Emit(ApigeeSyncEventSelector, &resp)
			/*
			 * The plugins should have finished what they are doing.
			 * Wait till they are done.
			 * If they take longer than expected - abort apid(?)
			 * (Should there be a configurable Fudge factor?) FIXME
			 */
			for count := 0; count < 1000; count++ {
				if changeFinished == false {
					log.Debug("Waiting for plugins to complete...")
					time.Sleep(time.Duration(count) * 100 * time.Millisecond)
				} else {
					break
				}
			}
			if changeFinished == false {
				log.Panic("Never got ack from plugins. Investigate.")
			}
		} else {
			log.Debugf("No Changes detected for Scopes: %s", scopes)

			if lastSequence != resp.LastSequence {
				lastSequence = resp.LastSequence
				err := updateLastSequence(lastSequence)
				if err != nil {
					log.Panic("Unable to update Sequence in DB")
				}
			}
		}
	}
}

// createBackOff returns a function that, on every call, sleeps for the
// current delay and then doubles it, never letting the next delay exceed
// maxBackOff. The first call sleeps for retryIn.
func createBackOff(retryIn, maxBackOff time.Duration) func() {
	delay := retryIn
	return func() {
		log.Debugf("backoff called. will retry in %s.", delay)
		time.Sleep(delay)
		if delay *= 2; delay > maxBackOff {
			delay = maxBackOff
		}
	}
}

/*
 * Requests a bearer token from the proxy server's /accesstoken endpoint
 * using the configured consumer key and secret (client_credentials
 * grant), retrying with a doubling back-off (5ms up to 1 minute) until
 * a response with status 200 and parsable JSON is received.
 *
 * On success the token is stored in the package-level token variable
 * and in the config under bearerToken so that other plugins can use it.
 * If this was the first registration of a new instance id, the instance
 * info is updated as well.
 *
 * Panics when the proxy server URI from the config cannot be parsed or
 * the request cannot be constructed.
 */
func getBearerToken() {

	log.Info("Getting a Bearer token...")
	uriString := config.GetString(configProxyServerBaseURI)
	uri, err := url.Parse(uriString)
	if err != nil {
		log.Panicf("unable to parse uri config '%s' value: '%s': %v", configProxyServerBaseURI, uriString, err)
	}
	uri.Path = path.Join(uri.Path, "/accesstoken")

	retryIn := 5 * time.Millisecond
	maxBackOff := 1 * time.Minute
	backOffFunc := createBackOff(retryIn, maxBackOff)
	first := true

	// One client is enough for every attempt.
	client := &http.Client{}

	for {
		if first {
			first = false
		} else {
			backOffFunc()
		}

		token = ""
		form := url.Values{}
		form.Set("grant_type", "client_credentials")
		form.Add("client_id", config.GetString(configConsumerKey))
		form.Add("client_secret", config.GetString(configConsumerSecret))
		req, err := http.NewRequest("POST", uri.String(), bytes.NewBufferString(form.Encode()))
		if err != nil {
			// should never happen, but if it does, retrying cannot help
			log.Panicf("unable to create access token request: %v", err)
		}
		req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
		req.Header.Set("display_name", apidInfo.InstanceName)
		req.Header.Set("apid_instance_id", apidInfo.InstanceID)
		req.Header.Set("apid_cluster_Id", apidInfo.ClusterID)
		req.Header.Set("status", "ONLINE")
		req.Header.Set("plugin_details", apidPluginDetails)

		if newInstanceID {
			req.Header.Set("created_at_apid", time.Now().Format(time.RFC3339))
		} else {
			req.Header.Set("updated_at_apid", time.Now().Format(time.RFC3339))
		}

		resp, err := client.Do(req)
		if err != nil {
			log.Errorf("Unable to Connect to Edge Proxy Server: %v", err)
			continue
		}

		// Read and close the body right away: a defer inside this retry
		// loop would keep every failed attempt's body open until return.
		body, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if resp.StatusCode != 200 {
			log.Errorf("Oauth Request Failed with Resp Code: %v", resp.StatusCode)
			continue
		}
		if err != nil {
			log.Errorf("Unable to read EdgeProxy Sever response: %v", err)
			continue
		}

		var oauthResp oauthTokenResp
		log.Debugf("Response: %s ", body)
		err = json.Unmarshal(body, &oauthResp)
		if err != nil {
			log.Errorf("unable to unmarshal JSON response %s: %v", string(body), err)
			continue
		}
		token = oauthResp.AccessToken

		if newInstanceID {
			newInstanceID = false
			updateApidInstanceInfo()
		}

		/*
		 * This stores the bearer token for any other plugin to
		 * consume.
		 */
		config.Set(bearerToken, token)

		log.Debug("Got a new Bearer token.")

		return
	}
}

// oauthTokenResp is the JSON document that getBearerToken decodes from the
// access token endpoint's response. Each field is bound to the JSON key named
// in its struct tag. Within this file only AccessToken is read; the remaining
// fields are decoded but not otherwise used here.
type oauthTokenResp struct {
	IssuedAt       int64    `json:"issuedAt"`
	AppName        string   `json:"applicationName"`
	Scope          string   `json:"scope"`
	Status         string   `json:"status"`
	ApiProdList    []string `json:"apiProductList"`
	ExpiresIn      int64    `json:"expiresIn"`
	DeveloperEmail string   `json:"developerEmail"`
	TokenType      string   `json:"tokenType"`
	ClientId       string   `json:"clientId"`
	AccessToken    string   `json:"accessToken"`
	TokenExpIn     int64    `json:"refreshTokenExpiresIn"`
	RefreshCount   int64    `json:"refreshCount"`
}

// Redirect is an http.Client CheckRedirect hook (downloadSnapshot installs
// it on its client). It stamps the upcoming redirected request with the
// bearer token and the cluster id so that the redirect target receives them.
//
// req is the request about to be sent and via holds the requests made so
// far, oldest first. Returning a non-nil error stops the redirect chain.
func Redirect(req *http.Request, via []*http.Request) error {
	// Installing a CheckRedirect hook replaces net/http's default policy,
	// so its cap of 10 consecutive redirects has to be enforced here;
	// otherwise a redirect loop would never terminate.
	if len(via) >= 10 {
		return errors.New("stopped after 10 redirects")
	}

	// Set rather than Add: headers may already have been carried over from
	// the previous request, and Add would then send duplicates.
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("org", apidInfo.ClusterID) // todo: this is strange.. is it needed?
	return nil
}

/*
 * Brings the local data up to date and then starts change polling.
 *
 * When a snapshot from a previous run is recorded in apidInfo, nothing
 * is downloaded: the database version is checked for accessibility, a
 * Snapshot event naming it is emitted, and once that event has been
 * delivered both snapshot flags are raised and polling starts.
 *
 * Otherwise the snapshot is fetched in two phases:
 * Phase 1: download with the cluster id as the only (bootstrap) scope,
 * then wait for the bootstrap flag to be raised.
 * Phase 2: if the data flag is still down, download again, this time
 * including the scopes found for the cluster id.
 * Panics if the bootstrap flag was never raised.
 */
func bootstrap() {

	// A snapshot left over from a previous run of APID makes the download unnecessary
	if apidInfo.LastSnapshot != "" {

		log.Infof("Starting on downloaded snapshot: %s", apidInfo.LastSnapshot)

		// dependant plugins need the DB version to be reachable
		if _, err := data.DBVersion(apidInfo.LastSnapshot); err != nil {
			log.Panicf("Database inaccessible: %v", err)
		}

		// let plugins (including this one) start right away on the existing database
		existing := &common.Snapshot{
			SnapshotInfo: apidInfo.LastSnapshot,
		}
		onDelivered := func(event apid.Event) {
			downloadBootSnapshot = true
			downloadDataSnapshot = true

			go updatePeriodicChanges()
		}
		events.EmitWithCallback(ApigeeSyncEventSelector, existing, onDelivered)

		return
	}

	/* Phase 1 */
	downloadSnapshot()

	/*
	 * Wait for the plugins to process the downloaded snapshot: at most
	 * 60 checks, pausing attempt * 100ms between them.
	 */
	for attempt := 0; attempt < 60 && !downloadBootSnapshot; attempt++ {
		log.Debug("Waiting for bootscope snapshot download...")
		time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
	}

	/* Phase 2 */
	switch {
	case downloadBootSnapshot && downloadDataSnapshot:
		log.Debug("Proceeding with existing Sqlite data")
	case downloadBootSnapshot:
		log.Debug("Proceed to download Snapshot for data scopes")
		downloadSnapshot()
	default:
		log.Panic("Snapshot for bootscope failed")
	}

	go updatePeriodicChanges()
}

// downloadSnapshot fetches a snapshot from the snapshot server and emits it
// to the plugins.
//
// The request always carries the cluster id as a scope; once the bootstrap
// snapshot has been processed (downloadBootSnapshot is set) the scopes found
// for the cluster id are requested as well. Failed attempts (transport error,
// non-200 status, or unparsable JSON before bootstrapping has completed) are
// retried with a doubling back-off from 5ms up to 1 minute.
//
// If bootstrapping has completed and a 200 response cannot be decoded, the
// data set is treated as empty: downloadDataSnapshot is raised and nothing is
// emitted.
//
// Panics when the snapshot server URL from the config is invalid or the
// request cannot be constructed.
func downloadSnapshot() {

	log.Debugf("downloadSnapshot")

	snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
	if err != nil {
		// snapshotUri is nil here, so report the config key instead.
		log.Panicf("bad url value for config %s: %s", configSnapServerBaseURI, err)
	}

	// getBearerToken loops until good
	getBearerToken()
	// todo: this could expire... ensure it's called again as needed

	var scopes []string
	if downloadBootSnapshot {
		scopes = findScopesForId(apidInfo.ClusterID)
	}

	// always include boot cluster
	scopes = append(scopes, apidInfo.ClusterID)

	/* Frame and send the snapshot request */
	snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")

	v := url.Values{}
	for _, scope := range scopes {
		v.Add("scope", scope)
	}
	snapshotUri.RawQuery = v.Encode()
	uri := snapshotUri.String()
	log.Infof("Snapshot Download: %s", uri)

	client := &http.Client{
		CheckRedirect: Redirect,
	}

	retryIn := 5 * time.Millisecond
	maxBackOff := 1 * time.Minute
	backOffFunc := createBackOff(retryIn, maxBackOff)
	first := true

	for {
		if first {
			first = false
		} else {
			backOffFunc()
		}

		req, err := http.NewRequest("GET", uri, nil)
		if err != nil {
			// should never happen, but if it does, it's unrecoverable anyway
			log.Panicf("Snapshotserver comm error: %v", err)
		}
		addHeaders(req)

		// Set the transport protocol type based on conf file input
		if config.GetString(configSnapshotProtocol) == "json" {
			req.Header.Set("Accept", "application/json")
		} else {
			req.Header.Set("Accept", "application/proto")
		}

		// Issue the request to the snapshot server
		r, err := client.Do(req)
		if err != nil {
			log.Errorf("Snapshotserver comm error: %v", err)
			continue
		}

		// Check the status before touching the body: an error response
		// (typically not JSON) must be retried, not mistaken for an empty
		// data set by the decode-failure handling below.
		if r.StatusCode != 200 {
			r.Body.Close()
			log.Errorf("Snapshot server conn failed. HTTP Resp code %d", r.StatusCode)
			continue
		}

		// Decode the Snapshot server response
		var resp common.Snapshot
		err = json.NewDecoder(r.Body).Decode(&resp)
		r.Body.Close()
		if err != nil {
			if downloadBootSnapshot {
				/*
				 * If the data set is empty, allow it to proceed, as change server
				 * will feed data. Since Bootstrapping has passed, it has the
				 * Bootstrap config id to function.
				 */
				downloadDataSnapshot = true
				return
			}
			log.Errorf("JSON Response Data not parsable: %v", err)
			continue
		}

		log.Info("Emitting Snapshot to plugins")
		events.Emit(ApigeeSyncEventSelector, &resp)

		break
	}
}
