blob: 45ad641213e84e229af364bf410b76148b30619e [file] [log] [blame]
package apiGatewayDeploy
import (
"database/sql"
"fmt"
"github.com/30x/apid"
"sync"
)
var (
	// dbMux guards every read and write of unsafeDB.
	dbMux sync.RWMutex

	// unsafeDB is the current database handle. It must only be touched while
	// holding dbMux; use getDB and SetDB rather than accessing it directly.
	unsafeDB apid.DB
)
// DataDeployment is the in-memory form of a deployment record. Every field is
// held as a string, including the timestamp-like Created and Updated values.
type DataDeployment struct {
	// Identifiers: the deployment's own ID followed by the IDs it refers to.
	ID, BundleConfigID, ApidClusterID, DataScopeID string

	// JSON documents carried verbatim as text.
	BundleConfigJSON, ConfigJSON string

	Status string

	// Audit information.
	Created, CreatedBy string
	Updated, UpdatedBy string

	// Bundle description.
	BundleName, BundleURI string

	// NOTE(review): neither InsertDeployment nor getReadyDeployments reads or
	// writes these two fields — confirm where they are populated.
	BundleChecksum, BundleChecksumType string

	// LocalBundleURI maps to the local_bundle_uri column; getReadyDeployments
	// only returns rows where that column is not empty.
	LocalBundleURI string
}
// SQLExec is the minimal "can execute a statement" abstraction: anything with
// an Exec method of this shape satisfies it.
type SQLExec interface {
// Exec runs query with the given positional args and returns the driver's
// result or an error.
Exec(query string, args ...interface{}) (sql.Result, error)
}
// InitDB creates the etag and deployments tables on db if they do not exist
// yet and makes sure the etag table holds its initial row.
//
// All statements are sent to db.Exec as one multi-statement string. The etag
// seed row is inserted only when the table is empty, so running InitDB again
// against an already-initialized database no longer adds an extra etag row on
// every call.
//
// It returns the error from db.Exec unchanged, or nil on success.
func InitDB(db apid.DB) error {
	_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS etag (
value integer
);
INSERT INTO etag (value) SELECT 1 WHERE NOT EXISTS (SELECT 1 FROM etag);
CREATE TABLE IF NOT EXISTS deployments (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
apid_cluster_id varchar(36) NOT NULL,
data_scope_id varchar(36) NOT NULL,
bundle_config_json text NOT NULL,
config_json text NOT NULL,
status text NOT NULL,
created timestamp without time zone,
created_by text,
updated timestamp without time zone,
updated_by text,
bundle_name text,
bundle_uri text,
local_bundle_uri text,
deploy_status string,
deploy_error_code int,
deploy_error_message text,
PRIMARY KEY (id)
);
`)
	if err != nil {
		return err
	}

	log.Debug("Database tables created.")
	return nil
}
// getDB returns the current database handle, reading it under dbMux's read
// lock so it does not race with SetDB.
func getDB() apid.DB {
	dbMux.RLock()
	defer dbMux.RUnlock()
	return unsafeDB
}
// SetDB installs db as the current database handle while holding dbMux's
// write lock. If no handle was set before this call, InitAPI is started in
// its own goroutine.
func SetDB(db apid.DB) {
	dbMux.Lock()
	defer dbMux.Unlock()

	// First DB to arrive: bring up the API now that there is something to serve.
	if unsafeDB == nil {
		go InitAPI()
	}
	unsafeDB = db
}
// incrementETag adds one to the value column of every row in the etag table.
// Call it whenever the list of deployments changes.
//
// Failures to prepare or execute the update are logged and returned.
func incrementETag() error {
	update, err := getDB().Prepare("UPDATE etag SET value = value+1;")
	if err != nil {
		log.Errorf("prepare update etag failed: %v", err)
		return err
	}
	defer update.Close()

	if _, execErr := update.Exec(); execErr != nil {
		log.Errorf("update etag failed: %v", execErr)
		return execErr
	}

	log.Debugf("etag incremented")
	return nil
}
// getETag reads the value column from the etag table and returns it as a
// string. On failure the error is logged and returned with an empty string.
func getETag() (string, error) {
	var value string

	if err := getDB().QueryRow("SELECT value FROM etag").Scan(&value); err != nil {
		log.Errorf("select etag failed: %v", err)
		return "", err
	}

	log.Debugf("etag queried: %v", value)
	return value, nil
}
// InsertDeployment writes dep as a new row of the deployments table using the
// caller's transaction tx. It neither commits nor rolls back tx; that is left
// to the caller. Prepare and exec failures are logged and returned.
func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
	log.Debugf("insertDeployment: %s", dep.ID)

	insert, err := tx.Prepare(`
INSERT INTO deployments
(id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, status, created,
created_by, updated, updated_by, bundle_name,
bundle_uri, local_bundle_uri)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14);
`)
	if err != nil {
		log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
		return err
	}
	defer insert.Close()

	// Argument order must match the column list in the statement above.
	if _, execErr := insert.Exec(
		dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
		dep.BundleConfigJSON, dep.ConfigJSON, dep.Status, dep.Created,
		dep.CreatedBy, dep.Updated, dep.UpdatedBy, dep.BundleName,
		dep.BundleURI, dep.LocalBundleURI,
	); execErr != nil {
		log.Errorf("insert into deployments %s failed: %v", dep.ID, execErr)
		return execErr
	}

	log.Debugf("insert into deployments %s succeeded", dep.ID)
	return nil
}
// deleteDeployment removes the deployments row whose id equals depID, using
// the caller's transaction tx, and then sends depID on deploymentsChanged.
//
// The send happens as soon as the DELETE has executed, i.e. before the caller
// has committed tx. NOTE(review): whether that send can block depends on how
// deploymentsChanged is created and consumed — confirm against its definition.
func deleteDeployment(tx *sql.Tx, depID string) error {
	log.Debugf("deleteDeployment: %s", depID)

	del, err := tx.Prepare("DELETE FROM deployments where id = $1;")
	if err != nil {
		log.Errorf("prepare delete from deployments %s failed: %v", depID, err)
		return err
	}
	defer del.Close()

	if _, execErr := del.Exec(depID); execErr != nil {
		log.Errorf("delete from deployments %s failed: %v", depID, execErr)
		return execErr
	}

	deploymentsChanged <- depID

	log.Debugf("deleteDeployment %s succeeded", depID)
	return nil
}
// getReadyDeployments returns the deployments that are ready to deploy, i.e.
// every row of the deployments table whose local_bundle_uri is not empty.
//
// It returns a nil slice and nil error when there are no such rows. Any
// failure while querying, scanning a row, or iterating the result set is
// logged and returned; in that case the returned slice is nil rather than a
// partially populated list.
func getReadyDeployments() (deployments []DataDeployment, err error) {
	// The empty string is written as '' (standard SQL) rather than "", which
	// only works where the database falls back to treating an unknown
	// double-quoted identifier as a string literal.
	rows, err := getDB().Query(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, status, created,
created_by, updated, updated_by, bundle_name,
bundle_uri, local_bundle_uri
FROM deployments
WHERE local_bundle_uri != ''
`)
	if err != nil {
		// Kept from the original: treat "no rows" as an empty result.
		if err == sql.ErrNoRows {
			return nil, nil
		}
		log.Errorf("Error querying deployments: %v", err)
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var dep DataDeployment
		// Destination order must match the SELECT column list above.
		if err = rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
			&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Status, &dep.Created,
			&dep.CreatedBy, &dep.Updated, &dep.UpdatedBy, &dep.BundleName,
			&dep.BundleURI, &dep.LocalBundleURI,
		); err != nil {
			log.Errorf("Error scanning deployment row: %v", err)
			return nil, err
		}
		deployments = append(deployments, dep)
	}

	// rows.Next returning false can mean "done" or "failed"; Err tells which.
	if err = rows.Err(); err != nil {
		log.Errorf("Error iterating deployments: %v", err)
		return nil, err
	}

	return deployments, nil
}
// setDeploymentResults stores the deploy outcome (status, error code and
// message) of every entry in results on the matching deployments row, all
// inside one transaction.
//
// A result whose ID matches no row, or for which the affected-row count
// cannot be determined, is logged and skipped; it does not abort the batch.
// Those two situations are logged with distinct messages so a driver error is
// not misreported as a missing deployment. A failure to begin, prepare,
// execute or commit is logged and returned, and the transaction is rolled
// back by the deferred Rollback.
func setDeploymentResults(results apiDeploymentResults) error {
	log.Debugf("setDeploymentResults: %v", results)

	tx, err := getDB().Begin()
	if err != nil {
		log.Errorf("Unable to begin transaction: %v", err)
		return err
	}
	// Undo everything on any early return. NOTE(review): after a successful
	// Commit this Rollback is expected to be a harmless no-op, assuming Begin
	// returns a database/sql *sql.Tx — confirm against apid.DB.
	defer tx.Rollback()

	stmt, err := tx.Prepare(`
UPDATE deployments
SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
WHERE id=$4;
`)
	if err != nil {
		log.Errorf("prepare updateDeploymentStatus failed: %v", err)
		return err
	}
	defer stmt.Close()

	for _, result := range results {
		res, execErr := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
		if execErr != nil {
			log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, execErr)
			return execErr
		}

		n, countErr := res.RowsAffected()
		switch {
		case countErr != nil:
			log.Error(fmt.Sprintf("unable to determine rows affected for deployment '%s': %v. skipping.", result.ID, countErr))
		case n == 0:
			log.Error(fmt.Sprintf("no deployment matching '%s' to update. skipping.", result.ID))
		}
	}

	if err = tx.Commit(); err != nil {
		log.Errorf("Unable to commit setDeploymentResults transaction: %v", err)
	}
	return err
}
// updateLocalURI sets the local_bundle_uri column of the deployments row with
// id depID to localBundleUri, in its own transaction. Each failing step
// (begin, prepare, exec, commit) is logged and its error returned.
func updateLocalURI(depID, localBundleUri string) error {
	tx, err := getDB().Begin()
	if err != nil {
		log.Errorf("begin updateLocalURI failed: %v", err)
		return err
	}
	// Covers every early return below.
	defer tx.Rollback()

	update, err := tx.Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
	if err != nil {
		log.Errorf("prepare updateLocalURI failed: %v", err)
		return err
	}
	defer update.Close()

	if _, execErr := update.Exec(localBundleUri, depID); execErr != nil {
		log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, execErr)
		return execErr
	}

	if commitErr := tx.Commit(); commitErr != nil {
		log.Errorf("commit updateLocalURI failed: %v", commitErr)
		return commitErr
	}

	log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
	return nil
}