blob: 7f34e74e72fe7caa49be895d1020f1fd49691346 [file] [log] [blame]
package apiGatewayDeploy
import (
"database/sql"
"encoding/json"
"github.com/30x/apid"
"io/ioutil"
"net/http"
"strconv"
"time"
)
// todo: add error codes where this is used
const ERROR_CODE_TODO = 0
type errorResponse struct {
ErrorCode int `json:"errorCode"`
Reason string `json:"reason"`
}
func initAPI(services apid.Services) {
services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET")
services.API().HandleFunc("/deployments/{deploymentID}", respHandler).Methods("POST")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
e := errorResponse{
ErrorCode: code,
Reason: reason,
}
bytes, err := json.Marshal(e)
if err != nil {
log.Errorf("unable to marshal errorResponse: %v", err)
} else {
w.Write(bytes)
}
log.Debugf("sending (%d) error to client: %s", status, reason)
w.WriteHeader(status)
}
func writeDatabaseError(w http.ResponseWriter) {
writeError(w, http.StatusInternalServerError, ERROR_CODE_TODO, "database error")
}
// todo: The following was basically just copied from old APID - needs review.
func distributeEvents() {
subscribers := make(map[chan string]struct{})
for {
select {
case msg := <-incoming:
for subscriber := range subscribers {
select {
case subscriber <- msg:
log.Debugf("Handling deploy response for: %s", msg)
default:
log.Error("listener too far behind, message dropped")
}
}
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %s", subscriber)
subscribers[subscriber] = struct{}{}
}
}
}
func handleCurrentDeployment(w http.ResponseWriter, r *http.Request) {
// If returning without a bundle (immediately or after timeout), status = 404
// If returning If-None-Match value is equal to current deployment, status = 304
// If returning a new value, status = 200
// If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then
// block for up to the specified number of seconds until a new deployment becomes available.
b := r.URL.Query().Get("block")
var timeout int
if b != "" {
var err error
timeout, err = strconv.Atoi(b)
if err != nil {
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "bad block value, must be number of seconds")
return
}
}
// If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
// query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
// most recent bundle list.
priorDepID := r.Header.Get("If-None-Match")
depID, err := getCurrentDeploymentID()
if err != nil && err != sql.ErrNoRows{
writeDatabaseError(w)
return
}
// not found, no timeout, send immediately
if depID == "" && timeout == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
// found, send immediately - if doesn't match prior ID
if depID != "" {
if depID == priorDepID {
if timeout == 0 {
w.WriteHeader(http.StatusNotModified)
return
} else {
// continue
}
} else {
sendDeployment(w, depID)
return
}
}
// can't send immediately, we need to block...
// todo: can we kill the timer & channel if client connection is lost?
// todo: resolve race condition - may miss a notification
log.Debug("Blocking request... Waiting for new Deployments.")
newReq := make(chan string)
// subscribe to new deployments
addSubscriber <- newReq
// block until new deployment or timeout
select {
case depID := <-newReq:
sendDeployment(w, depID)
case <-time.After(time.Duration(timeout) * time.Second):
log.Debug("Blocking deployment request timed out.")
w.WriteHeader(http.StatusNotFound)
return
}
}
func sendDeployment(w http.ResponseWriter, depID string) {
deployment, err := getDeployment(depID)
if err != nil {
log.Errorf("unable to retrieve deployment [%s]: %s", depID, err)
w.WriteHeader(http.StatusInternalServerError)
}
b, err := json.Marshal(deployment)
if err != nil {
log.Errorf("unable to marshal deployment: %s", err)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.Header().Set("ETag", depID)
w.Write(b)
}
}
func respHandler(w http.ResponseWriter, r *http.Request) {
depID := apid.API().Vars(r)["deploymentID"]
if depID == "" {
log.Error("No deployment ID")
// todo: add error code
writeError(w, http.StatusBadRequest, 0, "Missing deployment ID")
return
}
var rsp gwBundleResponse
buf, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(buf, &rsp)
if err != nil {
log.Error("Resp Handler Json Unmarshal err: ", err)
// todo: add error code
writeError(w, http.StatusBadRequest, 0, "Malformed body")
return
}
// todo: validate request body
/*
* If the state of deployment was success, update state of bundles and
* its deployments as success as well
*/
txn, err := db.Begin()
if err != nil {
log.Errorf("Unable to begin transaction: %s", err)
writeDatabaseError(w)
return
}
var updated bool
if rsp.Status == "SUCCESS" {
updated = updateDeploymentSuccess(depID, txn)
} else {
updated = updateDeploymentFailure(depID, rsp.GWbunRsp, txn)
}
if !updated {
writeDatabaseError(w)
err = txn.Rollback()
if err != nil {
log.Errorf("Unable to rollback transaction: %s", err)
}
return
}
err = txn.Commit()
if err != nil {
log.Errorf("Unable to commit transaction: %s", err)
writeDatabaseError(w)
}
return
}
func updateDeploymentSuccess(depID string, txn *sql.Tx) bool {
log.Debugf("Marking deployment (%s) as SUCCEEDED", depID)
var rows int64
res, err := txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ? WHERE deployment_id = ?;",
DEPLOYMENT_STATE_SUCCESS, depID)
if err == nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_INFO Failed: Dep Id (%s): %v", depID, err)
return false
}
log.Infof("UPDATE BUNDLE_INFO Success: Dep Id (%s)", depID)
res, err = txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ? WHERE id = ?;",
DEPLOYMENT_STATE_SUCCESS, depID)
if err != nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_DEPLOYMENT Failed: Dep Id (%s): %v", depID, err)
return false
}
log.Infof("UPDATE BUNDLE_DEPLOYMENT Success: Dep Id (%s)", depID)
return true
}
func updateDeploymentFailure(depID string, rsp gwBundleErrorResponse, txn *sql.Tx) bool {
log.Infof("marking deployment (%s) as FAILED", depID)
var rows int64
/* Update the Deployment state errors */
res, err := txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ?, error_code = ? WHERE id = ?;",
DEPLOYMENT_STATE_ERR_GWY, rsp.ErrorCode, depID)
if err == nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_DEPLOYMENT Failed: Dep Id (%s): %v", depID, err)
return false
}
log.Infof("UPDATE BUNDLE_DEPLOYMENT Success: Dep Id (%s)", depID)
/* Iterate over Bundles, and update the errors */
for _, a := range rsp.ErrorDetails {
res, err = txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ?, errorcode = ?, error_reason = ? "+
"WHERE id = ?;", DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason, a.BundleId)
if err != nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_INFO Failed: Bund Id (%s): %v", a.BundleId, err)
return false
}
log.Infof("UPDATE BUNDLE_INFO Success: Bund Id (%s)", a.BundleId)
}
return true
}