// blob: a0e34cdc8839243efc5903f1e3804a8d73c73f2d [file] [log] [blame]
package apiGatewayDeploy
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
)
// Status values accepted in deployment results reported by clients, plus the
// placeholder error code used by the error responses in this file.
const (
// RESPONSE_STATUS_SUCCESS is the status value of a successful deployment result.
RESPONSE_STATUS_SUCCESS = "SUCCESS"
// RESPONSE_STATUS_FAIL is the status value of a failed deployment result; such
// a result must also carry a non-zero errorCode and a non-empty message.
RESPONSE_STATUS_FAIL = "FAIL"
// todo: add error codes where this is used
// ERROR_CODE_TODO is the stand-in error code passed to writeError until
// specific codes are defined.
ERROR_CODE_TODO = 0
)
// Package-level state shared between the HTTP handlers and distributeEvents.
var (
// deploymentsChanged carries change messages into distributeEvents, which
// debounces them and forwards the most recent one to the subscribers.
deploymentsChanged = make(chan string)
// addSubscriber is how a handler registers a channel with distributeEvents to
// be told about the next deployment change. A subscriber is notified at most
// once: the subscriber set is replaced on every delivery.
addSubscriber = make(chan chan string)
// eTag is a counter accessed only through sync/atomic (see incrementETag and
// getETag). distributeEvents increments it each time it delivers a change.
eTag int64
)
// errorResponse is the JSON body written by writeError alongside a non-success
// HTTP status.
type errorResponse struct {
// ErrorCode is the application-level error code (currently always ERROR_CODE_TODO
// for the callers in this file).
ErrorCode int `json:"errorCode"`
// Reason is a human-readable description of the failure.
Reason string `json:"reason"`
}
// ApiDeployment is the JSON representation of a single deployment returned by
// the deployments endpoint. sendDeployments fills it from a DataDeployment.
type ApiDeployment struct {
ID string `json:"id"`
// ScopeId is copied from DataDeployment.DataScopeID.
ScopeId string `json:"scopeId"`
Created string `json:"created"`
CreatedBy string `json:"createdBy"`
Updated string `json:"updated"`
UpdatedBy string `json:"updatedBy"`
// ConfigJson and BundleConfigJson are json.RawMessage values, so their bytes
// are embedded in the response as-is rather than being re-encoded as strings.
ConfigJson json.RawMessage `json:"configuration"`
BundleConfigJson json.RawMessage `json:"bundleConfiguration"`
// DisplayName is copied from DataDeployment.BundleName.
DisplayName string `json:"displayName"`
// URI is copied from DataDeployment.LocalBundleURI.
URI string `json:"uri"`
}
// ApiDeploymentResponse is the list of deployments sent to the client; it is
// marshaled by sendDeployments as the body of a successful GET on the
// deployments endpoint.
type ApiDeploymentResponse []ApiDeployment
// apiDeploymentResult is one client-reported outcome of applying a deployment.
// apiSetDeploymentResults validates these fields before accepting a result.
type apiDeploymentResult struct {
// ID identifies the deployment; it must be non-empty.
ID string `json:"id"`
// Status must be RESPONSE_STATUS_SUCCESS or RESPONSE_STATUS_FAIL.
Status string `json:"status"`
// ErrorCode must be non-zero when Status is RESPONSE_STATUS_FAIL.
ErrorCode int `json:"errorCode"`
// Message must be non-empty when Status is RESPONSE_STATUS_FAIL.
Message string `json:"message"`
}
// apiDeploymentResults is the list of deployment results received from the
// client; it is the JSON body decoded by apiSetDeploymentResults.
type apiDeploymentResults []apiDeploymentResult
// deploymentsEndpoint is the path on which InitAPI registers both the GET and
// the PUT deployment handlers.
const deploymentsEndpoint = "/deployments"
// InitAPI registers this file's HTTP handlers on the router returned by
// services.API(): GET on deploymentsEndpoint is served by
// apiGetCurrentDeployments and PUT by apiSetDeploymentResults.
func InitAPI() {
services.API().HandleFunc(deploymentsEndpoint, apiGetCurrentDeployments).Methods("GET")
services.API().HandleFunc(deploymentsEndpoint, apiSetDeploymentResults).Methods("PUT")
}
// writeError sends an error reply to the client.
//
// The HTTP status is written first, followed by a JSON errorResponse body built
// from code and reason. If that body cannot be marshaled, the failure is logged
// and only the status line is sent. The outcome is always noted in the debug log.
func writeError(w http.ResponseWriter, status int, code int, reason string) {
	w.WriteHeader(status)

	payload, err := json.Marshal(errorResponse{ErrorCode: code, Reason: reason})
	if err == nil {
		w.Write(payload)
	} else {
		log.Errorf("unable to marshal errorResponse: %v", err)
	}

	log.Debugf("sending %d error to client: %s", status, reason)
}
// writeDatabaseError replies with 500 Internal Server Error and a generic
// "database error" reason, via writeError.
func writeDatabaseError(w http.ResponseWriter) {
	const reason = "database error"
	writeError(w, http.StatusInternalServerError, ERROR_CODE_TODO, reason)
}
// distributeEvents is the event loop that fans deployment-change messages out
// to waiting subscribers. It never returns and is meant to run in its own
// goroutine.
//
// Messages arriving on deploymentsChanged are debounced: the first message of a
// burst starts a flush goroutine that waits debounceDuration; later messages in
// the same window only replace the pending message. When the window elapses the
// flush takes the current subscriber set (replacing it with an empty one),
// increments the ETag, and offers the latest message to every subscriber with a
// non-blocking send. Channels arriving on addSubscriber join the current set.
func distributeEvents() {
	var (
		lock    sync.Mutex
		pending string // latest undelivered message; "" means no flush is scheduled
		waiting = make(map[chan string]struct{})
	)

	flush := func() {
		<-time.After(debounceDuration)

		// Snapshot and reset the shared state under the lock; deliver outside it.
		lock.Lock()
		targets := waiting
		waiting = make(map[chan string]struct{})
		change := pending
		pending = ""
		incrementETag()
		lock.Unlock()

		log.Debugf("Delivering deployment change %s to %d subscribers", change, len(targets))
		for target := range targets {
			select {
			case target <- change:
				log.Debugf("Handling deploy response for: %s", change)
				log.Debugf("delivering TO: %v", target)
			default:
				// Never block the distributor on a subscriber that is not ready.
				log.Debugf("listener too far behind, message dropped")
			}
		}
	}

	for {
		select {
		case change := <-deploymentsChanged:
			lock.Lock()
			log.Debug("deploymentsChanged")
			if pending == "" {
				go flush()
			}
			pending = change
			lock.Unlock()

		case sub := <-addSubscriber:
			log.Debugf("Add subscriber: %v", sub)
			lock.Lock()
			waiting[sub] = struct{}{}
			lock.Unlock()
		}
	}
}
// apiGetCurrentDeployments handles GET on the deployments endpoint, with
// optional long-polling.
//
// Inputs:
//   - query parameter "block": number of seconds to wait for a change
//     (absent means 0, i.e. do not wait);
//   - header If-None-Match: the ETag the client already holds.
//
// Responses:
//   - 400 if "block" is not an integer;
//   - 304 if If-None-Match equals the current ETag and block is 0, or if a
//     blocking request times out without a change;
//   - 500 if the ready deployments cannot be loaded;
//   - 404 if there are no ready deployments and block is 0;
//   - 200 with the deployment list and an ETag header if the current ETag
//     differs from If-None-Match.
//
// When the ETag matches and block > 0, the handler waits for a change
// notification and then re-evaluates the request from the top.
func apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
	// If returning without a bundle (immediately or after timeout), status = 404
	// If returning If-None-Match value is equal to current deployment, status = 304
	// If returning a new value, status = 200
	// If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then
	// block for up to the specified number of seconds until a new deployment becomes available.
	b := r.URL.Query().Get("block")
	var timeout int
	if b != "" {
		var err error
		timeout, err = strconv.Atoi(b)
		if err != nil {
			writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "bad block value, must be number of seconds")
			return
		}
	}
	log.Debugf("api timeout: %d", timeout)

	// If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
	// query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
	// most recent bundle list.
	ifNoneMatch := r.Header.Get("If-None-Match")
	log.Debugf("if-none-match: %s", ifNoneMatch)

	// send unmodified if matches prior eTag and no timeout
	eTag := getETag()
	if eTag == ifNoneMatch && timeout == 0 {
		w.WriteHeader(http.StatusNotModified)
		return
	}

	// subscribe to new deployments in case we need it
	var gotNewDeployment chan string
	if timeout > 0 && ifNoneMatch != "" {
		// The channel is buffered because distributeEvents delivers with a
		// non-blocking send. With an unbuffered channel, a change arriving
		// while this handler is still loading deployments (not yet parked in
		// the select below) would be dropped and the request would wait out
		// its full timeout. One slot suffices: a subscriber is sent to at most
		// once.
		gotNewDeployment = make(chan string, 1)
		addSubscriber <- gotNewDeployment
	}

	deployments, err := getReadyDeployments()
	if err != nil {
		writeDatabaseError(w)
		return
	}

	// send not found if no timeout
	if len(deployments) == 0 && timeout == 0 {
		w.WriteHeader(http.StatusNotFound)
		return
	}

	// send results if different eTag
	if eTag != ifNoneMatch {
		sendDeployments(w, deployments, eTag)
		return
	}

	log.Debug("Blocking request... Waiting for new Deployments.")

	select {
	case <-gotNewDeployment:
		apiGetCurrentDeployments(w, r) // recurse

	case <-time.After(time.Duration(timeout) * time.Second):
		log.Debug("Blocking deployment request timed out.")
		if ifNoneMatch != "" {
			w.WriteHeader(http.StatusNotModified)
		} else {
			w.WriteHeader(http.StatusNotFound)
		}
	}
}
// sendDeployments writes dataDeps to the client as a JSON array of
// ApiDeployment values and sets the ETag response header to eTag.
//
// If the list cannot be marshaled, the error is logged and a bare 500 is sent.
// The result slice is deliberately left nil when dataDeps is empty, so an empty
// list is encoded as JSON null.
func sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
	var out ApiDeploymentResponse
	for i := range dataDeps {
		src := dataDeps[i]
		converted := ApiDeployment{
			ID:               src.ID,
			ScopeId:          src.DataScopeID,
			Created:          src.Created,
			CreatedBy:        src.CreatedBy,
			Updated:          src.Updated,
			UpdatedBy:        src.UpdatedBy,
			BundleConfigJson: []byte(src.BundleConfigJSON),
			ConfigJson:       []byte(src.ConfigJSON),
			DisplayName:      src.BundleName,
			URI:              src.LocalBundleURI,
		}
		out = append(out, converted)
	}

	payload, marshalErr := json.Marshal(out)
	if marshalErr != nil {
		log.Errorf("unable to marshal deployments: %v", marshalErr)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Debugf("sending deployment %s: %s", eTag, payload)
	// Headers must be set before the first Write.
	w.Header().Set("ETag", eTag)
	w.Write(payload)
}
func apiSetDeploymentResults(w http.ResponseWriter, r *http.Request) {
var results apiDeploymentResults
buf, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(buf, &results)
if err != nil {
log.Errorf("Resp Handler Json Unmarshal err: ", err)
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "Malformed JSON")
return
}
// validate the results
// todo: these errors to the client should be standardized
var errs bytes.Buffer
var validResults apiDeploymentResults
for i, result := range results {
valid := true
if result.ID == "" {
errs.WriteString(fmt.Sprintf("Missing id at %d\n", i))
}
if result.Status != RESPONSE_STATUS_SUCCESS && result.Status != RESPONSE_STATUS_FAIL {
errs.WriteString(fmt.Sprintf("status must be '%s' or '%s' at %d\n",
RESPONSE_STATUS_SUCCESS, RESPONSE_STATUS_FAIL, i))
}
if result.Status == RESPONSE_STATUS_FAIL {
if result.ErrorCode == 0 {
errs.WriteString(fmt.Sprintf("errorCode is required for status == fail at %d\n", i))
}
if result.Message == "" {
errs.WriteString(fmt.Sprintf("message are required for status == fail at %d\n", i))
}
}
if valid {
validResults = append(validResults, result)
}
}
if errs.Len() > 0 {
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, errs.String())
return
}
if len(validResults) > 0 {
go transmitDeploymentResultsToServer(validResults)
setDeploymentResults(validResults)
}
w.Write([]byte("OK"))
}
// addHeaders adds an Authorization header to req, using the bearer token read
// from the "apigeesync_bearer_token" configuration key.
func addHeaders(req *http.Request) {
	bearer := services.Config().GetString("apigeesync_bearer_token")
	req.Header.Add("Authorization", "Bearer "+bearer)
}
// transmitDeploymentResultsToServer PUTs validResults, encoded as JSON, to the
// tracking service at
// <apiServerBaseURI>/clusters/<apidClusterID>/apids/<apidInstanceID>/deployments.
//
// Transport errors and non-200 responses are logged and retried without limit,
// sleeping between attempts via the backoff created from bundleRetryDelay
// (capped at five minutes). The function returns nil once the service answers
// 200, or an error if the base URI does not parse, the results cannot be
// marshaled, or the request cannot be constructed.
func transmitDeploymentResultsToServer(validResults apiDeploymentResults) error {
	retryIn := bundleRetryDelay
	maxBackOff := 5 * time.Minute
	backOffFunc := createBackoff(retryIn, maxBackOff)

	_, err := url.Parse(apiServerBaseURI.String())
	if err != nil {
		log.Errorf("unable to parse apiServerBaseURI %s: %v", apiServerBaseURI.String(), err)
		return err
	}
	apiPath := fmt.Sprintf("%s/clusters/%s/apids/%s/deployments", apiServerBaseURI.String(), apidClusterID, apidInstanceID)

	resultJSON, err := json.Marshal(validResults)
	if err != nil {
		log.Errorf("unable to marshal deployment results %v: %v", validResults, err)
		return err
	}

	for {
		delivered, err := putDeploymentResultsOnce(apiPath, resultJSON)
		if err != nil {
			return err
		}
		if delivered {
			return nil
		}
		backOffFunc()
	}
}

// putDeploymentResultsOnce makes a single PUT attempt. It reports
// delivered=true on a 200 response. A transport error or any other status is
// logged and reported as (false, nil) so the caller retries; a non-nil error
// means the request could not even be built and retrying is pointless.
//
// Keeping one attempt per call lets the response body be closed at the end of
// every attempt rather than piling up deferred closes inside the retry loop.
func putDeploymentResultsOnce(apiPath string, resultJSON []byte) (delivered bool, err error) {
	log.Debugf("transmitting deployment results to tracker by URL=%s data=%s", apiPath, string(resultJSON))

	req, err := http.NewRequest("PUT", apiPath, bytes.NewReader(resultJSON))
	if err != nil {
		log.Errorf("unable to create PUT request: %v", err)
		return false, err
	}
	req.Header.Add("Content-Type", "application/json")
	addHeaders(req)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// resp is nil here; it must not be touched.
		log.Errorf("failed to communicate with tracking service: %v", err)
		return false, nil
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// The body is read only to enrich the log line, so a read error is ignored.
		b, _ := ioutil.ReadAll(resp.Body)
		log.Errorf("tracking service call failed to %s, code: %d, body: %s", apiPath, resp.StatusCode, string(b))
		return false, nil
	}

	return true, nil
}
// incrementETag atomically advances the package-level eTag counter by one.
// Call it whenever the list of deployments changes.
func incrementETag() {
	const step int64 = 1
	atomic.AddInt64(&eTag, step)
}
// getETag atomically reads the package-level eTag counter and returns it as a
// base-10 string, the form used for the ETag header and for comparison with
// If-None-Match.
func getETag() string {
	return strconv.FormatInt(atomic.LoadInt64(&eTag), 10)
}