blob: 267ed157204c665a8cd5a233f208ef2bfe61bbbf [file] [log] [blame]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apiGatewayConfDeploy
import (
"database/sql"
"sync"
"github.com/30x/apid-core"
)
var (
gwBlobId int64
)
type DataDeployment struct {
ID string
OrgID string
EnvID string
Type string
Name string
Revision string
BlobID string
GWBlobID string
BlobResourceID string
Updated string
UpdatedBy string
Created string
CreatedBy string
BlobFSLocation string
BlobURL string
}
type SQLExec interface {
Exec(query string, args ...interface{}) (sql.Result, error)
}
type dbManagerInterface interface {
setDbVersion(string)
initDb() error
getUnreadyDeployments() ([]DataDeployment, error)
getReadyDeployments() ([]DataDeployment, error)
updateLocalFsLocation(string, string, string) error
getLocalFSLocation(string) (string, error)
}
type dbManager struct {
data apid.DataService
db apid.DB
dbMux sync.RWMutex
}
func (dbc *dbManager) setDbVersion(version string) {
db, err := dbc.data.DBVersion(version)
if err != nil {
log.Panicf("Unable to access database: %v", err)
}
dbc.dbMux.Lock()
dbc.db = db
dbc.dbMux.Unlock()
}
func (dbc *dbManager) getDb() apid.DB {
dbc.dbMux.RLock()
defer dbc.dbMux.RUnlock()
return dbc.db
}
func (dbc *dbManager) initDb() error {
_, err := dbc.getDb().Exec(`
CREATE TABLE IF NOT EXISTS edgex_blob_available (
gwblobid integer primary key,
runtime_meta_id character varying NOT NULL,
local_fs_location character varying NOT NULL,
access_url character varying
);
`)
if err != nil {
return err
}
log.Debug("Database tables created.")
return nil
}
// getUnreadyDeployments() returns array of resources that are not yet to be processed
func (dbc *dbManager) getUnreadyDeployments() (deployments []DataDeployment, err error) {
rows, err := dbc.getDb().Query(`
SELECT project_runtime_blob_metadata.id, org_id, env_id, name, revision, blob_id, resource_blob_id
FROM project_runtime_blob_metadata
LEFT JOIN edgex_blob_available
ON project_runtime_blob_metadata.id = edgex_blob_available.runtime_meta_id
WHERE edgex_blob_available.runtime_meta_id IS NULL;
`)
if err != nil {
log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
return
}
defer rows.Close()
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Revision, &dep.BlobID,
&dep.BlobResourceID)
deployments = append(deployments, dep)
log.Debugf("New configurations to be processed Id {%s}, blobId {%s}", dep.ID, dep.BlobID)
}
if len(deployments) == 0 {
log.Debug("No new resources found to be processed")
err = sql.ErrNoRows
}
return
}
// getDeployments()
func (dbc *dbManager) getReadyDeployments() (deployments []DataDeployment, err error) {
rows, err := dbc.getDb().Query(`
SELECT a.id, a.org_id, a.env_id, a.name, a.type, a.revision, a.blob_id,
a.resource_blob_id, a.created_at, a.created_by, a.updated_at, a.updated_by,
b.local_fs_location, b.access_url, b.gwblobid
FROM project_runtime_blob_metadata as a
INNER JOIN edgex_blob_available as b
ON a.id = b.runtime_meta_id
`)
if err != nil {
log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
return
}
defer rows.Close()
for rows.Next() {
dep := DataDeployment{}
rows.Scan(&dep.ID, &dep.OrgID, &dep.EnvID, &dep.Name, &dep.Type, &dep.Revision, &dep.BlobID,
&dep.BlobResourceID, &dep.Created, &dep.CreatedBy, &dep.Updated,
&dep.UpdatedBy, &dep.BlobFSLocation, &dep.BlobURL, &dep.GWBlobID)
deployments = append(deployments, dep)
log.Debugf("New Configurations available Id {%s} BlobId {%s}", dep.ID, dep.BlobID)
}
if len(deployments) == 0 {
log.Debug("No resources ready to be deployed")
err = sql.ErrNoRows
}
return
}
func (dbc *dbManager) updateLocalFsLocation(depID, bundleId, localFsLocation string) error {
access_url := getHttpHost() + blobEndpointPath + "/" + bundleId
stmt, err := dbc.getDb().Prepare(`
INSERT INTO edgex_blob_available (runtime_meta_id, gwblobid, local_fs_location, access_url)
VALUES (?, ?, ?, ?)`)
if err != nil {
log.Errorf("PREPARE updatelocal_fs_location failed: %v", err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(depID, bundleId, localFsLocation, access_url)
if err != nil {
log.Errorf("UPDATE edgex_blob_available id {%s} local_fs_location {%s} failed: %v", depID, localFsLocation, err)
return err
}
log.Debugf("INSERT edgex_blob_available {%s} local_fs_location {%s} succeeded", depID, localFsLocation)
return nil
}
func (dbc *dbManager) getLocalFSLocation(blobId string) (locfs string, err error) {
log.Debugf("Getting the blob file for blobId {%s}", blobId)
rows, err := dbc.getDb().Query("SELECT local_fs_location FROM edgex_blob_available WHERE gwblobid = \"" + blobId + "\"")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
}
defer rows.Close()
for rows.Next() {
rows.Scan(&locfs)
log.Debugf("Got the blob file {%s} for blobId {%s}", locfs, blobId)
}
return
}