| package apiGatewayDeploy |
| |
| import ( |
| "database/sql" |
| "encoding/json" |
| "fmt" |
| "github.com/30x/apidGatewayDeploy/github" |
| "io" |
| "net/http" |
| "net/url" |
| "os" |
| "strconv" |
| "time" |
| ) |
| |
| // todo: The following was basically just copied from old APID - needs review. |
| |
| // todo: /current should return latest (regardless of status) if no ETag |
| |
| const DEPLOYMENT_STATE_UNUSED = 0 |
| const DEPLOYMENT_STATE_INPROG = 1 |
| const DEPLOYMENT_STATE_READY = 2 |
| const DEPLOYMENT_STATE_SUCCESS = 3 |
| const DEPLOYMENT_STATE_ERR_APID = 4 |
| const DEPLOYMENT_STATE_ERR_GWY = 5 |
| |
| var ( |
| bundlePathAbs string |
| gitHubAccessToken string |
| |
| incoming = make(chan string) |
| addSubscriber = make(chan chan string) |
| ) |
| |
| type systemBundle struct { |
| Uri string `json:"uri"` |
| } |
| |
| type dependantBundle struct { |
| Uri string `json:"uri"` |
| Org string `json:"org"` |
| Env string `json:"env"` |
| } |
| |
| type bundleManifest struct { |
| SysBun systemBundle `json:"system"` |
| DepBun []dependantBundle `json:"bundles"` |
| } |
| |
| type bundle struct { |
| BundleId string `json:"bundleId"` |
| URL string `json:"url"` |
| AuthCode string `json:"authCode,omitempty"` |
| } |
| |
| type deploymentResponse struct { |
| DeploymentId string `json:"deploymentId"` |
| Bundles []bundle `json:"bundles"` |
| System bundle `json:"system"` |
| } |
| |
| type gwBundleErrorDetail struct { |
| ErrorCode int `json:"errorCode"` |
| Reason string `json:"reason"` |
| BundleId string `json:"bundleId"` |
| } |
| |
| type gwBundleErrorResponse struct { |
| ErrorCode int `json:"errorCode"` |
| Reason string `json:"reason"` |
| ErrorDetails []gwBundleErrorDetail `json:"bundleErrors"` |
| } |
| |
| type gwBundleResponse struct { |
| Status string `json:"status"` |
| GWbunRsp gwBundleErrorResponse `json:"error"` |
| } |
| |
| // getBundleResourceData retrieves bundle data from a bundle repo and returns a ReadCloser. |
| func getBundleResourceData(uriString string) (io.ReadCloser, error) { |
| |
| uri, err := url.Parse(uriString) |
| if err != nil { |
| return nil, fmt.Errorf("DownloadFileUrl: Failed to parse urlStr: %s", uriString) |
| |
| } |
| |
| // todo: temporary - if not a github url, just open it or call GET on it |
| if uri.Host != "github.com" { |
| // assume it's a file if no scheme |
| if uri.Scheme == "" || uri.Scheme == "file" { |
| f, err := os.Open(uri.Path) |
| if err != nil { |
| return nil, err |
| } |
| return f, nil |
| } |
| |
| // some random uri, try to GET it |
| res, err := http.Get(uriString) |
| if err != nil { |
| return nil, err |
| } |
| return res.Body, nil |
| } |
| |
| // go get it from github using access token |
| return github.GetUrlData(uri, gitHubAccessToken) |
| } |
| |
| func createBundle(depPath string, uri string, depid string, org string, env string, typ string, txn *sql.Tx) int { |
| |
| status := DEPLOYMENT_STATE_INPROG |
| ts := int64(time.Now().UnixNano()) |
| timeString := strconv.FormatInt(ts, 10) |
| |
| var bundleID string |
| if typ == "sys" { |
| bundleID = typ + "_" + timeString |
| } else { |
| // todo: stop using org and env |
| bundleID = typ + "_" + org + "_" + env + "_" + timeString |
| } |
| locFile := depPath + "/" + bundleID + ".zip" |
| |
| var bundleData io.ReadCloser |
| out, err := os.Create(locFile) |
| if err != nil { |
| log.Error("Unable to create Bundle file ", locFile, " Err: ", err) |
| status = DEPLOYMENT_STATE_ERR_APID |
| goto FA |
| } |
| |
| bundleData, err = getBundleResourceData(uri) |
| if err != nil { |
| log.Error("Unable to read Bundle URI ", uri, " Err: ", err) |
| status = DEPLOYMENT_STATE_ERR_APID |
| goto FA |
| } |
| defer bundleData.Close() |
| io.Copy(out, bundleData) |
| out.Close() |
| |
| FA: |
| locFile = "file://" + locFile |
| success := createInitBundleDB(locFile, bundleID, ts, env, org, depid, |
| typ, locFile, status, txn) |
| |
| if !success { |
| return -1 |
| } else if status == DEPLOYMENT_STATE_ERR_APID { |
| return 1 |
| } else { |
| return 0 |
| } |
| } |
| |
| func orchestrateDeploymentAndTrigger() { |
| |
| depId := orchestrateDeployment() |
| if depId != "" { |
| incoming <- depId |
| } |
| } |
| |
| /* |
| * The series of actions to be taken here are :- |
| * (1) Find the latest Deployment id that is in Init state |
| * (2) Parse the Manifest URL |
| * (3) Download the system bundle and store locally, update DB |
| * (4) Download the dependent bundle and store locally, update DB |
| * (5) Update deployment state based on the status of deployment |
| // returns deploymentID |
| */ |
| func orchestrateDeployment() string { |
| |
| /* (1) Find the latest deployment, if none - get out */ |
| status := DEPLOYMENT_STATE_READY |
| txn, _ := db.Begin() |
| |
| var manifestString, deploymentID string |
| err := db.QueryRow("SELECT id, manifest FROM BUNDLE_DEPLOYMENT WHERE deploy_status = ? "+ |
| "ORDER BY created_at ASC LIMIT 1;", DEPLOYMENT_STATE_UNUSED). |
| Scan(&deploymentID, &manifestString) |
| |
| switch { |
| case err == sql.ErrNoRows: |
| log.Error("No Deployments available to be processed") |
| return "" |
| case err != nil: |
| log.Error("SELECT on BUNDLE_DEPLOYMENT failed with Err: ", err) |
| return "" |
| } |
| |
| /* (2) Parse Manifest */ |
| var bf bundleManifest |
| var fileInfo os.FileInfo |
| var deploymentPath string |
| var res int |
| var result bool |
| err = json.Unmarshal([]byte(manifestString), &bf) |
| if err != nil { |
| log.Error("JSON decoding Manifest failed Err: ", err) |
| status = DEPLOYMENT_STATE_ERR_APID |
| goto EB |
| } |
| |
| // todo: validate bundle! |
| //for bun := range bf.DepBun { |
| // if bun.uri |
| //} |
| |
| fileInfo, err = os.Stat(bundlePathAbs) |
| if err != nil || !fileInfo.IsDir() { |
| log.Error("Path ", bundlePathAbs, " is not a directory") |
| status = DEPLOYMENT_STATE_ERR_APID |
| goto EB |
| } |
| |
| deploymentPath = bundlePathAbs + "/" + deploymentID |
| err = os.Mkdir(deploymentPath, 0700) |
| if err != nil { |
| log.Errorf("Deployment Dir creation error: %v", err) |
| status = DEPLOYMENT_STATE_ERR_APID |
| goto EB |
| } |
| |
| /* (3) Download system bundle and store locally */ |
| res = createBundle(deploymentPath, bf.SysBun.Uri, deploymentID, "", "", "sys", txn) |
| if res == -1 { |
| log.Error("Abort Txn: Unable to update DB with system bundle info") |
| goto EC |
| } else if res == 1 { |
| status = DEPLOYMENT_STATE_ERR_APID |
| } |
| |
| /* (4) Loop through the Dependent bundles and store them locally as well */ |
| for _, ele := range bf.DepBun { |
| res = createBundle(deploymentPath, ele.Uri, deploymentID, ele.Org, ele.Env, "dep", txn) |
| if res == -1 { |
| log.Error("Abort Txn: Unable to update DB with dependent bundle info") |
| goto EC |
| } else if res == 1 { |
| status = DEPLOYMENT_STATE_ERR_APID |
| } |
| } |
| EB: |
| if status == DEPLOYMENT_STATE_ERR_APID && deploymentPath != "" { |
| os.RemoveAll(deploymentPath) |
| } |
| /* (5) Update Deployment state accordingly */ |
| result = updateDeployStatusDB(deploymentID, status, txn) |
| if result == false { |
| log.Error("Abort Txn: Unable to update DB with Deployment status") |
| goto EC |
| } |
| txn.Commit() |
| return deploymentID |
| EC: |
| os.RemoveAll(deploymentPath) |
| txn.Rollback() |
| return "" |
| } |
| |
| /* |
| * Create Init Bundle (FIXME : Put this in a struct and pass - too many i/p args) |
| */ |
| func createInitBundleDB(fileurl string, id string, cts int64, env string, org string, depid string, typ string, loc string, status int, txn *sql.Tx) bool { |
| |
| _, err := txn.Exec("INSERT INTO BUNDLE_INFO (id, deployment_id, org, env, url, type, deploy_status, "+ |
| "created_at, file_url)VALUES(?,?,?,?,?,?,?,?,?);", id, depid, org, env, loc, typ, status, cts, fileurl) |
| |
| if err != nil { |
| log.Error("INSERT BUNDLE_INFO Failed (id, dep id) : (", id, ", ", depid, ")", err) |
| return false |
| } else { |
| log.Info("INSERT BUNDLE_INFO Success: (", id, ")") |
| return true |
| } |
| |
| } |
| |
| func updateDeployStatusDB(id string, status int, txn *sql.Tx) bool { |
| |
| _, err := txn.Exec("UPDATE BUNDLE_INFO SET deploy_status = ? WHERE deployment_id = ?;", status, id) |
| if err != nil { |
| log.Error("UPDATE BUNDLE_INFO Failed: (", id, ") : ", err) |
| return false |
| } else { |
| log.Info("UPDATE BUNDLE_INFO Success: (", id, ")") |
| } |
| |
| _, err = txn.Exec("UPDATE BUNDLE_DEPLOYMENT SET deploy_status = ? WHERE id = ?;", status, id) |
| if err != nil { |
| log.Error("UPDATE BUNDLE_DEPLOYMENT Failed: (", id, ") : ", err) |
| return false |
| } else { |
| log.Info("UPDATE BUNDLE_DEPLOYMENT Success: (", id, ")") |
| } |
| return true |
| |
| } |
| |
| // getCurrentDeploymentID returns the ID of what should be the "current" deployment |
| func getCurrentDeploymentID() (string, error) { |
| var depID string |
| err := db.QueryRow("SELECT id FROM BUNDLE_DEPLOYMENT ORDER BY created_at ASC LIMIT 1;").Scan(&depID) |
| return depID, err |
| } |
| |
| |
| // getDeployment returns a fully populated deploymentResponse |
| func getDeployment(depID string) (*deploymentResponse, error) { |
| |
| rows, err := db.Query("SELECT file_url, id, type FROM BUNDLE_INFO WHERE deployment_id = ?;", depID) |
| if err != nil { |
| log.Errorf("Unable to query BUNDLE_INFO. Err: %s", err) |
| return nil, err |
| } |
| |
| depRes := deploymentResponse{ |
| Bundles: []bundle{}, |
| DeploymentId: depID, |
| } |
| |
| for rows.Next() { |
| var bundleID, fileUrl, bundleType string |
| err = rows.Scan(&fileUrl, &bundleID, &bundleType) |
| if err != nil { |
| log.Errorf("BUNDLE_INFO fetch failed. Err: %s", err) |
| return nil, err |
| } |
| if bundleType == "sys" { |
| depRes.System = bundle{ |
| BundleId: bundleID, |
| URL: fileUrl, |
| } |
| } else { |
| bd := bundle{ |
| AuthCode: bundleID, // todo: authCode? |
| BundleId: bundleID, |
| URL: fileUrl, |
| } |
| depRes.Bundles = append(depRes.Bundles, bd) |
| } |
| } |
| return &depRes, nil |
| } |