// blob: 11f496df66ef32fe58aae5e8cb93cbf9d93d02ec
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apiGatewayConfDeploy
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"github.com/apid/apid-core/util"
"github.com/apigee-labs/transicator/common"
"github.com/gorilla/mux"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"time"
)
// Response status strings.
// todo: the full set of states should probably be RECEIVED, READY, FAIL, SUCCESS
const (
RESPONSE_STATUS_SUCCESS = "SUCCESS"
RESPONSE_STATUS_FAIL = "FAIL"
)
// Tracker error codes, numbered from 1 (iota + 1).
const (
TRACKER_ERR_BUNDLE_DOWNLOAD_TIMEOUT = iota + 1
)
// URL path templates for the API. The "{blobId}" and "{configId}" segments are
// path variables; the handlers read variables with those names via mux.Vars.
// NOTE(review): presumably these populate apiManager's endpoint fields where
// the manager is constructed — confirm at the construction site.
const (
configEndpoint = "/configurations"
blobEndpointPath = "/blobs"
blobEndpoint = blobEndpointPath + "/{blobId}"
configIdEndpoint = configEndpoint + "/{configId}"
)
// API error codes, numbered from 1 (iota + 1). They are sent to clients as the
// "errorCode" field of errorResponse, so their order must not change.
const (
// API_ERR_BAD_BLOCK: the "block" query parameter is not a non-negative integer.
API_ERR_BAD_BLOCK = iota + 1
// API_ERR_INTERNAL: used for every 500 response sent via writeInternalError.
API_ERR_INTERNAL
API_ERR_BAD_CONFIG_ID
// API_ERR_NOT_FOUND: the requested blob or configuration has no DB row.
API_ERR_NOT_FOUND
)
// Time layouts (Go reference-time notation). convertTime tries these, plus
// time.RFC3339, when parsing and always emits the iso8601 layout.
const (
sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
iso8601 = "2006-01-02T15:04:05.999Z07:00"
sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00"
changeTimeFormat = "2006-01-02 15:04:05.999"
)
// kindCollection is the "kind" value sendDeployments puts in an
// ApiConfigurationResponse.
const (
kindCollection = "Collection"
)
// Content-Type values and the names used to exchange the configuration index.
const (
// headerSteam (sic, "Stream") is the Content-Type used for blob downloads.
headerSteam = "application/octet-stream"
// headerJson is the Content-Type used for JSON responses.
headerJson = "application/json"
// apidConfigIndexPar is the request query parameter carrying the client's index.
apidConfigIndexPar = "apid-config-index"
// apidConfigIndexHeader is the response header carrying apid's current index.
apidConfigIndexHeader = "x-apid-config-index"
)
// Sentinel errors related to the last sequence number (LSN).
var (
ErrNoLSN = errors.New("No last sequence in DB")
// ErrInvalidLSN is returned by compareLSN when the client-supplied index
// cannot be parsed; apiGetCurrentConfigs maps it to a 400 response.
ErrInvalidLSN = errors.New(apidConfigIndexPar + " is invalid")
)
// deploymentsResult bundles a list of configurations with an error and an
// eTag string.
type deploymentsResult struct {
deployments []Configuration
err error
eTag string
}
// errorResponse is the JSON body that writeError sends with an error status.
type errorResponse struct {
// ErrorCode is one of the API_ERR_* codes (or whatever code the caller passes).
ErrorCode int `json:"errorCode"`
// Reason is a human-readable description of the failure.
Reason string `json:"reason"`
}
// ApiConfigurationDetails is the JSON representation of a single configuration
// as returned by the API. The handlers fill it from a Configuration record.
type ApiConfigurationDetails struct {
// Self is the URL of this configuration: host + configurations endpoint + "/" + ID.
Self string `json:"self"`
Name string `json:"name"`
Type string `json:"type"`
Revision string `json:"revision"`
// BeanBlobUrl is built by getBlobUrl from the record's BlobID; empty if the ID is empty.
BeanBlobUrl string `json:"beanBlob"`
Org string `json:"orgId"`
Env string `json:"envId"`
// ResourceBlobUrl is built by getBlobUrl from the record's BlobResourceID; empty if the ID is empty.
ResourceBlobUrl string `json:"resourceBlob"`
Path string `json:"path"`
// Created and Updated are passed through convertTime before being set.
Created string `json:"created"`
Updated string `json:"updated"`
}
// ApiConfigurationResponse is the JSON envelope for a list of configurations.
type ApiConfigurationResponse struct {
// Kind is set to kindCollection by sendDeployments.
Kind string `json:"kind"`
// Self is the URL of the collection, with "?type=..." appended when filtered.
Self string `json:"self"`
// ApiConfigurationsResponse holds the individual configurations ("contents" in JSON).
ApiConfigurationsResponse []ApiConfigurationDetails `json:"contents"`
}
// confChangeNotification is the event notifyNewChange publishes on
// newChangeListChan and LongPollSuccessHandler consumes.
type confChangeNotification struct {
// LSN is the value of dbMan.getLSN() taken when the notification was built.
LSN string
// confs is the result of dbMan.getAllConfigurations("").
confs []Configuration
// err is the error from that query, if any; handlers treat non-nil as failure.
err error
}
// apiManagerInterface is the API-layer contract; *apiManager implements it.
type apiManagerInterface interface {
// an idempotent method to initialize api endpoints
InitAPI()
// notifyNewChange publishes the current configurations and LSN to subscribers.
notifyNewChange()
}
// apiManager serves the configuration and blob HTTP endpoints and distributes
// configuration-change events to long-polling clients.
type apiManager struct {
// dbMan provides configuration records, blob file locations and the LSN.
dbMan dbManagerInterface
// configurationEndpoint, blobEndpoint and configurationIdEndpoint are the
// route paths registered by InitAPI.
configurationEndpoint string
blobEndpoint string
configurationIdEndpoint string
// addSubscriber is handed to util.DistributeEvents and util.LongPolling.
addSubscriber chan chan interface{}
// newChangeListChan receives *confChangeNotification values from notifyNewChange.
newChangeListChan chan interface{}
// apiInitialized makes InitAPI a no-op after the first call.
apiInitialized bool
}
// InitAPI registers the GET handlers for the configurations, blob and
// configuration-by-id endpoints and starts event distribution. Once
// apiInitialized has been set, further calls return without doing anything.
func (a *apiManager) InitAPI() {
	if !a.apiInitialized {
		routes := []struct {
			path    string
			handler func(http.ResponseWriter, *http.Request)
		}{
			{a.configurationEndpoint, a.apiGetCurrentConfigs},
			{a.blobEndpoint, a.apiReturnBlobData},
			{a.configurationIdEndpoint, a.apiHandleConfigId},
		}
		for _, rt := range routes {
			services.API().HandleFunc(rt.path, rt.handler).Methods("GET")
		}

		a.initDistributeEvents()
		a.apiInitialized = true
		log.Debug("API endpoints initialized")
	}
}
// initDistributeEvents starts util.DistributeEvents in a new goroutine, wiring
// newChangeListChan and addSubscriber into it.
// NOTE(review): nothing in this file stops that goroutine — presumably it is
// meant to live for the lifetime of the process; confirm.
func (a *apiManager) initDistributeEvents() {
go util.DistributeEvents(a.newChangeListChan, a.addSubscriber)
}
// notifyNewChange loads every configuration (no type filter) and the current
// LSN from the database and publishes them as a confChangeNotification on
// newChangeListChan. A query failure is logged and still published, carried in
// the notification's err field, so receivers can report it.
func (a *apiManager) notifyNewChange() {
	allConfs, dbErr := a.dbMan.getAllConfigurations("")
	if dbErr != nil {
		log.Errorf("Database error in getReadyConfigurations: %v", dbErr)
	}

	notification := &confChangeNotification{
		confs: allConfs,
		err:   dbErr,
	}
	// The LSN is read after the configurations, as in the original ordering.
	notification.LSN = a.dbMan.getLSN()

	a.newChangeListChan <- notification
}
// writeError sends an error response to the client.
//
// w is the response to write to, status is the HTTP status code, code is the
// application error code placed in the body's "errorCode" field, and reason is
// the human-readable message placed in "reason". The body is a JSON-encoded
// errorResponse and is declared as application/json.
func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) {
	// Headers must be set before WriteHeader; later changes are ignored, and
	// without an explicit type net/http would sniff the JSON as text/plain.
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)

	body, err := json.Marshal(errorResponse{
		ErrorCode: code,
		Reason:    reason,
	})
	if err != nil {
		log.Errorf("unable to marshal errorResponse: %v", err)
	} else if _, err = w.Write(body); err != nil {
		log.Errorf("unable to write errorResponse: %v", err)
	}
	log.Debugf("sending %d error to client: %s", status, reason)
}
// writeInternalError sends a 500 Internal Server Error response via writeError,
// using API_ERR_INTERNAL as the error code and err as the reason text.
func (a *apiManager) writeInternalError(w http.ResponseWriter, err string) {
a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err)
}
// apiReturnBlobData serves the content of the blob named by the "blobId" path
// variable as application/octet-stream.
//
// Responses:
//   - 404 with API_ERR_NOT_FOUND when the database has no row for the blob id
//   - 500 with API_ERR_INTERNAL when the lookup fails otherwise or the local
//     file cannot be read
//   - 200 with the file content on success
func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
	blobId := mux.Vars(r)["blobId"]

	fs, err := a.dbMan.getLocalFSLocation(blobId)
	if err != nil {
		if err == sql.ErrNoRows {
			a.writeError(w, http.StatusNotFound, API_ERR_NOT_FOUND, "cannot find the blob")
		} else {
			log.Errorf("apiReturnBlobData error from db: %v", err)
			a.writeInternalError(w, "BlobId "+blobId+" has no mapping blob file")
		}
		return
	}

	data, err := ioutil.ReadFile(fs)
	if err != nil {
		log.Errorf("apiReturnBlobData error read file: %v, %v", fs, err)
		a.writeInternalError(w, err.Error())
		return
	}

	// The Content-Type must be set before the first body write; once the body
	// has started the headers are already on the wire and changes are ignored.
	w.Header().Set("Content-Type", headerSteam)
	if _, err = io.Copy(w, bytes.NewReader(data)); err != nil {
		// The response has already been started at this point, so an error
		// status/body can no longer be sent; writing one would only append
		// JSON to a partially sent blob. Log and give up.
		log.Errorf("apiReturnBlobData error copy file: %v, %v", fs, err)
	}
}
// apiHandleConfigId returns the configuration named by the "configId" path
// variable as a JSON-encoded ApiConfigurationDetails.
//
// Responses:
//   - 404 with API_ERR_NOT_FOUND when the database has no row for the id
//   - 500 with API_ERR_INTERNAL on any other database error
//   - 500 with an empty body when the details cannot be marshalled
//   - 200 with the JSON document otherwise
func (a *apiManager) apiHandleConfigId(w http.ResponseWriter, r *http.Request) {
	id := mux.Vars(r)["configId"]

	conf, err := a.dbMan.getConfigById(id)
	switch {
	case err == sql.ErrNoRows:
		a.writeError(w, http.StatusNotFound, API_ERR_NOT_FOUND, "cannot find the configuration")
		return
	case err != nil:
		log.Errorf("apiHandleConfigId: %v", err)
		a.writeInternalError(w, err.Error())
		return
	}

	// Fields are filled in the same order as the helper calls were made before.
	var detail ApiConfigurationDetails
	detail.Self = getHttpHost() + a.configurationEndpoint + "/" + conf.ID
	detail.Name = conf.Name
	detail.Type = conf.Type
	detail.Revision = conf.Revision
	detail.BeanBlobUrl = getBlobUrl(conf.BlobID)
	detail.Org = conf.OrgID
	detail.Env = conf.EnvID
	detail.ResourceBlobUrl = getBlobUrl(conf.BlobResourceID)
	detail.Path = conf.Path
	detail.Created = convertTime(conf.Created)
	detail.Updated = convertTime(conf.Updated)

	payload, err := json.Marshal(detail)
	if err != nil {
		log.Errorf("unable to marshal config: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Debugf("sending configuration %s", payload)
	w.Header().Set("Content-Type", headerJson)
	w.Write(payload)
}
// apiGetCurrentConfigs handles GET on the configurations endpoint.
//
// Query parameters:
//   - "block": long-poll timeout in seconds; must be a non-negative integer,
//     otherwise the response is 400 with API_ERR_BAD_BLOCK
//   - "type": when non-empty, the configurations of that type are returned
//     immediately, without any index comparison or long polling
//   - "apid-config-index": the index (LSN) the client already has
//
// Without a "type" filter the client's index is compared to apid's LSN:
//   - index cannot be parsed: 400
//   - comparison fails for another reason: 500
//   - apid's LSN is newer, or no index was given: configurations are sent
//   - apid's LSN is not newer and block is 0 or absent: 304 Not Modified
//   - apid's LSN is not newer and block > 0: long poll for up to block seconds
//
// NOTE(review): the invalid-index branch passes http.StatusBadRequest as the
// application error code as well as the HTTP status — confirm that is intended.
func (a *apiManager) apiGetCurrentConfigs(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	blockSec := query.Get("block")
	typeFilter := query.Get("type")
	headerLSN := query.Get(apidConfigIndexPar)

	timeout := 0
	if blockSec != "" {
		parsed, convErr := strconv.Atoi(blockSec)
		if convErr != nil || parsed < 0 {
			a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds")
			return
		}
		timeout = parsed
	}
	log.Debugf("/configurations long-poll timeout: %d", timeout)
	log.Debugf("Long-Poll-Index: %s", headerLSN)

	// A type filter bypasses the index comparison entirely.
	if typeFilter != "" {
		a.sendReadyConfigurations(typeFilter, w, "")
		return
	}

	cmpRes, apidLSN, err := a.compareLSN(headerLSN)
	if err == ErrInvalidLSN {
		a.writeError(w, http.StatusBadRequest, http.StatusBadRequest, err.Error())
		return
	}
	if err != nil {
		log.Errorf("Error in compareLSN: %v", err)
		a.writeInternalError(w, err.Error())
		return
	}

	// APID_LSN > Header_LSN: the client is behind, answer right away.
	if cmpRes > 0 {
		a.sendReadyConfigurations("", w, apidLSN)
		return
	}

	// APID_LSN <= Header_LSN: nothing new yet.
	if timeout == 0 {
		w.WriteHeader(http.StatusNotModified)
		return
	}
	util.LongPolling(w, time.Duration(timeout)*time.Second, a.addSubscriber, a.LongPollSuccessHandler, a.LongPollTimeoutHandler)
}
// LongPollSuccessHandler is the callback handed to util.LongPolling for the
// case where an event arrives before the timeout. c is expected to be a
// *confChangeNotification without an error; its configurations and LSN are
// then sent to the client. Anything else is logged and answered with a 500.
func (a *apiManager) LongPollSuccessHandler(c interface{}, w http.ResponseWriter) {
	change, isChange := c.(*confChangeNotification)
	if isChange && change.err == nil {
		// send configs and LSN
		a.sendDeployments(w, change.confs, change.LSN, "")
		return
	}

	log.Errorf("Wrong confChangeNotification: %v, %v", isChange, change)
	a.writeInternalError(w, "Error getting configurations with long-polling")
}
// LongPollTimeoutHandler is the callback handed to util.LongPolling for the
// timeout case. It logs the timeout and answers 304 Not Modified with no body.
func (a *apiManager) LongPollTimeoutHandler(w http.ResponseWriter) {
log.Debug("long-polling configuration request timed out.")
w.WriteHeader(http.StatusNotModified)
}
// sendReadyConfigurations queries the database for configurations matching
// typeFilter (an empty filter is passed through unchanged) and writes them to
// w via sendDeployments, forwarding apidLSN and typeFilter. A database error
// is logged and reported to the client as a 500.
func (a *apiManager) sendReadyConfigurations(typeFilter string, w http.ResponseWriter, apidLSN string) {
	confs, dbErr := a.dbMan.getAllConfigurations(typeFilter)
	if dbErr == nil {
		a.sendDeployments(w, confs, apidLSN, typeFilter)
		return
	}

	log.Errorf("Database error: %v", dbErr)
	a.writeInternalError(w, fmt.Sprintf("Database error: %s", dbErr.Error()))
}
// sendDeployments writes dataConfs to w as a JSON ApiConfigurationResponse.
//
// Parameters:
//   - dataConfs: the configuration records to return; nil or empty yields an
//     empty "contents" array rather than null
//   - apidLSN: when non-empty, sent in the x-apid-config-index response header
//   - typeFilter: when non-empty, appended (query-escaped) to the collection's
//     self link as "?type=<filter>"; the per-item self links are unaffected
//
// If the response cannot be marshalled, a 500 with an empty body is sent.
func (a *apiManager) sendDeployments(w http.ResponseWriter, dataConfs []Configuration, apidLSN string, typeFilter string) {
	collectionURL := getHttpHost() + a.configurationEndpoint

	// Non-nil even when empty so that JSON encodes "contents" as [].
	details := make([]ApiConfigurationDetails, 0, len(dataConfs))
	for i := range dataConfs {
		d := &dataConfs[i]
		details = append(details, ApiConfigurationDetails{
			Self:            collectionURL + "/" + d.ID,
			Name:            d.Name,
			Type:            d.Type,
			Revision:        d.Revision,
			BeanBlobUrl:     getBlobUrl(d.BlobID),
			Org:             d.OrgID,
			Env:             d.EnvID,
			ResourceBlobUrl: getBlobUrl(d.BlobResourceID),
			Path:            d.Path,
			Created:         convertTime(d.Created),
			Updated:         convertTime(d.Updated),
		})
	}

	apiConfs := ApiConfigurationResponse{
		Kind:                      kindCollection,
		Self:                      collectionURL,
		ApiConfigurationsResponse: details,
	}
	if typeFilter != "" {
		// typeFilter comes straight from the request; escape it so the self
		// link stays a valid URL whatever characters it contains.
		apiConfs.Self += "?type=" + url.QueryEscape(typeFilter)
	}

	b, err := json.Marshal(apiConfs)
	if err != nil {
		log.Errorf("unable to marshal deployments: %v", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	if apidLSN != "" {
		w.Header().Set(apidConfigIndexHeader, apidLSN)
	}
	w.Header().Set("Content-Type", headerJson)
	log.Debugf("sending deployments %s", apidLSN)
	if _, err = w.Write(b); err != nil {
		log.Errorf("unable to write deployments: %v", err)
	}
}
// compareLSN compares apid's current LSN (from dbMan.getLSN) with the index
// supplied by the client.
//
// Returns:
//   - res: 1 when headerLSN is empty; otherwise apidSeq.Compare(headerSeq),
//     which the caller reads as > 0 meaning apid's LSN is the newer one
//   - apidLSN: apid's current LSN, or "" when an error is returned
//   - err: ErrInvalidLSN when headerLSN cannot be parsed, or the parse error
//     when apid's own LSN cannot be parsed
func (a *apiManager) compareLSN(headerLSN string) (res int, apidLSN string, err error) {
	apidLSN = a.dbMan.getLSN()
	log.Debugf("apidLSN: %v", apidLSN)

	// No long-poll index supplied: treat the client as being behind.
	if len(headerLSN) == 0 {
		return 1, apidLSN, nil
	}

	headerSeq, headerErr := common.ParseSequence(headerLSN)
	if headerErr != nil {
		log.Debugf("Error when Parse headerLSN Sequence: %v", headerErr)
		return 0, "", ErrInvalidLSN
	}

	apidSeq, apidErr := common.ParseSequence(apidLSN)
	if apidErr != nil {
		log.Errorf("Error when Parse apidLSN Sequence: %v", apidErr)
		return 0, "", apidErr
	}

	return apidSeq.Compare(headerSeq), apidLSN, nil
}
// getBlobUrl builds the download URL for a blob: the HTTP host, "/blobs/" and
// the path-escaped blobId. An empty blobId yields an empty string.
func getBlobUrl(blobId string) string {
	if blobId != "" {
		return getHttpHost() + "/blobs/" + url.PathEscape(blobId)
	}
	return ""
}
// convertTime normalizes a timestamp string to the iso8601 layout. It tries
// each known layout in turn and reformats with the first one that parses. An
// empty input yields an empty string; an input that matches no layout is
// logged as an error and returned unchanged.
func convertTime(t string) string {
	if len(t) == 0 {
		return ""
	}

	knownLayouts := [...]string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339, changeTimeFormat}
	for _, layout := range knownLayouts {
		if parsed, parseErr := time.Parse(layout, t); parseErr == nil {
			return parsed.Format(iso8601)
		}
	}

	log.Error("convertTime: Unsupported time format: " + t)
	return t
}
// getHttpHost returns the base URL used to build links in API responses. A
// non-empty configBlobDownloadEndpoint setting is returned as is; otherwise
// the result is "<protocol>://<listen address>", where the protocol comes from
// configProtocol (defaulting to "http") and the address from configAPIListen.
func getHttpHost() string {
	if endpoint := config.GetString(configBlobDownloadEndpoint); endpoint != "" {
		return endpoint
	}

	// apid-core has to set this according to the protocol apid is to be run: http/https
	scheme := config.GetString(configProtocol)
	if scheme == "" {
		scheme = "http"
	}
	return scheme + "://" + config.GetString(configAPIListen)
}