blob: a867e7a6b93cf861a013ce81d4d0a61d614d5edb [file] [log] [blame]
package apiGatewayConfDeploy
import (
"database/sql"
"sync"
"github.com/30x/apid-core"
)
var (
unsafeDB apid.DB
dbMux sync.RWMutex
gwBlobId int64
)
type DataDeployment struct {
ID string
OrgID string
EnvID string
Type string
Name string
Revision string
BlobID string
GWBlobID string
BlobResourceID string
Updated string
UpdatedBy string
Created string
CreatedBy string
BlobFSLocation string
BlobURL string
}
type SQLExec interface {
Exec(query string, args ...interface{}) (sql.Result, error)
}
func InitDB(db apid.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS edgex_blob_available (
gwblobid integer primary key,
runtime_meta_id character varying NOT NULL,
local_fs_location character varying NOT NULL,
access_url character varying
);
`)
if err != nil {
return err
}
log.Debug("Database tables created.")
return nil
}
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
dbMux.RUnlock()
return db
}
// caller is responsible for calling dbMux.Lock() and dbMux.Unlock()
func SetDB(db apid.DB) {
if unsafeDB == nil { // init API when DB is initialized
go InitAPI()
}
unsafeDB = db
}
// getUnreadyDeployments() returns array of resources that are not yet to be processed
func getUnreadyDeployments() (deployments []DataDeployment, err error) {
err = nil
db := getDB()
rows, err := db.Query(`
SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id
FROM project_runtime_blob_metadata
LEFT JOIN edgex_blob_available
ON project_runtime_blob_metadata.id = edgex_blob_available.runtime_meta_id
WHERE edgex_blob_available.runtime_meta_id IS NULL;
`)
if err != nil {
log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
return
}
defer rows.Close()
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID,
&dep.BlobResourceID)
deployments = append(deployments, dep)
log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID)
}
if len(deployments) == 0 {
log.Debug("No new resources found to be processed")
err = sql.ErrNoRows
}
return
}
// getDeployments()
func getReadyDeployments() (deployments []DataDeployment, err error) {
err = nil
db := getDB()
rows, err := db.Query(`
SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
b.local_fs_location, b.access_url, b.gwblobid
FROM project_runtime_blob_metadata as a
INNER JOIN edgex_blob_available as b
ON a.id = b.runtime_meta_id
`)
if err != nil {
log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
return
}
defer rows.Close()
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Type, &dep.Revision, &dep.BlobID,
&dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated,
&dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL, &dep.GWBlobID)
deployments = append(deployments, dep)
log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID)
}
if len(deployments) == 0 {
log.Debug("No resources ready to be deployed")
err = sql.ErrNoRows
}
return
}
func updatelocal_fs_location(depID, bundleId, local_fs_location string) error {
access_url := get_http_host() + "/blob/" + bundleId
stmt, err := getDB().Prepare(`
INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url)
VALUES (?, ?, ?, ?)`)
if err != nil {
log.Errorf("PREPARE updatelocal_fs_location failed: %v", err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(depID, bundleId, local_fs_location, access_url)
if err != nil {
log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err)
return err
}
log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, local_fs_location)
return nil
}
func getLocalFSLocation(blobId string) (locfs string, err error) {
db := getDB()
log.Debugf("Getting the blob file for blobId {%s}", blobId)
rows, err := db.Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
}
defer rows.Close()
for rows.Next() {
rows.Scan(&locfs)
log.Debugf("Got the blob file {%s} for blobId {%s}", locfs, blobId)
}
return
}