blob: b317a8dc0f0b6054e9e377b40723ef8ab4e2fcc7 [file] [log] [blame] [edit]
package apiGatewayDeploy
import (
"database/sql"
"sync"
"github.com/30x/apid-core"
)
// Package-level database handle and the lock that guards it.
var (
// unsafeDB is the current database handle. It is read through getDB(),
// which takes dbMux for reading, and replaced through SetDB(), whose
// callers are expected to hold dbMux for writing.
unsafeDB apid.DB
// dbMux guards unsafeDB.
dbMux sync.RWMutex
)
// DataDeployment is one deployment record as assembled by the queries in
// this file: a row of project_runtime_blob_metadata, optionally joined with
// the matching row of edgex_blob_available.
type DataDeployment struct {
	// ID, OrgID and EnvID are scanned from the id, org_id and env_id columns.
	// DataScopeID is not filled in by the queries next to this type.
	ID, OrgID, EnvID, DataScopeID string
	// Type is not filled in by the queries next to this type.
	Type int
	// Name and Revision are scanned from the name and revision columns.
	Name, Revision string
	// BlobID and BlobResourceID are scanned from blob_id and resource_blob_id.
	BlobID, BlobResourceID string
	// Updated and UpdatedBy are scanned from updated_at and updated_by.
	Updated, UpdatedBy string
	// Created and CreatedBy are scanned from created_at and created_by.
	Created, CreatedBy string
	// BlobFSLocation and BlobURL are scanned from the local_fs_location and
	// access_url columns of edgex_blob_available.
	BlobFSLocation, BlobURL string
}
// SQLExec is the minimal interface for anything that can execute a SQL
// statement that returns no rows. Its single method has the same signature
// as the Exec method of database/sql's DB, Tx and Stmt-less executors.
type SQLExec interface {
// Exec runs query with the given placeholder arguments and returns the
// driver's result summary or an error.
Exec(query string, args ...interface{}) (sql.Result, error)
}
// InitDB makes sure the edgex_blob_available table exists on db, creating it
// when it is missing. It returns the error reported by the CREATE TABLE
// statement, or nil after logging that the tables were created.
func InitDB(db apid.DB) error {
	// DDL for the table that maps a blob id to its local file and access URL.
	const createBlobAvailable = `
CREATE TABLE IF NOT EXISTS edgex_blob_available (
blob_id character varying NOT NULL,
local_fs_location character varying NOT NULL,
access_url character varying
);
`
	if _, execErr := db.Exec(createBlobAvailable); execErr != nil {
		return execErr
	}

	log.Debug("Database tables created.")
	return nil
}
// getDB returns the current database handle, reading it while holding
// dbMux for reading.
func getDB() apid.DB {
	dbMux.RLock()
	defer dbMux.RUnlock()

	return unsafeDB
}
// SetDB replaces the package's database handle with db.
//
// It does no locking itself: the caller is responsible for calling
// dbMux.Lock() before and dbMux.Unlock() after.
//
// When no handle was set before the call, InitAPI is started in its own
// goroutine before the new handle is stored.
func SetDB(db apid.DB) {
	noPreviousDB := unsafeDB == nil
	if noPreviousDB {
		// init API when DB is initialized
		go InitAPI()
	}

	unsafeDB = db
}
// getUnreadyDeployments returns the deployments that still have to be
// processed: rows of project_runtime_blob_metadata whose blob_id has no
// matching row in edgex_blob_available.
//
// Only ID, OrgID, EnvID, Name, Revision, BlobID and BlobResourceID are filled
// in on the returned records.
//
// It returns sql.ErrNoRows when there is nothing to process, and the
// underlying error when the query, a row scan or the row iteration fails.
// On any error the returned slice is nil.
func getUnreadyDeployments() (deployments []DataDeployment, err error) {
	rows, err := getDB().Query(`
SELECT id, org_id, env_id, name, revision, project_runtime_blob_metadata.blob_id, resource_blob_id
FROM project_runtime_blob_metadata
LEFT JOIN edgex_blob_available
ON project_runtime_blob_metadata.blob_id = edgex_blob_available.blob_id
WHERE edgex_blob_available.blob_id IS NULL;
`)
	if err != nil {
		log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		// Scan through sql.NullString: scanning a NULL straight into a string
		// fails and stops the scan, leaving every later column empty. Which
		// columns are nullable is not visible here, so all are treated alike
		// and a NULL becomes the empty string.
		var id, orgID, envID, name, revision, blobID, blobResourceID sql.NullString
		err = rows.Scan(&id, &orgID, &envID, &name, &revision, &blobID, &blobResourceID)
		if err != nil {
			log.Errorf("DB Scan for project_runtime_blob_metadata failed %v", err)
			return nil, err
		}
		dep := DataDeployment{
			ID:             id.String,
			OrgID:          orgID.String,
			EnvID:          envID.String,
			Name:           name.String,
			Revision:       revision.String,
			BlobID:         blobID.String,
			BlobResourceID: blobResourceID.String,
		}
		deployments = append(deployments, dep)
		log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID)
	}
	// Next() returning false can mean "done" or "failed"; tell them apart.
	if err = rows.Err(); err != nil {
		log.Errorf("DB row iteration for project_runtime_blob_metadata failed %v", err)
		return nil, err
	}

	if len(deployments) == 0 {
		log.Debug("No new resources found to be processed")
		return nil, sql.ErrNoRows
	}
	return deployments, nil
}
// getReadyDeployments returns the deployments that are ready to be served:
// rows of project_runtime_blob_metadata that have a matching row in
// edgex_blob_available, together with that row's local file location and
// access URL.
//
// It returns sql.ErrNoRows when no deployment is ready, and the underlying
// error when the query, a row scan or the row iteration fails. On any error
// the returned slice is nil.
func getReadyDeployments() (deployments []DataDeployment, err error) {
	rows, err := getDB().Query(`
SELECT a.id, a.org_id, a.env_id, a.name, a.revision, a.blob_id,
a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
b.local_fs_location, b.access_url
FROM project_runtime_blob_metadata as a
INNER JOIN edgex_blob_available as b
ON a.blob_id = b.blob_id
`)
	if err != nil {
		log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		// Scan through sql.NullString: scanning a NULL straight into a string
		// fails and stops the scan, leaving every later column empty (for
		// example access_url is declared without NOT NULL). A NULL becomes
		// the empty string.
		var (
			id, orgID, envID       sql.NullString
			name, revision         sql.NullString
			blobID, blobResourceID sql.NullString
			created, createdBy     sql.NullString
			updated, updatedBy     sql.NullString
			fsLocation, accessURL  sql.NullString
		)
		err = rows.Scan(&id, &orgID, &envID, &name, &revision, &blobID,
			&blobResourceID, &created, &createdBy, &updated,
			&updatedBy, &fsLocation, &accessURL)
		if err != nil {
			log.Errorf("DB Scan for project_runtime_blob_metadata failed %v", err)
			return nil, err
		}
		dep := DataDeployment{
			ID:             id.String,
			OrgID:          orgID.String,
			EnvID:          envID.String,
			Name:           name.String,
			Revision:       revision.String,
			BlobID:         blobID.String,
			BlobResourceID: blobResourceID.String,
			Created:        created.String,
			CreatedBy:      createdBy.String,
			Updated:        updated.String,
			UpdatedBy:      updatedBy.String,
			BlobFSLocation: fsLocation.String,
			BlobURL:        accessURL.String,
		}
		deployments = append(deployments, dep)
		log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID)
	}
	// Next() returning false can mean "done" or "failed"; tell them apart.
	if err = rows.Err(); err != nil {
		log.Errorf("DB row iteration for project_runtime_blob_metadata failed %v", err)
		return nil, err
	}

	if len(deployments) == 0 {
		log.Debug("No resources ready to be deployed")
		return nil, sql.ErrNoRows
	}
	return deployments, nil
}
// updatelocal_fs_location records that a blob is available locally by
// inserting a row into edgex_blob_available.
//
// depID is stored in the blob_id column and local_fs_location in the column
// of the same name. The access_url column is set to the configured
// "api_listen" value followed by "/blob/" and depID.
//
// It returns the error reported by the INSERT, or nil on success.
func updatelocal_fs_location(depID, local_fs_location string) error {
	accessURL := config.GetString("api_listen") + "/blob/" + depID

	// A single Exec is enough for a statement that runs once; there is no
	// need to prepare and close a statement explicitly.
	_, err := getDB().Exec(`
INSERT INTO edgex_blob_available (blob_id, local_fs_location, access_url)
VALUES (?, ?, ?)`, depID, local_fs_location, accessURL)
	if err != nil {
		log.Errorf("INSERT edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, local_fs_location, err)
		return err
	}

	log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, local_fs_location)
	return nil
}
// getLocalFSLocation looks up the local file location recorded in
// edgex_blob_available for the given blob id.
//
// It returns the local_fs_location of the first matching row. When no row
// matches it returns "" and sql.ErrNoRows; when the query, the row iteration
// or the scan fails it returns "" and that error.
func getLocalFSLocation(blobId string) (locfs string, err error) {
	// Bind blobId as a parameter instead of concatenating it into the SQL
	// text: the id is a string value and must not be interpreted as SQL.
	rows, err := getDB().Query(
		"SELECT local_fs_location FROM edgex_blob_available WHERE blob_id = ?", blobId)
	if err != nil {
		log.Errorf("SELECT local_fs_location failed %v", err)
		return "", err
	}
	defer rows.Close()

	// Scan is only valid after a successful Next.
	if !rows.Next() {
		if err = rows.Err(); err != nil {
			log.Errorf("SELECT local_fs_location failed %v", err)
			return "", err
		}
		log.Debugf("No local_fs_location found for blobId {%s}", blobId)
		return "", sql.ErrNoRows
	}
	if err = rows.Scan(&locfs); err != nil {
		log.Errorf("Scan of local_fs_location for blobId {%s} failed %v", blobId, err)
		return "", err
	}
	return locfs, nil
}