blob: 0d8cd7e3ee751936915584e7e31a9885ef2debb4 [file] [log] [blame] [edit]
package apiGatewayDeploy
import (
"database/sql"
"fmt"
"github.com/30x/apid-core"
"sync"
)
var (
unsafeDB apid.DB
dbMux sync.RWMutex
)
type DataDeployment struct {
ID string
BundleConfigID string
ApidClusterID string
DataScopeID string
BundleConfigJSON string
ConfigJSON string
Created string
CreatedBy string
Updated string
UpdatedBy string
BundleName string
BundleURI string
LocalBundleURI string
BundleChecksum string
BundleChecksumType string
DeployStatus string
DeployErrorCode int
DeployErrorMessage string
}
type SQLExec interface {
Exec(query string, args ...interface{}) (sql.Result, error)
}
func InitDB(db apid.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS deployments (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
apid_cluster_id varchar(36) NOT NULL,
data_scope_id varchar(36) NOT NULL,
bundle_config_json text NOT NULL,
config_json text NOT NULL,
created timestamp without time zone,
created_by text,
updated timestamp without time zone,
updated_by text,
bundle_name text,
bundle_uri text,
local_bundle_uri text,
bundle_checksum text,
bundle_checksum_type text,
deploy_status string,
deploy_error_code int,
deploy_error_message text,
PRIMARY KEY (id)
);
`)
if err != nil {
return err
}
log.Debug("Database tables created.")
return nil
}
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
dbMux.RUnlock()
return db
}
// caller is responsible for calling dbMux.Lock() and dbMux.Unlock()
func SetDB(db apid.DB) {
if unsafeDB == nil { // init API when DB is initialized
go InitAPI()
}
unsafeDB = db
}
func InsertDeployment(tx *sql.Tx, dep DataDeployment) error {
log.Debugf("insertDeployment: %s", dep.ID)
stmt, err := tx.Prepare(`
INSERT INTO deployments
(id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
`)
if err != nil {
log.Errorf("prepare insert into deployments %s failed: %v", dep.ID, err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(
dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
dep.Updated, dep.UpdatedBy, dep.BundleName, dep.BundleURI,
dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
dep.DeployErrorCode, dep.DeployErrorMessage)
if err != nil {
log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
return err
}
log.Debugf("insert into deployments %s succeeded", dep.ID)
return err
}
func deleteDeployment(tx *sql.Tx, depID string) error {
log.Debugf("deleteDeployment: %s", depID)
stmt, err := tx.Prepare("DELETE FROM deployments where id = $1;")
if err != nil {
log.Errorf("prepare delete from deployments %s failed: %v", depID, err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(depID)
if err != nil {
log.Errorf("delete from deployments %s failed: %v", depID, err)
return err
}
deploymentsChanged <- depID
log.Debugf("deleteDeployment %s succeeded", depID)
return err
}
// getReadyDeployments() returns array of deployments that are ready to deploy
func getReadyDeployments() (deployments []DataDeployment, err error) {
return getDeployments("WHERE local_bundle_uri != $1", "")
}
// getUnreadyDeployments() returns array of deployments that are not yet ready to deploy
func getUnreadyDeployments() (deployments []DataDeployment, err error) {
return getDeployments("WHERE local_bundle_uri = $1 and deploy_status = $2", "", "")
}
// getDeployments() accepts a "WHERE ..." clause and optional parameters and returns the list of deployments
func getDeployments(where string, a ...interface{}) (deployments []DataDeployment, err error) {
db := getDB()
var stmt *sql.Stmt
stmt, err = db.Prepare(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
updated, updated_by, bundle_name, bundle_uri,
local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message
FROM deployments
` + where)
if err != nil {
return
}
var rows *sql.Rows
rows, err = stmt.Query(a...)
if err != nil {
if err == sql.ErrNoRows {
return
}
log.Errorf("Error querying deployments: %v", err)
return
}
defer rows.Close()
deployments = dataDeploymentsFromRows(rows)
return
}
func dataDeploymentsFromRows(rows *sql.Rows) (deployments []DataDeployment) {
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.BundleConfigID, &dep.ApidClusterID, &dep.DataScopeID,
&dep.BundleConfigJSON, &dep.ConfigJSON, &dep.Created, &dep.CreatedBy,
&dep.Updated, &dep.UpdatedBy, &dep.BundleName, &dep.BundleURI,
&dep.LocalBundleURI, &dep.BundleChecksum, &dep.BundleChecksumType, &dep.DeployStatus,
&dep.DeployErrorCode, &dep.DeployErrorMessage,
)
deployments = append(deployments, dep)
}
return
}
func setDeploymentResults(results apiDeploymentResults) error {
log.Debugf("setDeploymentResults: %v", results)
tx, err := getDB().Begin()
if err != nil {
log.Errorf("Unable to begin transaction: %v", err)
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare(`
UPDATE deployments
SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
WHERE id=$4;
`)
if err != nil {
log.Errorf("prepare updateDeploymentStatus failed: %v", err)
return err
}
defer stmt.Close()
for _, result := range results {
res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
if err != nil {
log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err)
return err
}
n, err := res.RowsAffected()
if n == 0 || err != nil {
log.Error(fmt.Sprintf("no deployment matching '%s' to update. skipping.", result.ID))
}
}
err = tx.Commit()
if err != nil {
log.Errorf("Unable to commit setDeploymentResults transaction: %v", err)
}
return err
}
func updateLocalBundleURI(depID, localBundleUri string) error {
stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
if err != nil {
log.Errorf("prepare updateLocalBundleURI failed: %v", err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(localBundleUri, depID)
if err != nil {
log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
return err
}
log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
return nil
}
func getLocalBundleURI(tx *sql.Tx, depID string) (localBundleUri string, err error) {
err = tx.QueryRow("SELECT local_bundle_uri FROM deployments WHERE id=$1;", depID).Scan(&localBundleUri)
if err == sql.ErrNoRows {
err = nil
}
return
}