| // Copyright 2017 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package apidApigeeSync |
| |
| import ( |
| "crypto/rand" |
| "database/sql" |
| "errors" |
| "fmt" |
| "sync" |
| |
| "github.com/30x/apid-core" |
| "github.com/apigee-labs/transicator/common" |
| "sort" |
| "strings" |
| ) |
| |
| var ( |
| dbMux sync.RWMutex |
| ) |
| |
| type dataApidCluster struct { |
| ID, Name, OrgAppName, CreatedBy, UpdatedBy, Description string |
| Updated, Created string |
| } |
| |
| type dataDataScope struct { |
| ID, ClusterID, Scope, Org, Env, CreatedBy, UpdatedBy string |
| Updated, Created string |
| } |
| |
| /* |
| This plugin uses 2 databases: |
| 1. The default DB is used for APID table. |
| 2. The versioned DB is used for APID_CLUSTER & DATA_SCOPE |
| (Currently, the snapshot never changes, but this is future-proof) |
| */ |
| func (dbc *dbManager) initDefaultDb() error { |
| db, err := dbc.getDefaultDb() |
| if err != nil { |
| return err |
| } |
| _, err = db.Exec(` |
| CREATE TABLE IF NOT EXISTS APID ( |
| instance_id text, |
| apid_cluster_id text, |
| last_snapshot_info text, |
| PRIMARY KEY (instance_id) |
| ); |
| `) |
| if err != nil { |
| return err |
| } |
| |
| log.Debug("Database tables created.") |
| return nil |
| } |
| |
| type dbManager struct { |
| db apid.DB |
| dbMux sync.RWMutex |
| data apid.DataService |
| } |
| |
| func (dbc *dbManager) setDbVersion(version string) { |
| db, err := dbc.data.DBVersion(version) |
| if err != nil { |
| log.Panicf("Unable to access database: %v", err) |
| } |
| dbc.dbMux.Lock() |
| dbc.db = db |
| dbc.dbMux.Unlock() |
| } |
| |
| func (dbc *dbManager) getDb() apid.DB { |
| dbc.dbMux.RLock() |
| defer dbc.dbMux.RUnlock() |
| return dbc.db |
| } |
| |
func (dbc *dbManager) getDefaultDb() (apid.DB, error) {
	return dataService.DB()
}
| |
func (dbc *dbManager) setDb(db apid.DB) {
	dbc.dbMux.Lock()
	defer dbc.dbMux.Unlock()
	dbc.db = db
}
| |
| //TODO if len(rows) > 1000, chunk it up and exec multiple inserts in the txn |
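// insert writes the given rows into tableName as a single multi-row INSERT executed within txn.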
| func (dbc *dbManager) insert(tableName string, rows []common.Row, txn *sql.Tx) bool { |
| |
| if len(rows) == 0 { |
| return false |
| } |
| |
| var orderedColumns []string |
| for column := range rows[0] { |
| orderedColumns = append(orderedColumns, column) |
| } |
| sort.Strings(orderedColumns) |
| |
| sql := buildInsertSql(tableName, orderedColumns, rows) |
| |
| prep, err := txn.Prepare(sql) |
| if err != nil { |
| log.Errorf("INSERT Fail to prepare statement [%s] error=[%v]", sql, err) |
| return false |
| } |
| defer prep.Close() |
| |
| var values []interface{} |
| |
| for _, row := range rows { |
| for _, columnName := range orderedColumns { |
| //use Value so that stmt exec does not complain about common.ColumnVal being a struct |
| values = append(values, row[columnName].Value) |
| } |
| } |
| |
	//execute the prepared statement with the values of all rows
| _, err = prep.Exec(values...) |
| |
| if err != nil { |
| log.Errorf("INSERT Fail [%s] values=%v error=[%v]", sql, values, err) |
| return false |
| } |
| log.Debugf("INSERT Success [%s] values=%v", sql, values) |
| |
| return true |
| } |
| |
| func getValueListFromKeys(row common.Row, pkeys []string) []interface{} { |
| var values = make([]interface{}, len(pkeys)) |
| for i, pkey := range pkeys { |
| if row[pkey] == nil { |
| values[i] = nil |
| } else { |
| values[i] = row[pkey].Value |
| } |
| } |
| return values |
| } |
| |
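// deleteRowsFromTable deletes the given rows from tableName within txn, matching each row by the table's primary key columns.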
| func (dbc *dbManager) deleteRowsFromTable(tableName string, rows []common.Row, txn *sql.Tx) bool { |
	pkeys, err := dbc.getPkeysForTable(tableName)
	if len(pkeys) == 0 || err != nil {
		log.Errorf("DELETE No primary keys found for table %s", tableName)
		return false
	}
	sort.Strings(pkeys)
| |
| if len(rows) == 0 { |
| log.Errorf("No rows found for table.", tableName) |
| return false |
| } |
| |
| sql := buildDeleteSql(tableName, rows[0], pkeys) |
| prep, err := txn.Prepare(sql) |
| if err != nil { |
| log.Errorf("DELETE Fail to prep statement [%s] error=[%v]", sql, err) |
| return false |
| } |
| defer prep.Close() |
| for _, row := range rows { |
| values := getValueListFromKeys(row, pkeys) |
		// bind the prepared statement to this transaction and execute the delete
| res, err := txn.Stmt(prep).Exec(values...) |
| if err != nil { |
| log.Errorf("DELETE Fail [%s] values=%v error=[%v]", sql, values, err) |
| return false |
| } |
| affected, err := res.RowsAffected() |
| if err == nil && affected != 0 { |
| log.Debugf("DELETE Success [%s] values=%v", sql, values) |
| } else if err == nil && affected == 0 { |
| log.Errorf("Entry not found [%s] values=%v. Nothing to delete.", sql, values) |
| return false |
| } else { |
| log.Errorf("DELETE Failed [%s] values=%v error=[%v]", sql, values, err) |
| return false |
| } |
| |
| } |
| return true |
| |
| } |
| |
| // Syntax "DELETE FROM Obj WHERE key1=$1 AND key2=$2 ... ;" |
| func buildDeleteSql(tableName string, row common.Row, pkeys []string) string { |
| |
| var wherePlaceholders []string |
| i := 1 |
| if row == nil { |
| return "" |
| } |
| normalizedTableName := normalizeTableName(tableName) |
| |
| for _, pk := range pkeys { |
| wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i)) |
| i++ |
| } |
| |
| sql := "DELETE FROM " + normalizedTableName |
| sql += " WHERE " |
| sql += strings.Join(wherePlaceholders, " AND ") |
| |
| return sql |
| |
| } |
| |
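// update applies newRows to tableName within txn, setting all columns from each new row and matching on the primary key values of the corresponding old row.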
| func (dbc *dbManager) update(tableName string, oldRows, newRows []common.Row, txn *sql.Tx) bool { |
| pkeys, err := dbc.getPkeysForTable(tableName) |
| if len(pkeys) == 0 || err != nil { |
| log.Errorf("UPDATE No primary keys found for table.", tableName) |
| return false |
| } |
| if len(oldRows) == 0 || len(newRows) == 0 { |
| return false |
| } |
| |
| var orderedColumns []string |
| |
| //extract sorted orderedColumns |
| for columnName := range newRows[0] { |
| orderedColumns = append(orderedColumns, columnName) |
| } |
| sort.Strings(orderedColumns) |
| |
| //build update statement, use arbitrary row as template |
| sql := buildUpdateSql(tableName, orderedColumns, newRows[0], pkeys) |
| prep, err := txn.Prepare(sql) |
| if err != nil { |
| log.Errorf("UPDATE Fail to prep statement [%s] error=[%v]", sql, err) |
| return false |
| } |
| defer prep.Close() |
| |
| for i, row := range newRows { |
| var values []interface{} |
| |
| for _, columnName := range orderedColumns { |
| //use Value so that stmt exec does not complain about common.ColumnVal being a struct |
| //TODO will need to convert the Value (which is a string) to the appropriate field, using type for mapping |
| //TODO right now this will only work when the column type is a string |
| if row[columnName] != nil { |
| values = append(values, row[columnName].Value) |
| } else { |
| values = append(values, nil) |
| } |
| } |
| |
| //add values for where clause, use PKs of old row |
| for _, pk := range pkeys { |
| if oldRows[i][pk] != nil { |
| values = append(values, oldRows[i][pk].Value) |
| } else { |
| values = append(values, nil) |
| } |
| |
| } |
| |
		//bind the prepared statement to this transaction and execute the update
| res, err := txn.Stmt(prep).Exec(values...) |
| |
| if err != nil { |
| log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err) |
| return false |
| } |
| numRowsAffected, err := res.RowsAffected() |
| if err != nil { |
| log.Errorf("UPDATE Fail [%s] values=%v error=[%v]", sql, values, err) |
| return false |
| } |
		//TODO delete this debug log once we figure out why tests are failing/not updating
| log.Debugf("NUM ROWS AFFECTED BY UPDATE: %d", numRowsAffected) |
| log.Debugf("UPDATE Success [%s] values=%v", sql, values) |
| |
| } |
| |
| return true |
| |
| } |
| |
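// Syntax "UPDATE Obj SET col1=$1, col2=$2 ... WHERE key1=$3 AND key2=$4 ..."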
| func buildUpdateSql(tableName string, orderedColumns []string, row common.Row, pkeys []string) string { |
| if row == nil { |
| return "" |
| } |
| normalizedTableName := normalizeTableName(tableName) |
| |
| var setPlaceholders, wherePlaceholders []string |
| i := 1 |
| |
| for _, columnName := range orderedColumns { |
| setPlaceholders = append(setPlaceholders, fmt.Sprintf("%s=$%v", columnName, i)) |
| i++ |
| } |
| |
| for _, pk := range pkeys { |
| wherePlaceholders = append(wherePlaceholders, fmt.Sprintf("%s=$%v", pk, i)) |
| i++ |
| } |
| |
| sql := "UPDATE " + normalizedTableName + " SET " |
| sql += strings.Join(setPlaceholders, ", ") |
| sql += " WHERE " |
| sql += strings.Join(wherePlaceholders, " AND ") |
| |
| return sql |
| } |
| |
//precondition: 0 < len(rows) <= 1000, the max number of entities for sqlite
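// Syntax "INSERT INTO Obj(col1,col2) VALUES ($1,$2),($3,$4) ..."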
| func buildInsertSql(tableName string, orderedColumns []string, rows []common.Row) string { |
| if len(rows) == 0 { |
| return "" |
| } |
| normalizedTableName := normalizeTableName(tableName) |
| var values string = "" |
| |
| var i, j int |
| k := 1 |
| for i = 0; i < len(rows)-1; i++ { |
| values += "(" |
| for j = 0; j < len(orderedColumns)-1; j++ { |
| values += fmt.Sprintf("$%d,", k) |
| k++ |
| } |
| values += fmt.Sprintf("$%d),", k) |
| k++ |
| } |
| values += "(" |
| for j = 0; j < len(orderedColumns)-1; j++ { |
| values += fmt.Sprintf("$%d,", k) |
| k++ |
| } |
| values += fmt.Sprintf("$%d)", k) |
| |
| sql := "INSERT INTO " + normalizedTableName |
| sql += "(" + strings.Join(orderedColumns, ",") + ") " |
| sql += "VALUES " + values |
| |
| return sql |
| } |
| |
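// getPkeysForTable returns the primary key column names of the given table, as recorded in transicator's _transicator_tables metadata table.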
| func (dbc *dbManager) getPkeysForTable(tableName string) ([]string, error) { |
| db := dbc.getDb() |
| normalizedTableName := normalizeTableName(tableName) |
| sql := "SELECT columnName FROM _transicator_tables WHERE tableName=$1 AND primaryKey ORDER BY columnName;" |
| rows, err := db.Query(sql, normalizedTableName) |
| if err != nil { |
| log.Errorf("Failed [%s] values=[s%] Error: %v", sql, normalizedTableName, err) |
| return nil, err |
| } |
| var columnNames []string |
| defer rows.Close() |
	for rows.Next() {
		var value string
		if err := rows.Scan(&value); err != nil {
			log.Errorf("Failed to scan column name: %v", err)
			return nil, err
		}
		columnNames = append(columnNames, value)
	}
	if err := rows.Err(); err != nil {
		log.Errorf("Error iterating over primary key columns: %v", err)
		return nil, err
	}
	return columnNames, nil
| } |
| |
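// normalizeTableName converts a transicator table name to its local form by replacing the first "." with "_", e.g. "edgex.apid_cluster" becomes "edgex_apid_cluster".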
| func normalizeTableName(tableName string) string { |
| return strings.Replace(tableName, ".", "_", 1) |
| } |
| |
| /* |
| * For the given apidConfigId, this function will retrieve all the distinch scopes |
| * associated with it. Distinct, because scope is already a collection of the tenants. |
| */ |
| func (dbc *dbManager) findScopesForId(configId string) (scopes []string) { |
| |
| log.Debugf("findScopesForId: %s", configId) |
| |
| var scope sql.NullString |
| db := dbc.getDb() |
| |
| query := ` |
| SELECT scope FROM edgex_data_scope WHERE apid_cluster_id = $1 |
| UNION |
| SELECT org_scope FROM edgex_data_scope WHERE apid_cluster_id = $2 |
| UNION |
| SELECT env_scope FROM edgex_data_scope WHERE apid_cluster_id = $3 |
| ` |
| |
| rows, err := db.Query(query, configId, configId, configId) |
| if err != nil { |
| log.Errorf("Failed to query EDGEX_DATA_SCOPE: %v", err) |
| return |
| } |
| defer rows.Close() |
	for rows.Next() {
		if err := rows.Scan(&scope); err != nil {
			log.Errorf("Failed to scan scope: %v", err)
			return
		}
		if scope.Valid {
			scopes = append(scopes, scope.String)
		}
	}
| |
| log.Debugf("scopes: %v", scopes) |
| return |
| } |
| |
| /* |
| * Retrieve SnapshotInfo for the given apidConfigId from apid_config table |
| */ |
| func (dbc *dbManager) getLastSequence() (lastSequence string) { |
| |
| err := dbc.getDb().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence) |
| if err != nil && err != sql.ErrNoRows { |
| log.Panicf("Failed to query EDGEX_APID_CLUSTER: %v", err) |
| return |
| } |
| |
| log.Debugf("lastSequence: %s", lastSequence) |
| return |
| } |
| |
| /* |
| * Persist the last change Id each time a change has been successfully |
| * processed by the plugin(s) |
| */ |
| func (dbc *dbManager) updateLastSequence(lastSequence string) error { |
| |
| log.Debugf("updateLastSequence: %s", lastSequence) |
| |
| stmt, err := dbc.getDb().Prepare("UPDATE EDGEX_APID_CLUSTER SET last_sequence=$1;") |
| if err != nil { |
| log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) |
| return err |
| } |
| defer stmt.Close() |
| |
| _, err = stmt.Exec(lastSequence) |
| if err != nil { |
| log.Errorf("UPDATE EDGEX_APID_CLUSTER Failed: %v", err) |
| return err |
| } |
| |
| log.Debugf("UPDATE EDGEX_APID_CLUSTER Success: %s", lastSequence) |
| log.Infof("Replication lastSequence=%s", lastSequence) |
| return nil |
| } |
| |
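// getApidInstanceInfo reads this apid instance's id, cluster id, and last snapshot info from the default DB. On first start, or when the configured cluster id differs from the saved one, a new instance id is generated and persisted.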
| func (dbc *dbManager) getApidInstanceInfo() (info apidInstanceInfo, err error) { |
| info.InstanceName = config.GetString(configName) |
| info.ClusterID = config.GetString(configApidClusterId) |
| |
| var savedClusterId string |
| |
| // always use default database for this |
| var db apid.DB |
| db, err = dataService.DB() |
| if err != nil { |
| return |
| } |
| |
| err = db.QueryRow("SELECT instance_id, apid_cluster_id, last_snapshot_info FROM APID LIMIT 1"). |
| Scan(&info.InstanceID, &savedClusterId, &info.LastSnapshot) |
| if err != nil { |
| if err != sql.ErrNoRows { |
| log.Errorf("Unable to retrieve apidInstanceInfo: %v", err) |
| return |
| } else { |
| // first start - no row, generate a UUID and store it |
| err = nil |
| info.InstanceID = GenerateUUID() |
| |
| log.Debugf("Inserting new apid instance id %s", info.InstanceID) |
| db.Exec("INSERT INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", |
| info.InstanceID, info.ClusterID, "") |
| } |
| } else if savedClusterId != info.ClusterID { |
| log.Debug("Detected apid cluster id change in config. Apid will start clean") |
| err = nil |
| info.InstanceID = GenerateUUID() |
| |
| db.Exec("REPLACE INTO APID (instance_id, apid_cluster_id, last_snapshot_info) VALUES (?,?,?)", |
| info.InstanceID, info.ClusterID, "") |
| info.LastSnapshot = "" |
| } |
| return |
| } |
| |
| func (dbc *dbManager) updateApidInstanceInfo() error { |
| |
| // always use default database for this |
| db, err := dbc.getDefaultDb() |
| if err != nil { |
| return err |
| } |
| |
| rows, err := db.Exec(` |
| REPLACE |
| INTO APID (instance_id, apid_cluster_id, last_snapshot_info) |
| VALUES (?,?,?)`, |
| apidInfo.InstanceID, apidInfo.ClusterID, apidInfo.LastSnapshot) |
| if err != nil { |
| return err |
| } |
| n, err := rows.RowsAffected() |
| if err == nil && n == 0 { |
| err = errors.New("no rows affected") |
| } |
| |
| return err |
| } |
| |
| func (dbc *dbManager) getClusterCount() (numApidClusters int, err error) { |
| rows, err := dbc.db.Query("SELECT COUNT(*) FROM edgex_apid_cluster") |
| if err != nil { |
| return |
| } |
| defer rows.Close() |
| rows.Next() |
| err = rows.Scan(&numApidClusters) |
| return |
| } |
| |
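// alterClusterTable adds the last_sequence column to edgex_apid_cluster; a pre-existing column is treated as success.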
| func (dbc *dbManager) alterClusterTable() (err error) { |
| _, err = dbc.db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''") |
	if err != nil && err.Error() == "duplicate column name: last_sequence" {
		return nil
	}
| return |
| } |
| |
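// writeTransaction applies all changes in the ChangeList within a single transaction; if any operation fails, the whole transaction is rolled back.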
| func (dbc *dbManager) writeTransaction(changes *common.ChangeList) bool { |
| tx, err := dbc.getDb().Begin() |
| if err != nil { |
| log.Panicf("Error processing ChangeList: %v", err) |
| } |
| defer tx.Rollback() |
| var ok bool |
| for _, change := range changes.Changes { |
| switch change.Operation { |
| case common.Insert: |
| ok = dbc.insert(change.Table, []common.Row{change.NewRow}, tx) |
| case common.Update: |
| ok = dbc.update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx) |
| case common.Delete: |
| ok = dbc.deleteRowsFromTable(change.Table, []common.Row{change.OldRow}, tx) |
| } |
| if !ok { |
| log.Error("Sql Operation error. Operation rollbacked") |
| return ok |
| } |
| } |
| err = tx.Commit() |
| if err != nil { |
| log.Errorf("Error processing ChangeList: %v", err) |
| return false |
| } |
| return ok |
| } |
| |
| /* |
| * generates a random uuid (mix of timestamp & crypto random string) |
| */ |
| |
| //TODO: Change to https://tools.ietf.org/html/rfc4122 based implementation such as https://github.com/google/uuid |
| func GenerateUUID() string { |
| |
| buff := make([]byte, 16) |
| numRead, err := rand.Read(buff) |
| if numRead != len(buff) || err != nil { |
| panic(err) |
| } |
	/* uuid v4 spec: set the version nibble to 4 and the variant bits to 10 */
	buff[6] = (buff[6] | 0x40) & 0x4F
	buff[8] = (buff[8] | 0x80) & 0xBF
| return fmt.Sprintf("%x-%x-%x-%x-%x", buff[0:4], buff[4:6], buff[6:8], buff[8:10], buff[10:]) |
| } |