// blob: 54c810dfbcb76cef7171667890d7998e4addf3d3 [file] [log] [blame] [edit]
package apidApigeeSync
import (
"encoding/hex"
"encoding/json"
"fmt"
"hash/crc32"
"math/rand"
"net/http"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
/*
Currently limited to:
1 cluster, 1 scope, 1 org, 1 env, 1 company
1 app & 1 product per developer
Notes:
Scope ~= org + env
tenant_id == Scope for our purposes
(technically, data_scope.scope = tenant_id)
Relations:
company => * developer
developer => * app
application => * app_credential
product => * app_credential
*/
// MockParms configures the mock server built by Mock.
type MockParms struct {
// ReliableAPI disables the failure injection of unreliable(). When false,
// each wrapped handler answers every other request (starting with the
// first) with a 500.
ReliableAPI bool
// ClusterID is the id of the apid_cluster row. Handlers also expect it in
// the apid_cluster_Id header and among the "scope" query values.
ClusterID string
// TokenKey is the client_id the /accesstoken handler expects.
TokenKey string
// TokenSecret is the client_secret the /accesstoken handler expects.
TokenSecret string
// Scope is the id/scope of the data_scope row; it is also used as the
// tenant_id and _change_selector of generated rows.
Scope string
// Organization is the org of the data scope and the id/name of the single
// generated company.
Organization string
// Environment is the env of the data scope and the environment listed on
// generated API products.
Environment string
// NumDevelopers is how many developers (each with a product and an app)
// are placed in the initial snapshot.
NumDevelopers int
// AddDeveloperEvery is the tick interval of developerGenerator.
AddDeveloperEvery time.Duration
// UpdateDeveloperEvery is the tick interval of developerUpdater.
UpdateDeveloperEvery time.Duration
// NumDeployments is how many deployments are placed in the initial snapshot.
NumDeployments int
// ReplaceDeploymentEvery is the tick interval of deploymentReplacer.
ReplaceDeploymentEvery time.Duration
// BundleURI, when non-empty, is used as the bundle URI of every generated
// deployment; otherwise a http://localhost:<api_port>/bundles/<id> URL is
// generated.
BundleURI string
}
// Mock creates a MockServer configured with params, lets it build its
// snapshot data and start its background generators (init), and then
// registers its HTTP handlers on router.
func Mock(params MockParms, router apid.Router) *MockServer {
	server := &MockServer{params: params}
	server.init()
	server.registerRoutes(router)
	return server
}
// tableRowMap maps a table name to the single common.Row generated for that
// table (table name -> common.Row).
type tableRowMap map[string]common.Row
// MockServer is the state behind the mock token/snapshot/changes endpoints.
// The *int64 counters are allocated in init and accessed through sync/atomic.
type MockServer struct {
// params is the configuration handed to Mock.
params MockParms
// oauthToken is the token issued by the latest /accesstoken call; auth()
// compares the Authorization header against it.
oauthToken string
// snapshotID is generated by the latest /snapshots call; /changes expects
// it back in the "snapshot" query parameter.
snapshotID string
snapshotTables map[string][]common.Table // key = scopeID
// changeChannel carries JSON-encoded change lists from the background
// generators to sendChange. It is created unbuffered in init.
changeChannel chan []byte
// sequenceID is the last change sequence number handed out.
sequenceID *int64
// maxDevID is the highest developer ID handed out.
maxDevID *int64
// NOTE(review): deployIDMutex is not referenced anywhere in the visible
// code — confirm whether it is still needed.
deployIDMutex sync.RWMutex
// minDeploymentID is the ID popDeploymentID will hand out next (starts at 1).
minDeploymentID *int64
// maxDeploymentID is the highest deployment ID handed out.
maxDeploymentID *int64
}
// lastSequenceID returns the most recently issued change sequence number as a
// decimal string, without advancing it.
func (m *MockServer) lastSequenceID() string {
	current := atomic.LoadInt64(m.sequenceID)
	return strconv.FormatInt(current, 10)
}
// nextSequenceID atomically advances the change sequence counter by one and
// returns the new value as a decimal string.
func (m *MockServer) nextSequenceID() string {
	advanced := atomic.AddInt64(m.sequenceID, 1)
	return strconv.FormatInt(advanced, 10)
}
// nextDeveloperID atomically allocates the next developer ID (the first one
// issued is "1") and returns it as a decimal string.
func (m *MockServer) nextDeveloperID() string {
	allocated := atomic.AddInt64(m.maxDevID, 1)
	return strconv.FormatInt(allocated, 10)
}
// randomDeveloperID returns the ID of a randomly chosen developer that has
// already been issued by nextDeveloperID, i.e. a value in [1, maxDevID], as a
// decimal string.
//
// When no developer has been issued yet it returns "0" (an ID that is never
// handed out) instead of calling rand.Int63n with a non-positive bound, which
// would panic.
func (m *MockServer) randomDeveloperID() string {
	issued := atomic.LoadInt64(m.maxDevID)
	if issued < 1 {
		return "0"
	}
	// Int63n yields [0, issued); issued IDs start at 1, so shift by one.
	return strconv.FormatInt(rand.Int63n(issued)+1, 10)
}
// nextDeploymentID atomically allocates the next deployment ID and returns it
// as a decimal string.
func (m *MockServer) nextDeploymentID() string {
	allocated := atomic.AddInt64(m.maxDeploymentID, 1)
	return strconv.FormatInt(allocated, 10)
}
// popDeploymentID returns the current lowest deployment ID as a decimal string
// and atomically advances the lower bound by one.
func (m *MockServer) popDeploymentID() string {
	popped := atomic.AddInt64(m.minDeploymentID, 1) - 1
	return strconv.FormatInt(popped, 10)
}
// init prepares the server state: it installs a Gomega fail handler that logs
// and panics, allocates the ID counters and the change channel, starts the
// three background change generators, and builds the snapshot tables for the
// cluster scope (apid_cluster + data_scope) and for the data scope (one
// company, NumDevelopers developers, NumDeployments deployments).
func (m *MockServer) init() {
defer GinkgoRecover()
RegisterFailHandler(func(message string, callerSkip ...int) {
log.Errorf("Expect error: %s", message)
panic(message)
})
m.sequenceID = new(int64)
m.maxDevID = new(int64)
m.changeChannel = make(chan []byte)
m.minDeploymentID = new(int64)
// deployment IDs start at 1, so the first ID to pop is 1
*m.minDeploymentID = 1
m.maxDeploymentID = new(int64)
// The generators are started before the snapshot below is built; they share
// the ID counters with the snapshot generation.
go m.developerGenerator()
go m.developerUpdater()
go m.deploymentReplacer()
// cluster "scope"
// NOTE: newRow always sets "_change_selector" to m.params.Scope, so the
// value passed here is overwritten.
cluster := m.newRow(map[string]string{
"id": m.params.ClusterID,
"_change_selector": m.params.ClusterID,
})
// data scopes
var dataScopes []common.Row
dataScopes = append(dataScopes, cluster)
dataScopes = append(dataScopes, m.newRow(map[string]string{
"id": m.params.Scope,
"scope": m.params.Scope,
"org": m.params.Organization,
"env": m.params.Environment,
"apid_cluster_id": m.params.ClusterID,
"_change_selector": m.params.Scope,
}))
// cluster & data_scope snapshot tables
m.snapshotTables = map[string][]common.Table{}
m.snapshotTables[m.params.ClusterID] = []common.Table{
{
Name: "edgex.apid_cluster",
Rows: []common.Row{cluster},
},
{
Name: "edgex.data_scope",
Rows: dataScopes,
},
}
// rows of the data scope snapshot, one tableRowMap per generated entity
var snapshotTableRows []tableRowMap
// generate one company
companyID := m.params.Organization
tenantID := m.params.Scope
changeSelector := m.params.Scope
company := tableRowMap{
"kms.company": m.newRow(map[string]string{
"id": companyID,
"status": "Active",
"tenant_id": tenantID,
"name": companyID,
"display_name": companyID,
"_change_selector": changeSelector,
}),
}
snapshotTableRows = append(snapshotTableRows, company)
// generate snapshot developers
for i := 0; i < m.params.NumDevelopers; i++ {
developer := m.createDeveloperWithProductAndApp()
snapshotTableRows = append(snapshotTableRows, developer)
}
log.Infof("created %d developers", m.params.NumDevelopers)
// generate snapshot deployments
for i := 0; i < m.params.NumDeployments; i++ {
deployment := m.createDeployment()
snapshotTableRows = append(snapshotTableRows, deployment)
}
log.Infof("created %d deployments", m.params.NumDeployments)
// group all generated rows by table name for the data scope snapshot
m.snapshotTables[m.params.Scope] = m.concatTableRowMaps(snapshotTableRows...)
// only dump the snapshot when it is small
if m.params.NumDevelopers < 10 && m.params.NumDeployments < 10 {
log.Debugf("snapshotTables: %v", m.snapshotTables[m.params.Scope])
}
}
// createDeveloperWithProductAndApp allocates a new developer ID and returns
// the rows for a developer plus one product, one application and one
// credential. All of them reuse the developer ID as their own ID.
func (m *MockServer) createDeveloperWithProductAndApp() tableRowMap {
	id := m.nextDeveloperID()
	return m.mergeTableRowMaps(
		m.createDeveloper(id),
		m.createProduct(id),
		m.createApplication(id, id, id, id),
	)
}
// registerRoutes attaches the mock endpoints to router. The token, snapshot
// and changes endpoints go through unreliable(); snapshot and changes
// additionally require the bearer token checked by auth().
func (m *MockServer) registerRoutes(router apid.Router) {
	tokenHandler := m.unreliable(m.sendToken)
	snapshotHandler := m.unreliable(m.auth(m.sendSnapshot))
	changesHandler := m.unreliable(m.auth(m.sendChanges))

	router.HandleFunc("/accesstoken", tokenHandler).Methods("POST")
	router.HandleFunc("/snapshots", snapshotHandler).Methods("GET")
	router.HandleFunc("/changes", changesHandler).Methods("GET")
	router.HandleFunc("/bundles/{id}", m.sendDeploymentBundle).Methods("GET")
	router.HandleFunc("/analytics", m.sendAnalyticsURL).Methods("GET")
	router.HandleFunc("/analytics", m.putAnalyticsData).Methods("PUT")
}
// sendAnalyticsURL replies with a small JSON document whose "url" field is
// the URL of this very request, built as http://<Host><RequestURI>.
func (m *MockServer) sendAnalyticsURL(w http.ResponseWriter, req *http.Request) {
	self := fmt.Sprintf("http://%s%s", req.Host, req.RequestURI)
	fmt.Fprintf(w, "{ \"url\": \"%s\" }", self)
}
// putAnalyticsData accepts any upload: it ignores the request and answers
// with status 200 and no body.
func (m *MockServer) putAnalyticsData(w http.ResponseWriter, req *http.Request) {
	w.WriteHeader(http.StatusOK)
}
// sendDeploymentBundle serves a fake bundle: the response body is the text
// "/bundles/<id>", where <id> is the {id} path variable of the request.
func (m *MockServer) sendDeploymentBundle(w http.ResponseWriter, req *http.Request) {
	bundleID := apid.API().Vars(req)["id"]
	w.Write([]byte("/bundles/" + bundleID))
}
// sendToken handles POST /accesstoken. It asserts (via Gomega) that the
// request carries the expected content type, form fields and apid headers,
// including a JSON-decodable plugin_details header. It then issues a fresh
// token, remembers it in m.oauthToken and writes it back as JSON. A failed
// assertion is reported through the fail handler installed by
// registerFailHandler.
func (m *MockServer) sendToken(w http.ResponseWriter, req *http.Request) {
	defer GinkgoRecover()
	m.registerFailHandler(w)

	headers := req.Header
	Expect(headers.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded; param=value"))

	parseErr := req.ParseForm()
	Expect(parseErr).NotTo(HaveOccurred())

	// req.Form is only populated once ParseForm has run.
	form := req.Form
	Expect(form.Get("grant_type")).To(Equal("client_credentials"))
	Expect(headers.Get("status")).To(Equal("ONLINE"))
	Expect(headers.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID))
	Expect(headers.Get("display_name")).ToNot(BeEmpty())
	Expect(form.Get("client_id")).To(Equal(m.params.TokenKey))
	Expect(form.Get("client_secret")).To(Equal(m.params.TokenSecret))

	// plugin_details only has to decode; its content is not inspected.
	var details []pluginDetail
	rawDetails := []byte(headers.Get("plugin_details"))
	decodeErr := json.Unmarshal(rawDetails, &details)
	Expect(decodeErr).NotTo(HaveOccurred())

	m.oauthToken = generateUUID()
	body, err := json.Marshal(oauthTokenResp{AccessToken: m.oauthToken})
	Expect(err).NotTo(HaveOccurred())
	w.Write(body)
}
// sendSnapshot handles GET /snapshots. It asserts that the cluster ID is among
// the requested "scope" values, generates a new snapshot ID (remembered in
// m.snapshotID) and replies with the JSON-encoded tables of every requested
// scope.
func (m *MockServer) sendSnapshot(w http.ResponseWriter, req *http.Request) {
	defer GinkgoRecover()
	m.registerFailHandler(w)

	requested := req.URL.Query()["scope"]
	Expect(requested).To(ContainElement(m.params.ClusterID))

	m.snapshotID = generateUUID()
	snapshot := &common.Snapshot{SnapshotInfo: m.snapshotID}

	// Note: if/when we support multiple scopes, we'd have to do a merge of table rows
	for _, scope := range requested {
		for _, table := range m.snapshotTables[scope] {
			snapshot.AddTables(table)
		}
	}

	payload, err := json.Marshal(snapshot)
	Expect(err).NotTo(HaveOccurred())

	log.Info("sending snapshot")
	// only dump small snapshots to the debug log
	if len(payload) < 10000 {
		log.Debugf("snapshot: %#v", string(payload))
	}
	w.Write(payload)
}
// sendChanges handles GET /changes. It asserts that "block" is an integer,
// that the cluster header and the "snapshot" parameter match, and that the
// cluster ID is among the scopes. With a non-empty "since" it waits for the
// next generated change for up to "block" seconds (see sendChange). With an
// empty "since" it immediately replies with a freshly inserted developer.
func (m *MockServer) sendChanges(w http.ResponseWriter, req *http.Request) {
	defer GinkgoRecover()
	m.registerFailHandler(w)

	query := req.URL.Query()
	scopes := query["scope"]

	block, err := strconv.Atoi(query.Get("block"))
	Expect(err).NotTo(HaveOccurred())
	since := query.Get("since")

	Expect(req.Header.Get("apid_cluster_Id")).To(Equal(m.params.ClusterID))
	Expect(query.Get("snapshot")).To(Equal(m.snapshotID))
	Expect(scopes).To(ContainElement(m.params.ClusterID))
	//Expect(scopes).To(ContainElement(m.params.Scope))

	if since == "" {
		// todo: the following is just legacy for the existing test in apigeeSync_suite_test
		insert := m.createInsertChange(m.createDeveloperWithProductAndApp())
		body, err := json.Marshal(insert)
		if err != nil {
			log.Errorf("Error generating developer: %v", err)
		}
		w.Write(body)
		return
	}

	m.sendChange(w, time.Duration(block)*time.Second)
}
// developerGenerator runs forever: on every AddDeveloperEvery tick it creates
// a new developer (with product and app) and pushes the JSON-encoded insert
// change list onto changeChannel, blocking until a client picks it up.
// time.Tick returns a nil channel for a non-positive interval, in which case
// this loop never fires.
func (m *MockServer) developerGenerator() {
	ticks := time.Tick(m.params.AddDeveloperEvery)
	for range ticks {
		insert := m.createInsertChange(m.createDeveloperWithProductAndApp())
		payload, err := json.Marshal(insert)
		if err != nil {
			log.Errorf("Error adding developer: %v", err)
		}
		log.Info("adding developer")
		log.Debugf("body: %#v", string(payload))
		m.changeChannel <- payload
	}
}
// developerUpdater runs forever: on every UpdateDeveloperEvery tick it picks a
// random developer ID and pushes an update change for its kms.developer row
// onto changeChannel. The new row differs from the old one only by an added
// "username" column.
func (m *MockServer) developerUpdater() {
	ticks := time.Tick(m.params.UpdateDeveloperEvery)
	for range ticks {
		id := m.randomDeveloperID()

		// Only the kms.developer row is updated; drop the company mapping.
		before := m.createDeveloper(id)
		delete(before, "kms.company_developer")

		after := m.createDeveloper(id)
		delete(after, "kms.company_developer")
		after["kms.developer"]["username"] = m.stringColumnVal("i_am_not_a_number")

		update := m.createUpdateChange(before, after)
		payload, err := json.Marshal(update)
		if err != nil {
			log.Errorf("Error updating developer: %v", err)
		}
		log.Info("updating developer")
		log.Debugf("body: %#v", string(payload))
		m.changeChannel <- payload
	}
}
// deploymentReplacer runs forever: on every ReplaceDeploymentEvery tick it
// emits one change list that deletes the oldest deployment (lowest ID) and
// inserts a newly created one, and pushes it onto changeChannel.
func (m *MockServer) deploymentReplacer() {
	ticks := time.Tick(m.params.ReplaceDeploymentEvery)
	for range ticks {
		// delete: the old row is identified by its id only
		removed := tableRowMap{
			"edgex.deployment": m.newRow(map[string]string{
				"id": m.popDeploymentID(),
			}),
		}
		deleteChange := m.createDeleteChange(removed)

		// insert
		insertChange := m.createInsertChange(m.createDeployment())

		combined := m.concatChangeLists(deleteChange, insertChange)
		payload, err := json.Marshal(combined)
		if err != nil {
			log.Errorf("Error replacing deployment: %v", err)
		}
		log.Info("replacing deployment")
		log.Debugf("body: %#v", string(payload))
		m.changeChannel <- payload
	}
}
// sendChange waits for the next generated change and writes it to w. If none
// arrives within timeout it answers 304 Not Modified instead.
// todo: we could debounce this if necessary
func (m *MockServer) sendChange(w http.ResponseWriter, timeout time.Duration) {
	expired := time.After(timeout)
	select {
	case <-expired:
		log.Info("change request timeout")
		w.WriteHeader(http.StatusNotModified)
	case payload := <-m.changeChannel:
		log.Info("sending change to client")
		w.Write(payload)
	}
}
// auth wraps target so that it only runs when the request's Authorization
// header equals "Bearer <current oauthToken>". The token is read at request
// time. Any other request is answered with 400 Bad Request.
func (m *MockServer) auth(target http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		expected := fmt.Sprintf("Bearer %s", m.oauthToken)
		if req.Header.Get("Authorization") == expected {
			target(w, req)
			return
		}
		w.WriteHeader(http.StatusBadRequest)
	}
}
// unreliable makes a handler flaky. Unless params.ReliableAPI is set, the
// returned handler answers its 1st, 3rd, 5th, ... call with a 500 and only
// forwards the calls in between to target. Each wrapped handler keeps its own
// toggle. With ReliableAPI set, target is returned untouched.
func (m *MockServer) unreliable(target http.HandlerFunc) http.HandlerFunc {
	if !m.params.ReliableAPI {
		failing := false
		return func(w http.ResponseWriter, req *http.Request) {
			failing = !failing
			if !failing {
				target(w, req)
				return
			}
			w.WriteHeader(500)
		}
	}
	return target
}
// registerFailHandler installs a Gomega fail handler that reports a failed
// assertion to the HTTP client: it answers 400 with the failure message as
// body and then panics with that message.
func (m *MockServer) registerFailHandler(w http.ResponseWriter) {
	failWithBadRequest := func(message string, callerSkip ...int) {
		w.WriteHeader(http.StatusBadRequest)
		w.Write([]byte(message))
		panic(message)
	}
	RegisterFailHandler(failWithBadRequest)
}
// newRow converts a column-name -> value map into a common.Row of string
// column values. The "_change_selector" column is always set to params.Scope,
// replacing any value supplied in keyAndVals.
func (m *MockServer) newRow(keyAndVals map[string]string) (row common.Row) {
	row = common.Row{}
	for column, value := range keyAndVals {
		row[column] = m.stringColumnVal(value)
	}

	// todo: remove this once apidVerifyAPIKey can deal with not having the field
	row["_change_selector"] = m.stringColumnVal(m.params.Scope)
	return row
}
// stringColumnVal wraps v in a newly allocated common.ColumnVal whose Type is
// set to 1.
func (m *MockServer) stringColumnVal(v string) *common.ColumnVal {
	col := new(common.ColumnVal)
	col.Value = v
	col.Type = 1
	return col
}
// createDeployment allocates a new deployment ID and returns its
// edgex.deployment row. The bundle URI is params.BundleURI when set, otherwise
// a URL on this host's api_port under /bundles/<bundleID>. The bundle config
// carries a CRC-32 (IEEE) checksum of the URI's path, hex encoded.
func (m *MockServer) createDeployment() tableRowMap {
	deploymentID := m.nextDeploymentID()
	bundleID := generateUUID()
	port := apid.Config().GetString("api_port")

	bundleURL := m.params.BundleURI
	if bundleURL == "" {
		bundleURL = fmt.Sprintf("http://localhost:%s/bundles/%s", port, bundleID)
	}
	parsed, err := url.Parse(bundleURL)
	Expect(err).NotTo(HaveOccurred())

	// The checksum covers the URI path, not any downloaded content.
	crc := crc32.NewIEEE()
	crc.Write([]byte(parsed.Path))
	checksum := hex.EncodeToString(crc.Sum(nil))

	bundleConfig, err := json.Marshal(struct {
		Name         string `json:"name"`
		URI          string `json:"uri"`
		ChecksumType string `json:"checksumType"`
		Checksum     string `json:"checksum"`
	}{
		Name:         parsed.Path,
		URI:          bundleURL,
		ChecksumType: "crc-32",
		Checksum:     checksum,
	})
	Expect(err).ShouldNot(HaveOccurred())

	return tableRowMap{
		"edgex.deployment": m.newRow(map[string]string{
			"id":                 deploymentID,
			"bundle_config_id":   bundleID,
			"apid_cluster_id":    m.params.ClusterID,
			"data_scope_id":      m.params.Scope,
			"bundle_config_json": string(bundleConfig),
			"config_json":        "{}",
		}),
	}
}
// createDeveloper returns the rows describing developer developerID: an
// active kms.developer row and a kms.company_developer row linking the
// developer to the single company (params.Organization).
func (m *MockServer) createDeveloper(developerID string) tableRowMap {
	tenant := m.params.Scope
	return tableRowMap{
		"kms.developer": m.newRow(map[string]string{
			"id":        developerID,
			"status":    "Active",
			"tenant_id": tenant,
		}),
		// map developer onto the existing company
		"kms.company_developer": m.newRow(map[string]string{
			"id":           developerID,
			"tenant_id":    tenant,
			"company_id":   m.params.Organization,
			"developer_id": developerID,
		}),
	}
}
// createProduct returns the kms.api_product row for productID. Its
// environments column is "{<params.Environment>}" and its api_resources
// column is "{/}".
func (m *MockServer) createProduct(productID string) tableRowMap {
	columns := map[string]string{
		"id":            productID,
		"api_resources": fmt.Sprintf("{%s}", "/"), // todo: what should be here?
		"environments":  fmt.Sprintf("{%s}", m.params.Environment),
		"tenant_id":     m.params.Scope,
	}
	return tableRowMap{"kms.api_product": m.newRow(columns)}
}
// createApplication returns the rows for one approved application owned by
// developerID: the kms.app row, one kms.app_credential row for it, and the
// kms.app_credential_apiproduct_mapper row tying that credential to productID.
func (m *MockServer) createApplication(developerID, productID, applicationID, credentialID string) tableRowMap {
	tenant := m.params.Scope
	return tableRowMap{
		"kms.app": m.newRow(map[string]string{
			"id":           applicationID,
			"developer_id": developerID,
			"status":       "Approved",
			"tenant_id":    tenant,
		}),
		"kms.app_credential": m.newRow(map[string]string{
			"id":        credentialID,
			"app_id":    applicationID,
			"tenant_id": tenant,
			"status":    "Approved",
		}),
		"kms.app_credential_apiproduct_mapper": m.newRow(map[string]string{
			"apiprdt_id": productID,
			"app_id":     applicationID,
			"appcred_id": credentialID,
			"status":     "Approved",
			"tenant_id":  tenant,
		}),
	}
}
// createInsertChange builds a change list with one Insert per table in
// newRows. FirstSequence is the current sequence number and LastSequence the
// next one, so every call consumes one sequence number.
func (m *MockServer) createInsertChange(newRows tableRowMap) common.ChangeList {
	changeList := common.ChangeList{
		FirstSequence: m.lastSequenceID(),
		LastSequence:  m.nextSequenceID(),
	}
	for table, row := range newRows {
		changeList.Changes = append(changeList.Changes, common.Change{
			Operation: common.Insert,
			Table:     table,
			NewRow:    row,
		})
	}
	return changeList
}
// createDeleteChange builds a change list with one Delete per table in
// oldRows. FirstSequence is the current sequence number and LastSequence the
// next one, so every call consumes one sequence number.
func (m *MockServer) createDeleteChange(oldRows tableRowMap) common.ChangeList {
	changeList := common.ChangeList{
		FirstSequence: m.lastSequenceID(),
		LastSequence:  m.nextSequenceID(),
	}
	for table, row := range oldRows {
		changeList.Changes = append(changeList.Changes, common.Change{
			Operation: common.Delete,
			Table:     table,
			OldRow:    row,
		})
	}
	return changeList
}
// createUpdateChange builds a change list with one Update per table in
// oldRows; the new row for each table is looked up in newRows under the same
// table name. FirstSequence is the current sequence number and LastSequence
// the next one, so every call consumes one sequence number.
func (m *MockServer) createUpdateChange(oldRows, newRows tableRowMap) common.ChangeList {
	changeList := common.ChangeList{
		FirstSequence: m.lastSequenceID(),
		LastSequence:  m.nextSequenceID(),
	}
	for table, before := range oldRows {
		changeList.Changes = append(changeList.Changes, common.Change{
			Operation: common.Update,
			Table:     table,
			OldRow:    before,
			NewRow:    newRows[table],
		})
	}
	return changeList
}
// mergeTableRowMaps combines several tableRowMaps into a new one. Table names
// must be unique across all inputs; a repeated name panics with an
// "overwrite" message. The inputs are not modified.
func (m *MockServer) mergeTableRowMaps(maps ...tableRowMap) tableRowMap {
	merged := tableRowMap{}
	for _, rows := range maps {
		for table, row := range rows {
			if _, exists := merged[table]; !exists {
				merged[table] = row
				continue
			}
			panic(fmt.Sprintf("overwrite. name: %#v, row: %#v", table, row))
		}
	}
	return merged
}
// concatTableRowMaps groups the rows of all given tableRowMaps by table name
// and returns one common.Table per name. The order of the returned tables
// follows map iteration and is therefore unspecified.
func (m *MockServer) concatTableRowMaps(maps ...tableRowMap) []common.Table {
	byName := map[string]*common.Table{}
	for _, rows := range maps {
		for name, row := range rows {
			table, known := byName[name]
			if !known {
				table = &common.Table{Name: name}
				byName[name] = table
			}
			table.AddRowstoTable(row)
		}
	}

	tables := make([]common.Table, 0, len(byName))
	for _, table := range byName {
		tables = append(tables, *table)
	}
	return tables
}
// concatChangeLists merges several change lists into one, keeping the changes
// in argument order. The arguments are expected in sequence order: the result
// takes FirstSequence from the first list and LastSequence from the last one,
// so the merged list still tells the client which sequence it has reached.
// Previously both fields were left empty. With no arguments the zero
// ChangeList is returned.
func (m *MockServer) concatChangeLists(changeLists ...common.ChangeList) common.ChangeList {
	result := common.ChangeList{}
	if len(changeLists) == 0 {
		return result
	}

	result.FirstSequence = changeLists[0].FirstSequence
	result.LastSequence = changeLists[len(changeLists)-1].LastSequence
	for _, cl := range changeLists {
		result.Changes = append(result.Changes, cl.Changes...)
	}
	return result
}