|  | /* | 
|  | Copyright 2016 The Transicator Authors | 
|  |  | 
|  | Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | you may not use this file except in compliance with the License. | 
|  | You may obtain a copy of the License at | 
|  |  | 
|  | http://www.apache.org/licenses/LICENSE-2.0 | 
|  |  | 
|  | Unless required by applicable law or agreed to in writing, software | 
|  | distributed under the License is distributed on an "AS IS" BASIS, | 
|  | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | See the License for the specific language governing permissions and | 
|  | limitations under the License. | 
|  | */ | 
|  | package snapshotserver | 
|  |  | 
|  | import ( | 
|  | "bytes" | 
|  | "database/sql" | 
|  | "encoding/json" | 
|  | "errors" | 
|  | "fmt" | 
|  | "io" | 
|  | "net/http" | 
|  | "regexp" | 
|  | "strconv" | 
|  | "strings" | 
|  |  | 
|  | "github.com/30x/goscaffold" | 
|  | "github.com/apigee-labs/transicator/common" | 
|  | "github.com/julienschmidt/httprouter" | 
|  |  | 
|  | log "github.com/Sirupsen/logrus" | 
|  | ) | 
|  |  | 
// Short format names carried in the "type" query parameter of a snapshot
// download request.
const (
	jsonType       = "json"
	protoType      = "proto"
	sqliteDataType = "sqlite"
)

// Media types offered during content negotiation and written back as the
// Content-Type of a snapshot response.
const (
	jsonMediaType  = "application/json"
	protoMediaType = "application/transicator+protobuf"
	sqlMediaType   = "application/transicator+sqlite"
)

// changeSelectorValidChars is the pattern every "scope"/"selector" value must
// match in full: one or more lowercase letters, digits, underscores or dashes.
const changeSelectorValidChars = "^[0-9a-z_-]+$"

// reChangeSelector is changeSelectorValidChars compiled once at package load.
var reChangeSelector = regexp.MustCompile(changeSelectorValidChars)
|  |  | 
|  | /* | 
|  | GetTenants returns a list of tenant IDs turned into a string. | 
|  | */ | 
|  | func GetTenants(tenantID []string) string { | 
|  |  | 
|  | var str bytes.Buffer | 
|  |  | 
|  | str.WriteString("(") | 
|  | for idx, tid := range tenantID { | 
|  | log.Debugf("Get table id: %s %d", tid, idx) | 
|  | str.WriteString("'" + tid + "'") | 
|  | if idx != len(tenantID)-1 { | 
|  | str.WriteString(",") | 
|  | } | 
|  | } | 
|  | str.WriteString(")") | 
|  | return str.String() | 
|  |  | 
|  | } | 
|  |  | 
|  | /* | 
|  | GetScopes returns the set of scopes for a particular cluster. | 
|  | */ | 
|  | func GetScopes( | 
|  | w http.ResponseWriter, r *http.Request, | 
|  | db *sql.DB, p httprouter.Params) { | 
|  |  | 
|  | cid := p[0].Value | 
|  | if cid == "" { | 
|  | log.Errorf("apidclusterId Missing, Request Ignored") | 
|  | return | 
|  | } | 
|  |  | 
|  | data, err := GetScopeData(cid, db) | 
|  | if err != nil { | 
|  | log.Errorf("GetOrgSnapshot error: %v", err) | 
|  | return | 
|  | } | 
|  |  | 
|  | size, err := w.Write(data) | 
|  | if err != nil { | 
|  | log.Errorf("Writing snapshot id %s : Err: %s", cid, err) | 
|  | return | 
|  | } | 
|  |  | 
|  | log.Debugf("Downloaded Scopes for id %s, size %d", cid, size) | 
|  | return | 
|  | } | 
|  |  | 
|  | /* | 
|  | GetScopeData actually pulls the data for a scope. | 
|  | */ | 
|  | func GetScopeData(cid string, db *sql.DB) (b []byte, err error) { | 
|  |  | 
|  | var ( | 
|  | snapInfo, snapTime string | 
|  | ) | 
|  |  | 
|  | sdataItem := []common.Table{} | 
|  | snapData := common.Snapshot{ | 
|  | Tables:       sdataItem, | 
|  | SnapshotInfo: snapInfo, | 
|  | Timestamp:    snapTime, | 
|  | } | 
|  |  | 
|  | tx, err := db.Begin() | 
|  | if err != nil { | 
|  | log.Errorf("Error starting transaction: %s", err) | 
|  | return nil, err | 
|  | } | 
|  | defer tx.Commit() | 
|  |  | 
|  | rows, err := tx.Query("select * from APID_CLUSTER where id = $1", cid) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to query APID_CLUSTER. Err: %s", err) | 
|  | return nil, err | 
|  | } | 
|  |  | 
|  | err = fillTable(rows, &snapData, "APID_CLUSTER") | 
|  | if err != nil { | 
|  | log.Errorf("Failed to Insert rows, (Ignored) Err: %s", err) | 
|  | return nil, err | 
|  | } | 
|  | rows.Close() | 
|  |  | 
|  | rows, err = tx.Query("select * from DATA_SCOPE where apid_cluster_id = $1", cid) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to query DATA_SCOPE. Err: %s", err) | 
|  | return nil, err | 
|  | } | 
|  |  | 
|  | err = fillTable(rows, &snapData, "DATA_SCOPE") | 
|  | if err != nil { | 
|  | log.Errorf("Failed to Insert rows, (Ignored) Err: %s", err) | 
|  | return nil, err | 
|  | } | 
|  | rows.Close() | 
|  |  | 
|  | b, err = json.Marshal(snapData) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | return b, nil | 
|  | } | 
|  |  | 
|  | func fillTable(rows *sql.Rows, snapData *common.Snapshot, table string) (err error) { | 
|  |  | 
|  | srvItems := []common.Row{} | 
|  | stdItem := common.Table{ | 
|  | Rows: srvItems, | 
|  | Name: table, | 
|  | } | 
|  |  | 
|  | columnNames, columnTypes, err := parseColumnNames(rows) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get tenant data in Table %s : %+v", table, err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | for rows.Next() { | 
|  | srvItem := common.Row{} | 
|  | cols := make([]interface{}, len(columnNames)) | 
|  | for i := range cols { | 
|  | cols[i] = new(interface{}) | 
|  | } | 
|  | err = rows.Scan(cols...) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get tenant data  in Table %s : %+v", table, err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | for i, cv := range cols { | 
|  | cvp := cv.(*interface{}) | 
|  | scv := &common.ColumnVal{ | 
|  | Value: *cvp, | 
|  | Type:  columnTypes[i], | 
|  | } | 
|  | srvItem[columnNames[i]] = scv | 
|  | } | 
|  | stdItem.AddRowstoTable(srvItem) | 
|  | } | 
|  | snapData.AddTables(stdItem) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | /* | 
|  | GetTenantSnapshotData pulls the snapshot for a given set of tenants and sends | 
|  | them back to a response writer. | 
|  | */ | 
|  | func GetTenantSnapshotData( | 
|  | tenantID []string, mediaType string, | 
|  | db *sql.DB, w io.Writer) error { | 
|  |  | 
|  | var ( | 
|  | snapInfo, snapTime string | 
|  | ) | 
|  |  | 
|  | log.Debug("Starting snapshot") | 
|  | tx, err := db.Begin() | 
|  | if err != nil { | 
|  | log.Errorf("Failed to set Isolation level : %+v", err) | 
|  | return err | 
|  | } | 
|  | defer tx.Commit() | 
|  |  | 
|  | row := db.QueryRow("select now()") | 
|  | err = row.Scan(&snapTime) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get DB timestamp : %+v", err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | row = db.QueryRow("select txid_current_snapshot()") | 
|  | err = row.Scan(&snapInfo) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get DB snapshot TXID : %+v", err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | tables, err := getSchemaAndTableNames(db) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to table names: %+v", err) | 
|  | return err | 
|  | } | 
|  | log.Debugf("Tables in snapshot: %v", tables) | 
|  |  | 
|  | sdataItem := []common.Table{} | 
|  | snapData := &common.Snapshot{ | 
|  | Tables:       sdataItem, | 
|  | SnapshotInfo: snapInfo, | 
|  | Timestamp:    snapTime, | 
|  | } | 
|  |  | 
|  | switch mediaType { | 
|  | case jsonType: | 
|  | return writeJSONSnapshot(snapData, tables, tenantID, db, w) | 
|  | case protoType: | 
|  | return writeProtoSnapshot(snapData, tables, tenantID, db, w) | 
|  | default: | 
|  | panic("Media type processing failed") | 
|  | } | 
|  | } | 
|  |  | 
|  | func writeJSONSnapshot( | 
|  | snapData *common.Snapshot, tables []string, tenantID []string, | 
|  | db *sql.DB, w io.Writer) error { | 
|  |  | 
|  | for _, tn := range tables { | 
|  | // Postgres won't let us parameterize the table name here, and we don't | 
|  | // know how to parameterize the list in the "in" parameter | 
|  | q := fmt.Sprintf("select * from %s where %s in %s", | 
|  | tn, selectorColumn, GetTenants(tenantID)) | 
|  | rows, err := db.Query(q) | 
|  | if err != nil { | 
|  | if strings.Contains(err.Error(), "errorMissingColumn") { | 
|  | log.Debugf("Skipping table %s: no %s column", tn, selectorColumn) | 
|  | continue | 
|  | } | 
|  | log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, tn, err) | 
|  | return err | 
|  | } | 
|  | defer rows.Close() | 
|  | err = fillTable(rows, snapData, tn) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to Insert Table [%s] - Ignored. Err: %+v", tn, err) | 
|  | } | 
|  | } | 
|  |  | 
|  | json := snapData.Marshal() | 
|  | w.Write(json) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func writeProtoSnapshot( | 
|  | snapData *common.Snapshot, tables []string, tenantID []string, | 
|  | db *sql.DB, w io.Writer) error { | 
|  |  | 
|  | sw, err := common.CreateSnapshotWriter( | 
|  | snapData.Timestamp, snapData.SnapshotInfo, w) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to start snapshot: %s", err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | for _, t := range tables { | 
|  | q := fmt.Sprintf("select * from %s where %s in %s", t, selectorColumn, GetTenants(tenantID)) | 
|  | rows, err := db.Query(q) | 
|  | if err != nil { | 
|  | if strings.Contains(err.Error(), "errorMissingColumn") { | 
|  | log.Debugf("Skipping table %s: no %s column", t, selectorColumn) | 
|  | continue | 
|  | } | 
|  | log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, t, err) | 
|  | return err | 
|  | } | 
|  | defer rows.Close() | 
|  |  | 
|  | columnNames, columnTypes, err := parseColumnNames(rows) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, t, err) | 
|  | return err | 
|  | } | 
|  | var cis []common.ColumnInfo | 
|  | for i := range columnNames { | 
|  | ci := common.ColumnInfo{ | 
|  | Name: columnNames[i], | 
|  | Type: columnTypes[i], | 
|  | } | 
|  | cis = append(cis, ci) | 
|  | } | 
|  |  | 
|  | err = sw.StartTable(t, cis) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to start table: %s", err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | for rows.Next() { | 
|  | cols := make([]interface{}, len(columnNames)) | 
|  | for i := range cols { | 
|  | cols[i] = new(interface{}) | 
|  | } | 
|  | err = rows.Scan(cols...) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get tenant data <Query: %s> in Table %s : %+v", q, t, err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | err = sw.WriteRow(cols) | 
|  | if err != nil { | 
|  | log.Errorf("Error writing column values: %s", err) | 
|  | return err | 
|  | } | 
|  | } | 
|  |  | 
|  | err = sw.EndTable() | 
|  | if err != nil { | 
|  | log.Errorf("Error ending table: %s", err) | 
|  | return err | 
|  | } | 
|  |  | 
|  | // OK to close more than once. Do it here to free up SQL connection. | 
|  | rows.Close() | 
|  | } | 
|  |  | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func getSchemaAndTableNames(db *sql.DB) ([]string, error) { | 
|  | nameRows, err := db.Query(` | 
|  | SELECT table_schema, table_name FROM information_schema.tables | 
|  | WHERE table_schema not in ('pg_catalog', 'information_schema') | 
|  | `) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get tables from DB : %+v", err) | 
|  | return nil, err | 
|  | } | 
|  | defer nameRows.Close() | 
|  |  | 
|  | var tables []string | 
|  | for nameRows.Next() { | 
|  | var schema, name string | 
|  | err = nameRows.Scan(&schema, &name) | 
|  | if err != nil { | 
|  | log.Errorf("Failed to get table names from DB : %+v", err) | 
|  | return nil, err | 
|  | } | 
|  | tables = append(tables, schema+"."+name) | 
|  | } | 
|  | return tables, nil | 
|  | } | 
|  |  | 
/*
parseColumnNames relies on our Postgres driver reporting each column as
"name:type" rather than plain "name", where "type" is the numeric Postgres
type ID.

It returns two parallel slices: the bare column names and their type IDs. A
column reported without a ":" keeps its whole text as the name and gets type
0. Only the first ":" separates name from type; if what follows it is not a
base-10 integer that fits in 32 bits, the parse error is returned and both
slices are nil.
*/
func parseColumnNames(rows *sql.Rows) ([]string, []int32, error) {
	rawCols, err := rows.Columns()
	if err != nil {
		return nil, nil, err
	}

	count := len(rawCols)
	names, types := make([]string, count), make([]int32, count)

	for i, raw := range rawCols {
		sep := strings.Index(raw, ":")
		if sep < 0 {
			// No type suffix: the type stays at its zero value.
			names[i] = raw
			continue
		}

		names[i] = raw[:sep]
		typeID, perr := strconv.ParseInt(raw[sep+1:], 10, 32)
		if perr != nil {
			return nil, nil, perr
		}
		types[i] = int32(typeID)
	}

	return names, types, nil
}
|  |  | 
|  | /* | 
|  | GenSnapshot is currently implemented in SYNC mode, where in, it | 
|  | simply returns the scope back the ID redirect URL to query upon, | 
|  | to get the snapshot - which is yet another SYNC operation | 
|  | */ | 
|  | func GenSnapshot(w http.ResponseWriter, r *http.Request) { | 
|  |  | 
|  | var changeSelectorParam string | 
|  |  | 
|  | r.ParseForm() | 
|  | changeSelectorInput, err := getCheckChangeSelectorParams(r) | 
|  | if err != nil { | 
|  | sendAPIError(invalidRequestParam, err.Error(), w, r) | 
|  | return | 
|  | } | 
|  | if len(changeSelectorInput) == 0 { | 
|  | sendAPIError(missingScope, "", w, r) | 
|  | return | 
|  | } | 
|  |  | 
|  | mediaType := goscaffold.SelectMediaType(r, | 
|  | []string{jsonMediaType, sqlMediaType, protoMediaType}) | 
|  | typeParam := "" | 
|  |  | 
|  | switch mediaType { | 
|  | case jsonMediaType: | 
|  | typeParam = jsonType | 
|  | case sqlMediaType: | 
|  | typeParam = sqliteDataType | 
|  | case protoMediaType: | 
|  | typeParam = protoType | 
|  | default: | 
|  | sendAPIError(unsupportedMediaType, "", w, r) | 
|  | return | 
|  | } | 
|  | for _, selector := range changeSelectorInput { | 
|  | changeSelectorParam += "selector=" + selector + "&" | 
|  | } | 
|  | redURL := "/data?" + changeSelectorParam + "type=" + typeParam | 
|  |  | 
|  | http.Redirect(w, r, redURL, http.StatusSeeOther) | 
|  | } | 
|  |  | 
|  | /* | 
|  | DownloadSnapshot downloads and returns the JSON related to the scope | 
|  | */ | 
|  | func DownloadSnapshot( | 
|  | w http.ResponseWriter, r *http.Request, | 
|  | db *sql.DB, p httprouter.Params) { | 
|  | scopes, err := getCheckChangeSelectorParams(r) | 
|  | if err != nil { | 
|  | sendAPIError(invalidRequestParam, err.Error(), w, r) | 
|  | return | 
|  | } | 
|  | if len(scopes) == 0 { | 
|  | sendAPIError(missingScope, "", w, r) | 
|  | return | 
|  | } | 
|  |  | 
|  | mediaType := r.URL.Query().Get("type") | 
|  | if mediaType == "" { | 
|  | mediaType = jsonType | 
|  | } | 
|  |  | 
|  | switch mediaType { | 
|  | case jsonType: | 
|  | w.Header().Add("Content-Type", jsonMediaType) | 
|  | case sqliteDataType: | 
|  | err := WriteSqliteSnapshot(scopes, db, w, r) | 
|  | if err != nil { | 
|  | log.Errorf("GetTenantSnapshotData error: %v", err) | 
|  | } | 
|  | return | 
|  | case protoType: | 
|  | w.Header().Add("Content-Type", protoMediaType) | 
|  | default: | 
|  | w.WriteHeader(http.StatusBadRequest) | 
|  | return | 
|  | } | 
|  |  | 
|  | err = GetTenantSnapshotData(scopes, mediaType, db, w) | 
|  | if err != nil { | 
|  | log.Errorf("GetTenantSnapshotData error: %v", err) | 
|  | sendAPIError(http.StatusInternalServerError, err.Error(), w, r) | 
|  | return | 
|  | } | 
|  |  | 
|  | log.Debugf("Downloaded snapshot", scopes) | 
|  | return | 
|  | } | 
|  |  | 
|  | /* | 
|  | getChangeSelectorParams combines all 'scope' and 'selector' query | 
|  | params into one slice after checking for valid characters. The | 
|  | 'selector' query param is an alias for 'scope' | 
|  | */ | 
|  | func getCheckChangeSelectorParams(r *http.Request) ([]string, error) { | 
|  | scopes := r.URL.Query()["scope"] | 
|  | for _, s := range scopes { | 
|  | if !reChangeSelector.MatchString(s) { | 
|  | return nil, errors.New("Invalid char in scope param") | 
|  | } | 
|  | } | 
|  | selectors := r.URL.Query()["selector"] | 
|  | for _, s := range selectors { | 
|  | if !reChangeSelector.MatchString(s) { | 
|  | return nil, errors.New("Invalid char in selector param") | 
|  | } | 
|  | } | 
|  | return append(scopes, selectors...), nil | 
|  | } |