blob: be8dd34639cc853f6100ec61cf4ba3149283eed0 [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidApigeeSync
import (
"database/sql"
"errors"
"fmt"
"github.com/apid/apid-core/util"
"sync"
"github.com/apid/apid-core"
"github.com/apigee-labs/transicator/common"
"sort"
"strings"
)
var (
dbMux sync.RWMutex
)
/*
This plugin uses 2 databases:
1. The default DB is used for APID table.
2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE
(Currently, the snapshot never changes, but this is future-proof)
*/
func creatDbManager() *dbManager {
return &dbManager{
DbMux: &sync.RWMutex{},
knownTables: make(map[string]bool),
}
}
type dbManager struct {
Db apid.DB
DbMux *sync.RWMutex
dbVersion string
knownTables map[string]bool
}
// idempotent call to initialize default DB
func (dbMan *dbManager) initDB() error {
db, err := dataService.DB()
if err != nil {
return err
}
tx, err := db.Begin()
if err != nil {
log.Errorf("initDB(): Unable to get DB tx err: {%v}", err)
return err
}
defer tx.Rollback()
_, err = tx.Exec(`
CREATE TABLE IF NOT EXISTS APID (
instance_id text,
apid_cluster_id text,
last_snapshot_info text,
PRIMARY KEY (instance_id)
);
`)
if err != nil {
log.Errorf("initDB(): Unable to tx exec err: {%v}", err)
return err
}
if err = tx.Commit(); err != nil {
log.Errorf("Error when initDb: %v", err)
return err
}
log.Debug("Database tables created.")
return nil
}
func (dbMan *dbManager) getDB() apid.DB {
dbMux.RLock()
defer dbMux.RUnlock()
return dbMan.Db
}
func (dbMan *dbManager) setDB(db apid.DB) {
dbMux.Lock()
defer dbMux.Unlock()
dbMan.Db = db
}
//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
func (dbMan *dbManager) insert(tableName string, rows []common.Row, txn apid.Tx) error {
if len(rows) == 0 {
return fmt.Errorf("no rows")
}
var orderedColumns []string
for column := range rows[0] {
orderedColumns = append(orderedColumns, column)
}
sort.Strings(orderedColumns)
sql := dbMan.buildInsertSql(tableName, orderedColumns, rows)
prep, err := txn.Prepare(sql)
if err != nil {
log.Errorf("INSERT Fail to prepare statement %s error=%v", sql, err)
return err
}
defer prep.Close()
var values []interface{}
for _, row := range rows {
for _, columnName := range orderedColumns {
//use Value so that stmt exec does not complain about common.ColumnVal being a struct
values = append(values, row[columnName].Value)
}
}
//create prepared statement from existing template statement
_, err = prep.Exec(values...)
if err != nil {
log.Errorf("INSERT Fail %s values=%v error=%v", sql, values, err)
return err
}
log.Debugf("INSERT Success %s values=%v", sql, values)
return nil
}
func (dbMan *dbManager) getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
var values = make([]interface{}, len(pkeys))
for i, pkey := range pkeys {
if row[pkey] == nil {
values[i] = nil
} else {
values[i] = row[pkey].Value
}
}
return values
}
func (dbMan *dbManager) delete(tableName string, rows []common.Row, txn apid.Tx) error {
pkeys, err := dbMan.getPkeysForTable(tableName)
sort.Strings(pkeys)
if len(pkeys) == 0 || err != nil {
return fmt.Errorf("DELETE No primary keys found for table. %s", tableName)
}
if len(rows) == 0 {
return fmt.Errorf("no rows found for table %s", tableName)
}
sql := dbMan.buildDeleteSql(tableName, rows[0], pkeys)
prep, err := txn.Prepare(sql)
if err != nil {
return fmt.Errorf("DELETE Fail to prep statement %s error=%v", sql, err)
}
defer prep.Close()
for _, row := range rows {
values := dbMan.getValueListFromKeys(row, pkeys)
// delete prepared statement from existing template statement
res, err := txn.Stmt(prep).Exec(values...)
if err != nil {
return fmt.Errorf("DELETE Fail %s values=%v error=%v", sql, values, err)
}
affected, err := res.RowsAffected()
if err == nil && affected != 0 {
log.Debugf("DELETE Success %s values=%v", sql, values)
} else if err == nil && affected == 0 {
return fmt.Errorf("entry not found %s values=%v, nothing to delete", sql, values)
} else {
return fmt.Errorf("DELETE Failed %s values=%v error=%v", sql, values, err)
}
}
return nil
}
// Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
func (dbMan *dbManager) buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
var wherePlaceholders []string
i := 1
if row == nil {
return ""
}
normalizedTableName := normalizeTableName(tableName)
for _, pk := range pkeys {
wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
i++
}
sql := "DELETE FROM " + normalizedTableName
sql += " WHERE "
sql += strings.Join(wherePlaceholders, " AND ")
return sql
}
func (dbMan *dbManager) update(tableName string, oldRows, newRows []common.Row, txn apid.Tx) error {
pkeys, err := dbMan.getPkeysForTable(tableName)
if len(pkeys) == 0 || err != nil {
return fmt.Errorf("UPDATE No primary keys found for table: %v, %v", tableName, err)
}
if len(oldRows) == 0 || len(newRows) == 0 {
return fmt.Errorf("UPDATE No old or new rows, table: %v, %v, %v", tableName, oldRows, newRows)
}
var orderedColumns []string
//extract sorted orderedColumns
for columnName := range newRows[0] {
orderedColumns = append(orderedColumns, columnName)
}
sort.Strings(orderedColumns)
//build update statement, use arbitrary row as template
sql := dbMan.buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
prep, err := txn.Prepare(sql)
if err != nil {
return fmt.Errorf("UPDATE Fail to prep statement %s error=%v", sql, err)
}
defer prep.Close()
for i, row := range newRows {
var values []interface{}
for _, columnName := range orderedColumns {
//use Value so that stmt exec does not complain about common.ColumnVal being a struct
//TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping
//TODO right now this will only work when the column type is a string
if row[columnName] != nil {
values = append(values, row[columnName].Value)
} else {
values = append(values, nil)
}
}
//add values for where clause, use PKs of old row
for _, pk := range pkeys {
if oldRows[i][pk] != nil {
values = append(values, oldRows[i][pk].Value)
} else {
values = append(values, nil)
}
}
//create prepared statement from existing template statement
res, err := txn.Stmt(prep).Exec(values...)
if err != nil {
return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err)
}
numRowsAffected, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("UPDATE Fail %s values=%v error=%v", sql, values, err)
}
//delete this once we figure out why tests are failing/not updating
log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
log.Debugf("UPDATE Success %s values=%v", sql, values)
}
return nil
}
func (dbMan *dbManager) buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
if row == nil {
return ""
}
normalizedTableName := normalizeTableName(tableName)
var setPlaceholders, wherePlaceholders []string
i := 1
for _, columnName := range orderedColumns {
setPlaceholders = append(setPlaceholders, fmt.Sprintf("%s=$%v", columnName, i))
i++
}
for _, pk := range pkeys {
wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
i++
}
sql := "UPDATE " + normalizedTableName + " SET "
sql += strings.Join(setPlaceholders, ", ")
sql += " WHERE "
sql += strings.Join(wherePlaceholders, " AND ")
return sql
}
//precondition: rows.length > 1000, max number of entities for sqlite
func (dbMan *dbManager) buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
if len(rows) == 0 {
return ""
}
normalizedTableName := normalizeTableName(tableName)
var values string = ""
var i, j int
k := 1
for i = 0; i < len(rows)-1; i++ {
values += "("
for j = 0; j < len(orderedColumns)-1; j++ {
values += fmt.Sprintf("$%d,", k)
k++
}
values += fmt.Sprintf("$%d),", k)
k++
}
values += "("
for j = 0; j < len(orderedColumns)-1; j++ {
values += fmt.Sprintf("$%d,", k)
k++
}
values += fmt.Sprintf("$%d)", k)
sql := "INSERT INTO " + normalizedTableName
sql += "(" + strings.Join(orderedColumns, ",") + ") "
sql += "VALUES " + values
return sql
}
func (dbMan *dbManager) getPkeysForTable(tableName string) ([]string, error) {
db := dbMan.getDB()
normalizedTableName := normalizeTableName(tableName)
sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
rows, err := db.Query(sql, normalizedTableName)
if err != nil {
log.Errorf("Failed %s values=%s Error: %v", sql, normalizedTableName, err)
return nil, err
}
var columnNames []string
defer rows.Close()
for rows.Next() {
var value string
err := rows.Scan(&value)
if err != nil {
log.Errorf("failed to scan column names: %v", err)
return nil, err
}
columnNames = append(columnNames, value)
}
err = rows.Err()
if err != nil {
log.Errorf("failed to scan column names: %v", err)
return nil, err
}
return columnNames, nil
}
func normalizeTableName(tableName string) string {
return strings.Replace(tableName, ".", "_", 1)
}
/*
* For the given apidConfigId, this function will retrieve all the distinch scopes
* associated with it. Distinct, because scope is already a collection of the tenants.
*/
func (dbMan *dbManager) findScopesForId(configId string) (scopes []string, err error) {
log.Debugf("findScopesForId: %s", configId)
var scope sql.NullString
db := dbMan.getDB()
query := `
SELECT scope FROM edgex_data_scope WHERE apid_cluster_id = $1
UNION
SELECT org_scope FROM edgex_data_scope WHERE apid_cluster_id = $2
UNION
SELECT env_scope FROM edgex_data_scope WHERE apid_cluster_id = $3
`
rows, err := db.Query(query, configId, configId, configId)
if err != nil {
log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
return
}
defer rows.Close()
for rows.Next() {
err = rows.Scan(&scope)
if err != nil {
log.Errorf("Failed to get scopes from EDGEX_DATA_SCOPE: %v", err)
return
}
if scope.Valid && scope.String != "" {
scopes = append(scopes, scope.String)
}
}
log.Debugf("scopes: %v", scopes)
return
}
/*
* Retrieve SnapshotInfo for the given apidConfigId from apid_config table
*/
func (dbMan *dbManager) getLastSequence() (lastSequence string) {
err := dbMan.getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
if err != nil && err != sql.ErrNoRows {
log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
return
}
log.Debugf("lastSequence: %s", lastSequence)
return
}
/*
* Persist the last change Id each time a change has been successfully
* processed by the plugin(s)
*/
func (dbMan *dbManager) updateLastSequence(lastSequence string) error {
log.Debugf("updateLastSequence: %s", lastSequence)
tx, err := dbMan.getDB().Begin()
if err != nil {
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
}
defer tx.Rollback()
_, err = tx.Exec("UPDATE EDGEX_APID_CLUSTER SET last_sequence=?;", lastSequence)
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
if err = tx.Commit(); err != nil {
log.Errorf("Commit error in updateLastSequence: %v", err)
}
return err
}
func (dbMan *dbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) {
info.InstanceName = config.GetString(configName)
info.ClusterID = config.GetString(configApidClusterId)
var savedClusterId string
// always use default database for this
var db apid.DB
db, err = dataService.DB()
if err != nil {
return
}
tx, err := db.Begin()
if err != nil {
log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return
}
defer tx.Rollback()
err = tx.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
if err != sql.ErrNoRows {
log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
return
} else {
// first start - no row, generate a UUID and store it
err = nil
info.IsNewInstance = true
info.InstanceID = util.GenerateUUID()
log.Debugf("Inserting new apid instance id %s", info.InstanceID)
_, err = tx.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
}
} else if savedClusterId != info.ClusterID {
log.Warnf("Detected apid cluster id change in config. %v v.s. %v Apid will start clean.",
savedClusterId, info.ClusterID)
err = nil
info.IsNewInstance = true
info.InstanceID = util.GenerateUUID()
_, err = tx.Exec("DELETE FROM APID;")
info.LastSnapshot = ""
}
if err = tx.Commit(); err != nil {
log.Errorf("Commit error in getApidInstanceInfo: %v", err)
}
return
}
func (dbMan *dbManager) updateApidInstanceInfo(instanceId, clusterId, lastSnap string) error {
log.Debugf("updateApidInstanceInfo: %v, %v, %v", instanceId, clusterId, lastSnap)
// always use default database for this
db, err := dataService.DB()
if err != nil {
return err
}
tx, err := db.Begin()
if err != nil {
log.Errorf("updateApidInstanceInfo: Unable to get DB tx Err: {%v}", err)
return err
}
defer tx.Rollback()
rows, err := tx.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
VALUES (?,?,?)`,
instanceId, clusterId, lastSnap)
if err != nil {
log.Errorf("updateApidInstanceInfo: Tx Exec Err: {%v}", err)
return err
}
n, err := rows.RowsAffected()
if err == nil && n == 0 {
err = errors.New("no rows affected")
}
if err = tx.Commit(); err != nil {
log.Errorf("Commit error in updateApidInstanceInfo: %v", err)
}
return err
}
func (dbMan *dbManager) extractTables() (map[string]bool, error) {
tables := make(map[string]bool)
db := dbMan.getDB()
rows, err := db.Query("SELECT DISTINCT tableName FROM _transicator_tables;")
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var table sql.NullString
if err := rows.Scan(&table); err != nil {
return nil, err
}
log.Debugf("Table %v found in existing db", table)
if table.Valid {
tables[table.String] = true
}
}
log.Debugf("Extracting table names from existing DB %v", tables)
return tables, nil
}
func (dbMan *dbManager) getKnowTables() map[string]bool {
return dbMan.knownTables
}
func (dbMan *dbManager) processChangeList(changes *common.ChangeList) error {
tx, err := dbMan.getDB().Begin()
if err != nil {
return err
}
defer tx.Rollback()
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
for _, change := range changes.Changes {
if change.Table == LISTENER_TABLE_APID_CLUSTER {
return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
}
switch change.Operation {
case common.Insert:
err = dbMan.insert(change.Table, []common.Row{change.NewRow}, tx)
case common.Update:
if change.Table == LISTENER_TABLE_DATA_SCOPE {
return fmt.Errorf("illegal operation: %s for %s", change.Operation, change.Table)
}
err = dbMan.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
case common.Delete:
err = dbMan.delete(change.Table, []common.Row{change.OldRow}, tx)
}
if err != nil {
return err
}
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("Commit error in processChangeList: %v", err)
}
return nil
}
func (dbMan *dbManager) processSnapshot(snapshot *common.Snapshot, isDataSnapshot bool) error {
var prevDb string
if apidInfo.LastSnapshot != "" && apidInfo.LastSnapshot != snapshot.SnapshotInfo {
log.Debugf("Release snapshot for {%s}. Switching to version {%s}",
apidInfo.LastSnapshot, snapshot.SnapshotInfo)
prevDb = apidInfo.LastSnapshot
} else {
log.Debugf("Process snapshot for version {%s}",
snapshot.SnapshotInfo)
}
db, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
return fmt.Errorf("unable to access database: %v", err)
}
var numApidClusters int
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("unable to open DB txn: {%v}", err.Error())
}
defer tx.Rollback()
err = tx.QueryRow("SELECT COUNT(*) FROM edgex_apid_cluster").Scan(&numApidClusters)
if err != nil {
return fmt.Errorf("unable to read database: {%s}", err.Error())
}
if numApidClusters != 1 {
return fmt.Errorf("illegal state for apid_cluster, must be a single row")
}
_, err = tx.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
if err != nil && err.Error() != "duplicate column name: last_sequence" {
return fmt.Errorf("Unable to create last_sequence column on DB. Error {%v}", err.Error())
}
if err = tx.Commit(); err != nil {
return fmt.Errorf("error when commit in processSqliteSnapshot: %v", err)
}
//update apid instance info
apidInfo.LastSnapshot = snapshot.SnapshotInfo
err = dbMan.updateApidInstanceInfo(apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
if err != nil {
log.Errorf("Unable to update instance info: %v", err)
return fmt.Errorf("unable to update instance info: %v", err)
}
dbMan.setDB(db)
if isDataSnapshot {
dbMan.knownTables, err = dbMan.extractTables()
if err != nil {
return fmt.Errorf("unable to extract tables: %v", err)
}
}
log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
// Releases the DB, when the Connection reference count reaches 0.
if prevDb != "" {
dataService.ReleaseDB(prevDb)
}
return nil
}