blob: 2b28c44938cd646b4eaedbaa8308c0d7458bfcd2 [file] [log] [blame]
/*
Copyright 2016 The Transicator Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshotserver
import (
"bytes"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"strings"
"github.com/30x/goscaffold"
"github.com/apigee-labs/transicator/common"
"github.com/julienschmidt/httprouter"
log "github.com/Sirupsen/logrus"
)
// Short format names. These are the values of the "type" query parameter
// that GenSnapshot puts in its redirect and DownloadSnapshot reads back.
const (
	jsonType       = "json"
	protoType      = "proto"
	sqliteDataType = "sqlite"
)

// HTTP media types that map onto the short format names above.
const (
	jsonMediaType  = "application/json"
	protoMediaType = "application/transicator+protobuf"
	sqlMediaType   = "application/transicator+sqlite"
)

// changeSelectorValidChars is the pattern every "scope"/"selector" query
// value must match in full: one or more lowercase ASCII letters, digits,
// '_' or '-'.
const changeSelectorValidChars = `^[0-9a-z_-]+$`

// reChangeSelector is the compiled form of changeSelectorValidChars.
var reChangeSelector = regexp.MustCompile(changeSelectorValidChars)
/*
GetTenants renders tenantID as a parenthesized, comma-separated list of
single-quoted SQL string literals, for use after "in" in a query. For example,
["a", "b"] becomes "('a','b')".

Any single quote inside an ID is doubled (the standard SQL escape), so the
result is always a well-formed list of literals, even when a caller passes an
ID that has not been validated. IDs accepted by the package's change-selector
validation never contain quotes, so they render exactly as before.

An empty slice yields "()". NOTE(review): "in ()" is not valid Postgres SQL,
so callers are expected to reject empty tenant lists first.
*/
func GetTenants(tenantID []string) string {
	var str bytes.Buffer
	str.WriteByte('(')
	for idx, tid := range tenantID {
		if idx > 0 {
			str.WriteByte(',')
		}
		str.WriteByte('\'')
		str.WriteString(strings.Replace(tid, "'", "''", -1))
		str.WriteByte('\'')
	}
	str.WriteByte(')')
	return str.String()
}
/*
GetScopes writes, as JSON, the scope data that GetScopeData produces for the
cluster ID taken from the first route parameter. That data is the
APID_CLUSTER row and its DATA_SCOPE rows.

A missing cluster ID is answered with an invalid-request API error. A failure
to load the data is answered with a 500. In both cases the client gets an
explicit error rather than an empty response.
*/
func GetScopes(
	w http.ResponseWriter, r *http.Request,
	db *sql.DB, p httprouter.Params) {
	// Guard the index: a route registered without parameters would
	// otherwise panic here.
	var cid string
	if len(p) > 0 {
		cid = p[0].Value
	}
	if cid == "" {
		log.Errorf("apidclusterId Missing, Request Rejected")
		sendAPIError(invalidRequestParam, "apidclusterId missing", w, r)
		return
	}
	data, err := GetScopeData(cid, db)
	if err != nil {
		log.Errorf("GetScopeData error: %v", err)
		sendAPIError(http.StatusInternalServerError, err.Error(), w, r)
		return
	}
	// GetScopeData returns json.Marshal output.
	w.Header().Set("Content-Type", jsonMediaType)
	size, err := w.Write(data)
	if err != nil {
		log.Errorf("Writing scopes for id %s : Err: %s", cid, err)
		return
	}
	log.Debugf("Downloaded Scopes for id %s, size %d", cid, size)
}
/*
GetScopeData loads the APID_CLUSTER row whose id is cid and every DATA_SCOPE
row whose apid_cluster_id is cid. It returns them JSON-encoded as a
common.Snapshot holding those two tables, with SnapshotInfo and Timestamp
left empty. Both queries run inside one transaction.
*/
func GetScopeData(cid string, db *sql.DB) (b []byte, err error) {
	snapData := common.Snapshot{
		Tables: []common.Table{},
	}
	tx, err := db.Begin()
	if err != nil {
		log.Errorf("Error starting transaction: %s", err)
		return nil, err
	}
	// The transaction only reads; its commit result is not inspected.
	defer tx.Commit()

	err = fillTableFromQuery(tx, &snapData, "APID_CLUSTER",
		"select * from APID_CLUSTER where id = $1", cid)
	if err != nil {
		return nil, err
	}
	err = fillTableFromQuery(tx, &snapData, "DATA_SCOPE",
		"select * from DATA_SCOPE where apid_cluster_id = $1", cid)
	if err != nil {
		return nil, err
	}
	b, err = json.Marshal(snapData)
	if err != nil {
		return nil, err
	}
	return b, nil
}

/*
fillTableFromQuery runs query with args inside tx and appends the result to
snapData as a table called table. The result set is closed before returning,
including when fillTable fails part-way through.
*/
func fillTableFromQuery(
	tx *sql.Tx, snapData *common.Snapshot, table, query string,
	args ...interface{}) error {
	rows, err := tx.Query(query, args...)
	if err != nil {
		log.Errorf("Failed to query %s. Err: %s", table, err)
		return err
	}
	defer rows.Close()
	if err = fillTable(rows, snapData, table); err != nil {
		log.Errorf("Failed to fill table %s. Err: %s", table, err)
		return err
	}
	return nil
}
/*
fillTable reads every remaining row from rows and appends them to snapData as
a single table named table. Each row maps column name to a common.ColumnVal
holding the scanned value and the numeric type that parseColumnNames extracts
from the driver-reported column name.

It returns an error if the column metadata cannot be parsed, a row cannot be
scanned, or iteration fails. In any of those cases nothing is added to
snapData.
*/
func fillTable(rows *sql.Rows, snapData *common.Snapshot, table string) (err error) {
	stdItem := common.Table{
		Rows: []common.Row{},
		Name: table,
	}
	columnNames, columnTypes, err := parseColumnNames(rows)
	if err != nil {
		log.Errorf("Failed to get tenant data in Table %s : %+v", table, err)
		return err
	}
	for rows.Next() {
		// Scan into *interface{} so each column keeps whatever Go type the
		// driver produced.
		cols := make([]interface{}, len(columnNames))
		for i := range cols {
			cols[i] = new(interface{})
		}
		err = rows.Scan(cols...)
		if err != nil {
			log.Errorf("Failed to get tenant data in Table %s : %+v", table, err)
			return err
		}
		srvItem := common.Row{}
		for i, cv := range cols {
			srvItem[columnNames[i]] = &common.ColumnVal{
				Value: *(cv.(*interface{})),
				Type:  columnTypes[i],
			}
		}
		stdItem.AddRowstoTable(srvItem)
	}
	// rows.Next returns false both at the end of the data and on a read
	// error. Without this check, a failed read would silently truncate the
	// table.
	if err = rows.Err(); err != nil {
		log.Errorf("Failed to read tenant data in Table %s : %+v", table, err)
		return err
	}
	snapData.AddTables(stdItem)
	return nil
}
/*
GetTenantSnapshotData pulls the snapshot for a given set of tenants and sends
them back to a response writer.
*/
func GetTenantSnapshotData(
tenantID []string, mediaType string,
db *sql.DB, w io.Writer) error {
var (
snapInfo, snapTime string
)
log.Debug("Starting snapshot")
tx, err := db.Begin()
if err != nil {
log.Errorf("Failed to set Isolation level : %+v", err)
return err
}
defer tx.Commit()
row := db.QueryRow("select now()")
err = row.Scan(&snapTime)
if err != nil {
log.Errorf("Failed to get DB timestamp : %+v", err)
return err
}
row = db.QueryRow("select txid_current_snapshot()")
err = row.Scan(&snapInfo)
if err != nil {
log.Errorf("Failed to get DB snapshot TXID : %+v", err)
return err
}
tables, err := getSchemaAndTableNames(db)
if err != nil {
log.Errorf("Failed to table names: %+v", err)
return err
}
log.Debugf("Tables in snapshot: %v", tables)
sdataItem := []common.Table{}
snapData := &common.Snapshot{
Tables: sdataItem,
SnapshotInfo: snapInfo,
Timestamp: snapTime,
}
switch mediaType {
case jsonType:
return writeJSONSnapshot(snapData, tables, tenantID, db, w)
case protoType:
return writeProtoSnapshot(snapData, tables, tenantID, db, w)
default:
panic("Media type processing failed")
}
}
/*
writeJSONSnapshot fills snapData with, for each table in tables, the rows
whose selector column matches one of tenantID. It then writes
snapData.Marshal() to w and returns any error from that write.

A table whose query fails with a missing-column error is skipped. Any other
query error aborts the snapshot and is returned. A table that fails while
being read is logged and left out, and the rest of the snapshot is still
written.
*/
func writeJSONSnapshot(
	snapData *common.Snapshot, tables []string, tenantID []string,
	db *sql.DB, w io.Writer) error {
	// The tenant list is the same for every table, so render it once.
	tenants := GetTenants(tenantID)
	for _, tn := range tables {
		// Postgres won't let us parameterize the table name here, and we don't
		// know how to parameterize the list in the "in" parameter
		q := fmt.Sprintf("select * from %s where %s in %s",
			tn, selectorColumn, tenants)
		rows, err := db.Query(q)
		if err != nil {
			if strings.Contains(err.Error(), "errorMissingColumn") {
				log.Debugf("Skipping table %s: no %s column", tn, selectorColumn)
				continue
			}
			log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, tn, err)
			return err
		}
		err = fillTable(rows, snapData, tn)
		// Close now rather than via defer. A defer would not run until the
		// whole snapshot is finished, so every table that failed part-way
		// would keep its result set (and connection) open until then.
		rows.Close()
		if err != nil {
			log.Errorf("Failed to Insert Table [%s] - Ignored. Err: %+v", tn, err)
		}
	}
	_, err := w.Write(snapData.Marshal())
	return err
}
/*
writeProtoSnapshot streams the snapshot to w through a common snapshot writer
created from snapData's Timestamp and SnapshotInfo. For each table in tables
it selects the rows whose selector column matches one of tenantID, declares
the table's columns (names and numeric types from parseColumnNames), and
writes every row.

A table whose query fails with a missing-column error is skipped. Any other
error, including one raised while iterating a table's rows, stops the
snapshot and is returned.
*/
func writeProtoSnapshot(
	snapData *common.Snapshot, tables []string, tenantID []string,
	db *sql.DB, w io.Writer) error {
	sw, err := common.CreateSnapshotWriter(
		snapData.Timestamp, snapData.SnapshotInfo, w)
	if err != nil {
		log.Errorf("Failed to start snapshot: %s", err)
		return err
	}
	// The tenant list is the same for every table, so render it once.
	tenants := GetTenants(tenantID)
	for _, t := range tables {
		// One closure per table, so the deferred Close releases each result
		// set as soon as that table is done.
		err = func() error {
			q := fmt.Sprintf("select * from %s where %s in %s", t, selectorColumn, tenants)
			rows, err := db.Query(q)
			if err != nil {
				if strings.Contains(err.Error(), "errorMissingColumn") {
					log.Debugf("Skipping table %s: no %s column", t, selectorColumn)
					return nil
				}
				log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, t, err)
				return err
			}
			defer rows.Close()

			columnNames, columnTypes, err := parseColumnNames(rows)
			if err != nil {
				log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, t, err)
				return err
			}
			cis := make([]common.ColumnInfo, len(columnNames))
			for i := range columnNames {
				cis[i] = common.ColumnInfo{
					Name: columnNames[i],
					Type: columnTypes[i],
				}
			}
			if err = sw.StartTable(t, cis); err != nil {
				log.Errorf("Failed to start table: %s", err)
				return err
			}

			for rows.Next() {
				cols := make([]interface{}, len(columnNames))
				for i := range cols {
					cols[i] = new(interface{})
				}
				if err = rows.Scan(cols...); err != nil {
					log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, t, err)
					return err
				}
				if err = sw.WriteRow(cols); err != nil {
					log.Errorf("Error writing column values: %s", err)
					return err
				}
			}
			// rows.Next also stops on a read error; don't end the table
			// as if it were complete.
			if err = rows.Err(); err != nil {
				log.Errorf("Failed to read tenant data <Query: %s> in Table %s : %+v", q, t, err)
				return err
			}

			if err = sw.EndTable(); err != nil {
				log.Errorf("Error ending table: %s", err)
				return err
			}
			return nil
		}()
		if err != nil {
			return err
		}
	}
	return nil
}
/*
getSchemaAndTableNames returns "schema.name" for every row of
information_schema.tables outside the pg_catalog and information_schema
schemas. Names are joined unquoted, as stored in the catalog.

NOTE(review): information_schema.tables also lists views; confirm they are
meant to be included in snapshots.
*/
func getSchemaAndTableNames(db *sql.DB) ([]string, error) {
	nameRows, err := db.Query(`
SELECT table_schema, table_name FROM information_schema.tables
WHERE table_schema not in ('pg_catalog', 'information_schema')
`)
	if err != nil {
		log.Errorf("Failed to get tables from DB : %+v", err)
		return nil, err
	}
	defer nameRows.Close()
	var tables []string
	for nameRows.Next() {
		var schema, name string
		err = nameRows.Scan(&schema, &name)
		if err != nil {
			log.Errorf("Failed to get table names from DB : %+v", err)
			return nil, err
		}
		tables = append(tables, schema+"."+name)
	}
	// Without this check, a read error mid-iteration would return a partial
	// table list as if it were complete.
	if err = nameRows.Err(); err != nil {
		log.Errorf("Failed to read table names from DB : %+v", err)
		return nil, err
	}
	return tables, nil
}
/*
parseColumnNames splits each column label reported by rows into a name and a
numeric type. Our Postgres driver reports labels as "name:type", where "type"
is the numeric Postgres type ID.

Everything before the first ':' is the name. A label with no ':' keeps its
whole text as the name and gets type 0. The two returned slices are parallel
to rows.Columns(). A type ID that does not parse as a 32-bit integer causes
the strconv error to be returned.
*/
func parseColumnNames(rows *sql.Rows) ([]string, []int32, error) {
	labels, err := rows.Columns()
	if err != nil {
		return nil, nil, err
	}
	names := make([]string, 0, len(labels))
	types := make([]int32, 0, len(labels))
	for _, label := range labels {
		sep := strings.Index(label, ":")
		if sep < 0 {
			names = append(names, label)
			types = append(types, 0)
			continue
		}
		typeID, parseErr := strconv.ParseInt(label[sep+1:], 10, 32)
		if parseErr != nil {
			return nil, nil, parseErr
		}
		names = append(names, label[:sep])
		types = append(types, int32(typeID))
	}
	return names, types, nil
}
/*
GenSnapshot does not build a snapshot itself. It answers with a 303 See Other
redirect to /data, carrying every validated "scope"/"selector" value as a
"selector" parameter plus a "type" parameter. DownloadSnapshot reads those
same "selector" and "type" parameters.

The Accept header, negotiated by goscaffold.SelectMediaType, chooses "type":
JSON -> json, transicator+sqlite -> sqlite, transicator+protobuf -> proto.
Invalid selectors, no selectors at all, or an unsupported media type produce
an API error instead of a redirect.
*/
func GenSnapshot(w http.ResponseWriter, r *http.Request) {
	r.ParseForm()

	selectors, err := getCheckChangeSelectorParams(r)
	if err != nil {
		sendAPIError(invalidRequestParam, err.Error(), w, r)
		return
	}
	if len(selectors) == 0 {
		sendAPIError(missingScope, "", w, r)
		return
	}

	var typeParam string
	switch goscaffold.SelectMediaType(r,
		[]string{jsonMediaType, sqlMediaType, protoMediaType}) {
	case jsonMediaType:
		typeParam = jsonType
	case sqlMediaType:
		typeParam = sqliteDataType
	case protoMediaType:
		typeParam = protoType
	default:
		sendAPIError(unsupportedMediaType, "", w, r)
		return
	}

	// Selectors were validated against reChangeSelector, so they can be
	// placed into the query string verbatim.
	var target bytes.Buffer
	target.WriteString("/data?")
	for _, sel := range selectors {
		target.WriteString("selector=")
		target.WriteString(sel)
		target.WriteString("&")
	}
	target.WriteString("type=")
	target.WriteString(typeParam)
	http.Redirect(w, r, target.String(), http.StatusSeeOther)
}
/*
DownloadSnapshot serves a snapshot for the request's "scope"/"selector" query
values. The "type" query parameter picks the format:

  - "json" (the default when "type" is absent) or "proto" is streamed via
    GetTenantSnapshotData with the matching Content-Type;
  - "sqlite" is delegated to WriteSqliteSnapshot;
  - any other value gets a bare 400.

Invalid or missing selectors are answered with an API error.
*/
func DownloadSnapshot(
	w http.ResponseWriter, r *http.Request,
	db *sql.DB, p httprouter.Params) {
	scopes, err := getCheckChangeSelectorParams(r)
	if err != nil {
		sendAPIError(invalidRequestParam, err.Error(), w, r)
		return
	}
	if len(scopes) == 0 {
		sendAPIError(missingScope, "", w, r)
		return
	}
	mediaType := r.URL.Query().Get("type")
	if mediaType == "" {
		mediaType = jsonType
	}
	switch mediaType {
	case jsonType:
		w.Header().Set("Content-Type", jsonMediaType)
	case protoType:
		w.Header().Set("Content-Type", protoMediaType)
	case sqliteDataType:
		// NOTE(review): errors are only logged here, presumably because
		// WriteSqliteSnapshot writes its own response; confirm.
		if err := WriteSqliteSnapshot(scopes, db, w, r); err != nil {
			log.Errorf("WriteSqliteSnapshot error: %v", err)
		}
		return
	default:
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	err = GetTenantSnapshotData(scopes, mediaType, db, w)
	if err != nil {
		log.Errorf("GetTenantSnapshotData error: %v", err)
		// NOTE(review): if part of the snapshot was already streamed to w,
		// the status line is already sent and this error body is appended.
		sendAPIError(http.StatusInternalServerError, err.Error(), w, r)
		return
	}
	log.Debugf("Downloaded snapshot for scopes %v", scopes)
}
/*
getCheckChangeSelectorParams collects the request's "scope" and "selector"
query values into one slice. "selector" is an alias for "scope". All scope
values come first, then all selector values, each group in request order.

Every value must match reChangeSelector. Scopes are checked before selectors,
and the first non-matching value yields a nil slice and an error that names
which parameter was bad.
*/
func getCheckChangeSelectorParams(r *http.Request) ([]string, error) {
	query := r.URL.Query()
	scopes, selectors := query["scope"], query["selector"]

	checks := []struct {
		values []string
		errMsg string
	}{
		{scopes, "Invalid char in scope param"},
		{selectors, "Invalid char in selector param"},
	}
	for _, c := range checks {
		for _, v := range c.values {
			if !reChangeSelector.MatchString(v) {
				return nil, errors.New(c.errMsg)
			}
		}
	}
	return append(scopes, selectors...), nil
}