blob: 6eb721241674d2fcd8c00609b2ad979ef98e5bee [file] [log] [blame]
package apiGatewayDeploy
import (
"database/sql"
"encoding/json"
"github.com/30x/apid"
"io/ioutil"
"net/http"
"strconv"
"time"
)
// todo: add error codes where this is used
const ERROR_CODE_TODO = 0
type errorResponse struct {
ErrorCode int `json:"errorCode"`
Reason string `json:"reason"`
}
func initAPI(services apid.Services) {
services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET")
services.API().HandleFunc("/deployments/{deploymentID}", respHandler).Methods("POST")
}
func writeError(w http.ResponseWriter, status int, code int, reason string) {
e := errorResponse{
ErrorCode: code,
Reason: reason,
}
bytes, err := json.Marshal(e)
if err != nil {
log.Errorf("unable to marshal errorResponse: %v", err)
} else {
w.Write(bytes)
}
log.Debugf("sending (%d) error to client: %s", status, reason)
w.WriteHeader(status)
}
func writeDatabaseError(w http.ResponseWriter) {
writeError(w, http.StatusInternalServerError, ERROR_CODE_TODO, "database error")
}
// todo: The following was basically just copied from old APID - needs review.
func distributeEvents() {
subscribers := make(map[chan string]struct{})
for {
select {
case msg := <-incoming:
for subscriber := range subscribers {
select {
case subscriber <- msg:
log.Debugf("Handling deploy response for: %s", msg)
default:
log.Error("listener too far behind, message dropped")
}
}
case subscriber := <-addSubscriber:
log.Debugf("Add subscriber: %s", subscriber)
subscribers[subscriber] = struct{}{}
}
}
}
func handleCurrentDeployment(w http.ResponseWriter, r *http.Request) {
// If block greater than zero AND if request ETag header not empty AND if there is no new bundle list
// available, then block for up to the specified number of seconds until a new bundle list becomes
// available. If no new bundle list becomes available, then return an empty array.
b := r.URL.Query().Get("block")
var block int
if b != "" {
var err error
block, err = strconv.Atoi(b)
if err != nil {
writeError(w, http.StatusBadRequest, ERROR_CODE_TODO, "bad block value, must be number of seconds")
return
}
}
sent := sendDeployInfo(w, r, false)
// todo: can we kill the timer & channel if client connection is lost?
// blocking request (Long Poll)
if sent == false && block > 0 && r.Header.Get("etag") != "" {
log.Debug("Blocking request... Waiting for new Deployments.")
newReq := make(chan string)
// Update channel of a new request (subscriber)
addSubscriber <- newReq
// Block until timeout of new deployment
select {
case depID := <-newReq:
// todo: depID could be used directly instead of getting it again in sendDeployInfo
log.Debugf("DeploymentID = %s", depID)
sendDeployInfo(w, r, false)
case <-time.After(time.Duration(block) * time.Second):
log.Debug("Blocking deployment request timed out.")
sendDeployInfo(w, r, true)
}
}
}
func respHandler(w http.ResponseWriter, r *http.Request) {
depID := apid.API().Vars(r)["deploymentID"]
if depID == "" {
log.Error("No deployment ID")
// todo: add error code
writeError(w, http.StatusBadRequest, 0, "Missing deployment ID")
return
}
var rsp gwBundleResponse
buf, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(buf, &rsp)
if err != nil {
log.Error("Resp Handler Json Unmarshal err: ", err)
// todo: add error code
writeError(w, http.StatusBadRequest, 0, "Malformed body")
return
}
// todo: validate request body
/*
* If the state of deployment was success, update state of bundles and
* its deployments as success as well
*/
txn, err := db.Begin()
if err != nil {
log.Errorf("Unable to begin transaction: %s", err)
writeDatabaseError(w)
return
}
var updated bool
if rsp.Status == "SUCCESS" {
updated = updateDeploymentSuccess(depID, txn)
} else {
updated = updateDeploymentFailure(depID, rsp.GWbunRsp, txn)
}
log.Print("***** 1")
if !updated {
writeDatabaseError(w)
err = txn.Rollback()
if err != nil {
log.Errorf("Unable to rollback transaction: %s", err)
}
return
}
log.Print("***** 2")
err = txn.Commit()
if err != nil {
log.Errorf("Unable to commit transaction: %s", err)
writeDatabaseError(w)
}
log.Print("***** 3")
return
}
func updateDeploymentSuccess(depID string, txn *sql.Tx) bool {
log.Debugf("Marking deployment (%s) as SUCCEEDED", depID)
var rows int64
res, err := txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ? WHERE deployment_id = ?;",
DEPLOYMENT_STATE_SUCCESS, depID)
if err == nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_INFO Failed: Dep Id (%s): %v", depID, err)
return false
}
log.Infof("UPDATE BUNDLE_INFO Success: Dep Id (%s)", depID)
res, err = txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ? WHERE id = ?;",
DEPLOYMENT_STATE_SUCCESS, depID)
if err != nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_DEPLOYMENT Failed: Dep Id (%s): %v", depID, err)
return false
}
log.Infof("UPDATE BUNDLE_DEPLOYMENT Success: Dep Id (%s)", depID)
return true
}
func updateDeploymentFailure(depID string, rsp gwBundleErrorResponse, txn *sql.Tx) bool {
log.Infof("marking deployment (%s) as FAILED", depID)
var rows int64
/* Update the Deployment state errors */
res, err := txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ?, error_code = ? WHERE id = ?;",
DEPLOYMENT_STATE_ERR_GWY, rsp.ErrorCode, depID)
if err == nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_DEPLOYMENT Failed: Dep Id (%s): %v", depID, err)
return false
}
log.Infof("UPDATE BUNDLE_DEPLOYMENT Success: Dep Id (%s)", depID)
/* Iterate over Bundles, and update the errors */
for _, a := range rsp.ErrorDetails {
res, err = txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ?, errorcode = ?, error_reason = ? "+
"WHERE id = ?;", DEPLOYMENT_STATE_ERR_GWY, a.ErrorCode, a.Reason, a.BundleId)
if err != nil {
rows, err = res.RowsAffected()
}
if err != nil || rows == 0 {
log.Errorf("UPDATE BUNDLE_INFO Failed: Bund Id (%s): %v", a.BundleId, err)
return false
}
log.Infof("UPDATE BUNDLE_INFO Success: Bund Id (%s)", a.BundleId)
}
return true
}