blob: e61b83e9de6124dc279236e1b16e34b591c88fe0 [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidApigeeSync
import (
"encoding/hex"
"encoding/json"
"fmt"
"hash/crc32"
"math/rand"
"net/http"
"net/url"
"sync"
"sync/atomic"
"net"
"database/sql"
"github.com/apid/apid-core"
"github.com/apid/apid-core/util"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"io"
"io/ioutil"
"os"
"strconv"
)
/*
Currently limited to:
1 cluster, 1 scope, 1 org, 1 env, 1 company
1 app & 1 product per developer
Notes:
Scope ~= org + env
tenant_id == Scope for our purposes
(technically, data_scope.scope = tenant_id)
Relations:
company => * developer
developer => * app
application => * app_credential
product => * app_credential
*/
const oauthExpiresIn = 2 * 60 // 2 minutes
type MockParms struct {
ReliableAPI bool
ClusterID string
TokenKey string
TokenSecret string
Scope string
Organization string
Environment string
NumDevelopers int
NumDeployments int
BundleURI string
}
func Mock(params MockParms, router apid.Router) *MockServer {
m := &MockServer{}
m.params = params
m.init()
m.registerRoutes(router)
return m
}
// table name -> common.Row
type tableRowMap map[string]common.Row
type MockServer struct {
params MockParms
oauthToken string
snapshotID string
changeChannel chan []byte
sequenceID *int64
maxDevID *int64
deployIDMutex *sync.RWMutex
minDeploymentID *int64
maxDeploymentID *int64
newSnap *int32
authFail *int32
}
func (m *MockServer) forceAuthFailOnce() {
atomic.StoreInt32(m.authFail, 1)
}
func (m *MockServer) normalAuthCheck() {
atomic.StoreInt32(m.authFail, 0)
}
func (m *MockServer) passAuthCheck() {
atomic.StoreInt32(m.authFail, 2)
}
func (m *MockServer) forceNewSnapshot() {
atomic.StoreInt32(m.newSnap, 1)
}
func (m *MockServer) forceNoSnapshot() {
atomic.StoreInt32(m.newSnap, 0)
}
func (m *MockServer) lastSequenceID() string {
num := strconv.FormatInt(atomic.LoadInt64(m.sequenceID), 10)
return num + "." + num + "." + num
}
func (m *MockServer) nextSequenceID() string {
num := strconv.FormatInt(atomic.AddInt64(m.sequenceID, 1), 10)
return num + "." + num + "." + num
}
func (m *MockServer) nextDeveloperID() string {
return strconv.FormatInt(atomic.AddInt64(m.maxDevID, 1), 10)
}
func (m *MockServer) randomDeveloperID() string {
return strconv.FormatInt(rand.Int63n(atomic.LoadInt64(m.maxDevID)), 10)
}
func (m *MockServer) nextDeploymentID() string {
return strconv.FormatInt(atomic.AddInt64(m.maxDeploymentID, 1), 10)
}
func (m *MockServer) popDeploymentID() string {
newMinID := atomic.AddInt64(m.minDeploymentID, 1)
return strconv.FormatInt(newMinID-1, 10)
}
func initDb(statements, path string) {
f, _ := os.Create(path)
f.Close()
db, err := sql.Open("sqlite3", path)
if err != nil {
log.Panic("Could not instantiate mock db, %s", err)
}
defer db.Close()
sqlStatementsBuffer, err := ioutil.ReadFile(statements)
if err != nil {
log.Panic("Could not instantiate mock db, %s", err)
}
sqlStatementsString := string(sqlStatementsBuffer)
_, err = db.Exec(sqlStatementsString)
if err != nil {
log.Panic("Could not instantiate mock db, %s", err)
}
}
func (m *MockServer) init() {
defer GinkgoRecover()
RegisterFailHandler(func(message string, callerSkip ...int) {
log.Errorf("Expect error: %s", message)
panic(message)
})
m.sequenceID = new(int64)
m.maxDevID = new(int64)
m.changeChannel = make(chan []byte)
m.minDeploymentID = new(int64)
*m.minDeploymentID = 1
m.maxDeploymentID = new(int64)
m.newSnap = new(int32)
m.authFail = new(int32)
*m.authFail = 0
m.deployIDMutex = &sync.RWMutex{}
initDb("./sql/init_mock_db.sql", "./mockdb.sqlite3")
initDb("./sql/init_mock_boot_db.sql", "./mockdb_boot.sqlite3")
}
// developer, product, application, credential will have the same ID (developerID)
func (m *MockServer) createDeveloperWithProductAndApp() tableRowMap {
developerID := m.nextDeveloperID()
devRows := m.createDeveloper(developerID)
productRows := m.createProduct(developerID)
appRows := m.createApplication(developerID, developerID, developerID, developerID)
return m.mergeTableRowMaps(devRows, productRows, appRows)
}
func (m *MockServer) registerRoutes(router apid.Router) {
router.HandleFunc("/accesstoken", m.unreliable(m.gomega(m.sendToken))).Methods("POST")
router.HandleFunc("/snapshots", m.unreliable(m.gomega(m.auth(m.sendSnapshot)))).Methods("GET")
router.HandleFunc("/changes", m.unreliable(m.gomega(m.auth(m.sendChanges)))).Methods("GET")
router.HandleFunc("/bundles/{id}", m.sendDeploymentBundle).Methods("GET")
router.HandleFunc("/analytics", m.sendAnalyticsURL).Methods("GET")
router.HandleFunc("/analytics", m.putAnalyticsData).Methods("PUT")
}
func (m *MockServer) sendAnalyticsURL(w http.ResponseWriter, req *http.Request) {
uri := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
w.Write([]byte(fmt.Sprintf("{ \"url\": \"%s\" }", uri)))
}
func (m *MockServer) putAnalyticsData(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(200)
}
func (m *MockServer) sendDeploymentBundle(w http.ResponseWriter, req *http.Request) {
vars := apid.API().Vars(req)
w.Write([]byte("/bundles/" + vars["id"]))
}
func (m *MockServer) sendToken(w http.ResponseWriter, req *http.Request) {
defer GinkgoRecover()
Expect(req.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded; param=value"))
err := req.ParseForm()
Expect(err).NotTo(HaveOccurred())
Expect(req.Form.Get("grant_type")).To(Equal("client_credentials"))
Expect(req.Header.Get("status")).To(Equal("ONLINE"))
Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID))
Expect(req.Header.Get("display_name")).ToNot(BeEmpty())
if req.Header.Get("created_at_apid") != "" {
Expect(req.Header.Get("updated_at_apid")).To(BeEmpty())
} else {
Expect(req.Header.Get("updated_at_apid")).ToNot(BeEmpty())
}
Expect(req.Form.Get("client_id")).To(Equal(m.params.TokenKey))
Expect(req.Form.Get("client_secret")).To(Equal(m.params.TokenSecret))
var plugInfo []pluginDetail
plInfo := []byte(req.Header.Get("plugin_details"))
err = json.Unmarshal(plInfo, &plugInfo)
Expect(err).NotTo(HaveOccurred())
m.oauthToken = util.GenerateUUID()
res := OauthToken{
AccessToken: m.oauthToken,
ExpiresIn: oauthExpiresIn,
}
body, err := json.Marshal(res)
Expect(err).NotTo(HaveOccurred())
w.Write(body)
}
func (m *MockServer) sendSnapshot(w http.ResponseWriter, req *http.Request) {
defer GinkgoRecover()
q := req.URL.Query()
scopes := q["scope"]
Expect(scopes).To(ContainElement(m.params.ClusterID))
if m.params.Scope != "" {
Expect(scopes).To(ContainElement(m.params.Scope))
}
m.snapshotID = util.GenerateUUID()
w.Header().Set(headerSnapshotNumber, m.snapshotID)
if len(scopes) == 1 {
//send bootstrap db
err := streamFile("./mockdb_boot.sqlite3", w)
Expect(err).NotTo(HaveOccurred())
return
} else {
//send data db
err := streamFile("./mockdb.sqlite3", w)
Expect(err).NotTo(HaveOccurred())
return
}
}
func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
defer GinkgoRecover()
val := atomic.LoadInt32(m.newSnap)
if val > 0 {
log.Debug("MockServer: force new snapshot")
w.WriteHeader(http.StatusBadRequest)
apiErr := changeServerError{
Code: "SNAPSHOT_TOO_OLD",
}
bytes, err := json.Marshal(apiErr)
Expect(err).NotTo(HaveOccurred())
w.Write(bytes)
return
}
log.Debug("mock server sending change list")
q := req.URL.Query()
scopes := q["scope"]
_, err := strconv.Atoi(q.Get("block"))
Expect(err).NotTo(HaveOccurred())
_ = q.Get("since")
Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID))
//Expect(q.Get("snapshot")).To(Equal(m.snapshotID))
Expect(scopes).To(ContainElement(m.params.ClusterID))
if m.params.Scope != "" {
Expect(scopes).To(ContainElement(m.params.Scope))
}
// todo: the following is just legacy for the existing test in apigeeSync_suite_test
developer := m.createDeveloperWithProductAndApp()
changeList := m.createInsertChange(developer)
body, err := json.Marshal(changeList)
if err != nil {
log.Errorf("Error generating developer: %v", err)
}
w.Write(body)
}
// enables GoMega handling
func (m *MockServer) gomega(target http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
errors := InterceptGomegaFailures(func() {
target(w, req)
})
if len(errors) > 0 {
log.Errorf("assertion errors for %s:\nheaders:%v\n%v", req.URL, req.Header, errors)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(fmt.Sprintf("assertion errors:\n%v", errors)))
}
}
}
// enforces handler auth
func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// force failing auth check
if atomic.LoadInt32(m.authFail) == 1 {
atomic.StoreInt32(m.authFail, 0)
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Force fail: bad auth token. ")))
return
}
// force passing auth check
if atomic.LoadInt32(m.authFail) == 2 {
target(w, req)
return
}
// check auth header
auth := req.Header.Get("Authorization")
expectedAuth := m.getBearerToken()
if auth != expectedAuth {
w.WriteHeader(http.StatusUnauthorized)
w.Write([]byte(fmt.Sprintf("Bad auth token. Is: %s, should be: %s", auth, expectedAuth)))
return
}
target(w, req)
}
}
func (m *MockServer) getBearerToken() string {
return fmt.Sprintf("Bearer %s", m.oauthToken)
}
// make a handler unreliable
func (m *MockServer) unreliable(target http.HandlerFunc) http.HandlerFunc {
if m.params.ReliableAPI {
return target
}
var fail bool
return func(w http.ResponseWriter, req *http.Request) {
fail = !fail
if fail {
w.WriteHeader(500)
} else {
target(w, req)
}
}
}
func (m *MockServer) newRow(keyAndVals map[string]string) (row common.Row) {
row = common.Row{}
for k, v := range keyAndVals {
row[k] = m.stringColumnVal(v)
}
// todo: remove this once apidVerifyAPIKey can deal with not having the field
row["_change_selector"] = m.stringColumnVal(m.params.Scope)
return
}
func (m *MockServer) stringColumnVal(v string) *common.ColumnVal {
return &common.ColumnVal{
Value: v,
Type: 1,
}
}
func (m *MockServer) createDeployment() tableRowMap {
deploymentID := m.nextDeploymentID()
bundleID := util.GenerateUUID()
listen := apid.Config().GetString("api_listen")
_, port, err := net.SplitHostPort(listen)
Expect(err).NotTo(HaveOccurred())
urlString := m.params.BundleURI
if urlString == "" {
urlString = fmt.Sprintf("http://localhost:%s/bundles/%s", port, bundleID)
}
uri, err := url.Parse(urlString)
Expect(err).NotTo(HaveOccurred())
hashWriter := crc32.NewIEEE()
hashWriter.Write([]byte(uri.Path))
checkSum := hex.EncodeToString(hashWriter.Sum(nil))
type bundleConfigJson struct {
Name string `json:"name"`
URI string `json:"uri"`
ChecksumType string `json:"checksumType"`
Checksum string `json:"checksum"`
}
bundleJson, err := json.Marshal(bundleConfigJson{
Name: uri.Path,
URI: urlString,
ChecksumType: "crc-32",
Checksum: checkSum,
})
Expect(err).ShouldNot(HaveOccurred())
rows := tableRowMap{}
rows["kms_deployment"] = m.newRow(map[string]string{
"id": deploymentID,
"bundle_config_id": bundleID,
"apid_cluster_id": m.params.ClusterID,
"data_scope_id": m.params.Scope,
"bundle_config_json": string(bundleJson),
"config_json": "{}",
})
return rows
}
func (m *MockServer) createDeveloper(developerID string) tableRowMap {
companyID := m.params.Organization
tenantID := m.params.Scope
rows := tableRowMap{}
rows["kms_developer"] = m.newRow(map[string]string{
"id": developerID,
"status": "Active",
"tenant_id": tenantID,
})
// map developer onto to existing company
rows["kms_company_developer"] = m.newRow(map[string]string{
"tenant_id": tenantID,
"company_id": companyID,
"developer_id": developerID,
})
return rows
}
func (m *MockServer) createProduct(productID string) tableRowMap {
tenantID := m.params.Scope
environments := fmt.Sprintf("{%s}", m.params.Environment)
resources := fmt.Sprintf("{%s}", "/") // todo: what should be here?
rows := tableRowMap{}
rows["kms_api_product"] = m.newRow(map[string]string{
"id": productID,
"api_resources": resources,
"environments": environments,
"tenant_id": tenantID,
})
return rows
}
func (m *MockServer) createApplication(developerID, productID, applicationID, credentialID string) tableRowMap {
tenantID := m.params.Scope
rows := tableRowMap{}
rows["kms_app"] = m.newRow(map[string]string{
"id": applicationID,
"developer_id": developerID,
"status": "Approved",
"tenant_id": tenantID,
})
rows["kms_app_credential"] = m.newRow(map[string]string{
"id": credentialID,
"app_id": applicationID,
"tenant_id": tenantID,
"status": "Approved",
})
rows["kms_app_credential_apiproduct_mapper"] = m.newRow(map[string]string{
"apiprdt_id": productID,
"app_id": applicationID,
"appcred_id": credentialID,
"status": "Approved",
"tenant_id": tenantID,
})
return rows
}
func (m *MockServer) createInsertChange(newRows tableRowMap) common.ChangeList {
var changeList = common.ChangeList{}
changeList.FirstSequence = m.lastSequenceID()
changeList.LastSequence = m.nextSequenceID()
for table, row := range newRows {
change := common.Change{
Table: table,
NewRow: row,
Operation: common.Insert,
}
changeList.Changes = append(changeList.Changes, change)
}
return changeList
}
func (m *MockServer) createDeleteChange(oldRows tableRowMap) common.ChangeList {
var changeList = common.ChangeList{}
changeList.FirstSequence = m.lastSequenceID()
changeList.LastSequence = m.nextSequenceID()
for table, row := range oldRows {
change := common.Change{
Table: table,
OldRow: row,
Operation: common.Delete,
}
changeList.Changes = append(changeList.Changes, change)
}
return changeList
}
func (m *MockServer) createUpdateChange(oldRows, newRows tableRowMap) common.ChangeList {
var changeList = common.ChangeList{}
changeList.FirstSequence = m.lastSequenceID()
changeList.LastSequence = m.nextSequenceID()
for table, oldRow := range oldRows {
change := common.Change{
Table: table,
OldRow: oldRow,
NewRow: newRows[table],
Operation: common.Update,
}
changeList.Changes = append(changeList.Changes, change)
}
return changeList
}
// create one tableRowMap from various tableRowMap - tables must be unique
func (m *MockServer) mergeTableRowMaps(maps ...tableRowMap) tableRowMap {
merged := tableRowMap{}
for _, m := range maps {
for name, row := range m {
if _, ok := merged[name]; ok {
panic(fmt.Sprintf("overwrite. name: %#v, row: %#v", name, row))
}
merged[name] = row
}
}
return merged
}
// create []common.Table from array of tableRowMaps
func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList {
result := common.ChangeList{}
if len(changeLists) > 0 {
result.FirstSequence = changeLists[0].FirstSequence
result.LastSequence = changeLists[len(changeLists)-1].LastSequence
}
for _, cl := range changeLists {
for _, c := range cl.Changes {
result.Changes = append(result.Changes, c)
}
}
return result
}
func streamFile(srcFile string, w http.ResponseWriter) error {
inFile, err := os.Open(srcFile)
if err != nil {
return err
}
defer inFile.Close()
w.Header().Set("Content-Type", "application/transicator+sqlite")
_, err = io.Copy(w, inFile)
return err
}