blob: a9b1496cb1fb8ace955e4e5138b748635ae8d8b6 [file] [log] [blame] [edit]
package apidApigeeSync
import (
"crypto/rand"
"database/sql"
"errors"
"fmt"
"sync"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
"sort"
"strings"
)
var (
unsafeDB apid.DB
dbMux sync.RWMutex
)
type dataApidCluster struct {
ID, Name, OrgAppName, CreatedBy, UpdatedBy, Description string
Updated, Created string
}
type dataDataScope struct {
ID, ClusterID, Scope, Org, Env, CreatedBy, UpdatedBy string
Updated, Created string
}
/*
This plugin uses 2 databases:
1. The default DB is used for APID table.
2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE
(Currently, the snapshot never changes, but this is future-proof)
*/
func initDB(db apid.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS APID (
instance_id text,
apid_cluster_id text,
last_snapshot_info text,
PRIMARY KEY (instance_id)
);
`)
if err != nil {
return err
}
log.Debug("Database tables created.")
return nil
}
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
dbMux.RUnlock()
return db
}
func setDB(db apid.DB) {
dbMux.Lock()
unsafeDB = db
dbMux.Unlock()
}
//TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn
func insert(tableName string, rows []common.Row, txn *sql.Tx) bool {
if len(rows) == 0 {
return false
}
var orderedColumns []string
for column := range rows[0] {
orderedColumns = append(orderedColumns, column)
}
sort.Strings(orderedColumns)
sql := buildInsertSql(tableName, orderedColumns, rows)
prep, err := txn.Prepare(sql)
if err != nil {
log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err)
return false
}
defer prep.Close()
var values []interface{}
for _, row := range rows {
for _, columnName := range orderedColumns {
//use Value so that stmt exec does not complain about common.ColumnVal being a struct
values = append(values, row[columnName].Value)
}
}
//create prepared statement from existing template statement
_, err = prep.Exec(values...)
if err != nil {
log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err)
return false
}
log.Debugf("INSERT Success [%s] values=%v", sql, values)
return true
}
func getValueListFromKeys(row common.Row, pkeys []string) []interface{} {
var values = make([]interface{}, len(pkeys))
for i, pkey := range pkeys {
if row[pkey] == nil {
values[i] = nil
} else {
values[i] = row[pkey].Value
}
}
return values
}
func _delete(tableName string, rows []common.Row, txn *sql.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
sort.Strings(pkeys)
if len(pkeys) == 0 || err != nil {
log.Errorf("DELETE No primary keys found for table. %s", tableName)
return false
}
if len(rows) == 0 {
log.Errorf("No rows found for table.", tableName)
return false
}
sql := buildDeleteSql(tableName, rows[0], pkeys)
prep, err := txn.Prepare(sql)
if err != nil {
log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err)
return false
}
defer prep.Close()
for _, row := range rows {
values := getValueListFromKeys(row, pkeys)
// delete prepared statement from existing template statement
res, err := txn.Stmt(prep).Exec(values...)
if err != nil {
log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err)
return false
}
affected, err := res.RowsAffected()
if err == nil && affected != 0 {
log.Debugf("DELETE Success [%s] values=%v", sql, values)
} else if err == nil && affected == 0 {
log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values)
return false
} else {
log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err)
return false
}
}
return true
}
// Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;"
func buildDeleteSql(tableName string, row common.Row, pkeys []string) string {
var wherePlaceholders []string
i := 1
if row == nil {
return ""
}
normalizedTableName := normalizeTableName(tableName)
for _, pk := range pkeys {
wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
i++
}
sql := "DELETE FROM " + normalizedTableName
sql += " WHERE "
sql += strings.Join(wherePlaceholders, " AND ")
return sql
}
func update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool {
pkeys, err := getPkeysForTable(tableName)
if len(pkeys) == 0 || err != nil {
log.Errorf("UPDATE No primary keys found for table.", tableName)
return false
}
if len(oldRows) == 0 || len(newRows) == 0 {
return false
}
var orderedColumns []string
//extract sorted orderedColumns
for columnName := range newRows[0] {
orderedColumns = append(orderedColumns, columnName)
}
sort.Strings(orderedColumns)
//build update statement, use arbitrary row as template
sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys)
prep, err := txn.Prepare(sql)
if err != nil {
log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err)
return false
}
defer prep.Close()
for i, row := range newRows {
var values []interface{}
for _, columnName := range orderedColumns {
//use Value so that stmt exec does not complain about common.ColumnVal being a struct
//TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping
//TODO right now this will only work when the column type is a string
if row[columnName] != nil {
values = append(values, row[columnName].Value)
} else {
values = append(values, nil)
}
}
//add values for where clause, use PKs of old row
for _, pk := range pkeys {
if oldRows[i][pk] != nil {
values = append(values, oldRows[i][pk].Value)
} else {
values = append(values, nil)
}
}
//create prepared statement from existing template statement
res, err := txn.Stmt(prep).Exec(values...)
if err != nil {
log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
return false
}
numRowsAffected, err := res.RowsAffected()
if err != nil {
log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err)
return false
}
//delete this once we figure out why tests are failing/not updating
log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected)
log.Debugf("UPDATE Success [%s] values=%v", sql, values)
}
return true
}
func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string {
if row == nil {
return ""
}
normalizedTableName := normalizeTableName(tableName)
var setPlaceholders, wherePlaceholders []string
i := 1
for _, columnName := range orderedColumns {
setPlaceholders = append(setPlaceholders, fmt.Sprintf("%s=$%v", columnName, i))
i++
}
for _, pk := range pkeys {
wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i))
i++
}
sql := "UPDATE " + normalizedTableName + " SET "
sql += strings.Join(setPlaceholders, ", ")
sql += " WHERE "
sql += strings.Join(wherePlaceholders, " AND ")
return sql
}
//precondition: rows.length > 1000, max number of entities for sqlite
func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string {
if len(rows) == 0 {
return ""
}
normalizedTableName := normalizeTableName(tableName)
var values string = ""
var i, j int
k := 1
for i = 0; i < len(rows)-1; i++ {
values += "("
for j = 0; j < len(orderedColumns)-1; j++ {
values += fmt.Sprintf("$%d,", k)
k++
}
values += fmt.Sprintf("$%d),", k)
k++
}
values += "("
for j = 0; j < len(orderedColumns)-1; j++ {
values += fmt.Sprintf("$%d,", k)
k++
}
values += fmt.Sprintf("$%d)", k)
sql := "INSERT INTO " + normalizedTableName
sql += "(" + strings.Join(orderedColumns, ",") + ") "
sql += "VALUES " + values
return sql
}
func getPkeysForTable(tableName string) ([]string, error) {
db := getDB()
normalizedTableName := normalizeTableName(tableName)
sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;"
rows, err := db.Query(sql, normalizedTableName)
if err != nil {
log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err)
return nil, err
}
var columnNames []string
defer rows.Close()
for rows.Next() {
var value string
err := rows.Scan(&value)
if err != nil {
log.Fatal(err)
}
columnNames = append(columnNames, value)
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
return columnNames, nil
}
func normalizeTableName(tableName string) string {
return strings.Replace(tableName, ".", "_", 1)
}
/*
* For the given apidConfigId, this function will retrieve all the distinch scopes
* associated with it. Distinct, because scope is already a collection of the tenants.
*/
func findScopesForId(configId string) (scopes []string) {
log.Debugf("findScopesForId: %s", configId)
var scope string
db := getDB()
rows, err := db.Query("select DISTINCT scope from EDGEX_DATA_SCOPE where apid_cluster_id = $1", configId)
if err != nil {
log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err)
return
}
defer rows.Close()
for rows.Next() {
rows.Scan(&scope)
scopes = append(scopes, scope)
}
log.Debugf("scopes: %v", scopes)
return
}
/*
* Retrieve SnapshotInfo for the given apidConfigId from apid_config table
*/
func getLastSequence() (lastSequence string) {
err := getDB().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence)
if err != nil && err != sql.ErrNoRows {
log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err)
return
}
log.Debugf("lastSequence: %s", lastSequence)
return
}
/*
* Persist the last change Id each time a change has been successfully
* processed by the plugin(s)
*/
func updateLastSequence(lastSequence string) error {
log.Debugf("updateLastSequence: %s", lastSequence)
stmt, err := getDB().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;")
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(lastSequence)
if err != nil {
log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err)
return err
}
log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence)
log.Infof("Replication lastSequence=%s", lastSequence)
return nil
}
func getApidInstanceInfo() (info apidInstanceInfo, err error) {
info.InstanceName = config.GetString(configName)
info.ClusterID = config.GetString(configApidClusterId)
var savedClusterId string
// always use default database for this
var db apid.DB
db, err = dataService.DB()
if err != nil {
return
}
err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1").
Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot)
if err != nil {
if err != sql.ErrNoRows {
log.Errorf("Unable to retrieve apidInstanceInfo: %v", err)
return
} else {
// first start - no row, generate a UUID and store it
err = nil
newInstanceID = true
info.InstanceID = generateUUID()
log.Debugf("Inserting new apid instance id %s", info.InstanceID)
db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
}
} else if savedClusterId != info.ClusterID {
log.Debug("Detected apid cluster id change in config. Apid will start clean")
err = nil
newInstanceID = true
info.InstanceID = generateUUID()
db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)",
info.InstanceID, info.ClusterID, "")
info.LastSnapshot = ""
}
return
}
func updateApidInstanceInfo() error {
// always use default database for this
db, err := dataService.DB()
if err != nil {
return err
}
rows, err := db.Exec(`
REPLACE
INTO APID (instance_id, apid_cluster_id, last_snapshot_info)
VALUES (?,?,?)`,
apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot)
if err != nil {
return err
}
n, err := rows.RowsAffected()
if err == nil && n == 0 {
err = errors.New("no rows affected")
}
return err
}
/*
* generates a random uuid (mix of timestamp & crypto random string)
*/
//TODO: Change to https://tools.ietf.org/html/rfc4122 based implementation such as https://github.com/google/uuid
func generateUUID() string {
buff := make([]byte, 16)
numRead, err := rand.Read(buff)
if numRead != len(buff) || err != nil {
panic(err)
}
/* uuid v4 spec */
buff[6] = (buff[6] | 0x40) & 0x4F
buff[8] = (buff[8] | 0x80) & 0xBF
return fmt.Sprintf("%x-%x-%x-%x-%x", buff[0:4], buff[4:6], buff[6:8], buff[8:10], buff[10:])
}