Fix analytics so that it will pull data from the right Apigee URL using
the region location service. Fix analytics timestamps, which were not
in milliseconds. Make it possible to pass a "proxy name."
diff --git a/adapter/apigeeReport.go b/adapter/apigeeReport.go
index ae4f297..fc2ae2f 100644
--- a/adapter/apigeeReport.go
+++ b/adapter/apigeeReport.go
@@ -18,10 +18,14 @@
import (
"bytes"
+ "compress/gzip"
"encoding/json"
+ "errors"
"fmt"
"net/http"
"net/url"
+ "strings"
+ "sync"
"time"
"github.com/apid/istioApigeeAdapter/adapter/config"
@@ -30,7 +34,8 @@
)
const (
- defaultCollectURL = "https://edgemicroservices.apigee.net/edgemicro"
+ regionLookupURL = "https://edgemicroservices.apigee.net/edgemicro"
+ nanosPerMillisecond = 1000000
)
type (
@@ -41,8 +46,12 @@
analyticsLogger struct {
env adapter.Env
collectionURL string
+ organization string
+ environment string
+ actualURL string
key string
secret string
+ latch *sync.Mutex
}
)
@@ -88,23 +97,15 @@
func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) {
cfg := c.(*config.ReportParams)
- var basePath string
-
- if cfg.CollectionURL == "" {
- basePath = defaultCollectURL
- } else {
- basePath = cfg.CollectionURL
- }
- collectionURL := fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s",
- basePath, cfg.Organization, cfg.Environment)
-
al := &analyticsLogger{
env: env,
- collectionURL: collectionURL,
+ organization: cfg.Organization,
+ environment: cfg.Environment,
+ collectionURL: cfg.CollectionURL,
key: cfg.Key,
secret: cfg.Secret,
+ latch: &sync.Mutex{},
}
- env.Logger().Infof("Created Apigee Report adapter to invoke \"%s\"", collectionURL)
return al, nil
}
@@ -125,28 +126,29 @@
host := getStringLabel(entry, "hostHeader")
method := getStringLabel(entry, "httpMethod")
userAgent := getStringLabel(entry, "userAgent")
+ proxyName := getStringLabel(entry, "proxyName")
+ proxyRevision := getIntLabel(entry, "proxyRevision")
// Convert nanos (Go) to millis (Java)
- requestTime := getTimestampLabel(entry, "requestTime") / 1000
- responseTime := getTimestampLabel(entry, "responseTime") / 1000
+ requestTime := getTimestampLabel(entry, "requestTime") / nanosPerMillisecond
+ responseTime := getTimestampLabel(entry, "responseTime") / nanosPerMillisecond
responseCode := getIntLabel(entry, "responseCode")
r := common.AnalyticsRecord{
ClientReceivedStartTimestamp: requestTime,
- ClientReceivedEndTimestamp: requestTime + 1,
- ClientSentStartTimestamp: responseTime,
- ClientSentEndTimestamp: responseTime + 1,
+ // historically we want to make sure end is after start due to various AX stuff
+ ClientReceivedEndTimestamp: requestTime + 1,
+ ClientSentStartTimestamp: responseTime,
+ ClientSentEndTimestamp: responseTime + 1,
// Missing: Target times
ClientIP: sourceIP,
RequestVerb: method,
UserAgent: userAgent,
ResponseStatusCode: responseCode,
// Technically wrong because of no scheme and host header
- RequestURI: "http://" + host + path,
- // Should this include query params? Perhaps not.
- RequestPath: path,
- // Hard-coded, but should come from config
- APIProxy: "istio",
- APIProxyRevision: 1,
+ RequestURI: "http://" + host + path,
+ RequestPath: strings.Split(path, "?")[0],
+ APIProxy: proxyName,
+ APIProxyRevision: proxyRevision,
RecordType: "APIAnalytics",
}
@@ -164,18 +166,27 @@
}
func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error {
- jsonBody, err := json.Marshal(ax)
+ pushURL, err := l.getPushURL()
if err != nil {
return err
}
- req, err := http.NewRequest("POST", l.collectionURL,
- bytes.NewBuffer(jsonBody))
+ bb := &bytes.Buffer{}
+ zw := gzip.NewWriter(bb)
+ jw := json.NewEncoder(zw)
+ err = jw.Encode(ax)
+ if err != nil {
+ return err
+ }
+ zw.Close()
+
+ req, err := http.NewRequest("POST", pushURL, bb)
if err != nil {
return err
}
req.Header.Set("content-type", "application/json")
+ req.Header.Set("content-encoding", "gzip")
req.SetBasicAuth(l.key, l.secret)
resp, err := http.DefaultClient.Do(req)
if err != nil {
@@ -202,6 +213,58 @@
return nil
}
+func (l *analyticsLogger) getPushURL() (string, error) {
+ l.latch.Lock()
+ defer l.latch.Unlock()
+
+ if l.actualURL != "" {
+ return l.actualURL, nil
+ }
+
+ if l.collectionURL != "" {
+ l.actualURL = fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s",
+ l.collectionURL, l.organization, l.environment)
+ l.env.Logger().Infof("Sending analytics data to %s", l.actualURL)
+ return l.actualURL, nil
+ }
+
+ // Default config for Apigee cloud customers -- call web service to get correct URL
+ lookupURL := fmt.Sprintf("%s/region/organization/%s/environment/%s",
+ regionLookupURL, l.organization, l.environment)
+ req, err := http.NewRequest("GET", lookupURL, nil)
+ if err != nil {
+ return "", err
+ }
+ req.SetBasicAuth(l.key, l.secret)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ l.env.Logger().Errorf("Error %s getting region for analytics data from %s",
+ err, lookupURL)
+ return "", err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ msg := fmt.Sprintf("Error %d getting region for analytics data from %s",
+ resp.StatusCode, lookupURL)
+ l.env.Logger().Errorf(msg)
+ return "", errors.New(msg)
+ }
+
+ var region common.RegionResponse
+ jr := json.NewDecoder(resp.Body)
+ err = jr.Decode(®ion)
+ if err != nil {
+ l.env.Logger().Errorf("Error parsing JSON from region service: %s", err)
+ return "", err
+ }
+
+ l.actualURL = fmt.Sprintf("https://%s/edgemicro/axpublisher/organization/%s/environment/%s",
+ region.Host, l.organization, l.environment)
+ l.env.Logger().Infof("Using region \"%s\". Publishing to %s",
+ region.Region, l.actualURL)
+ return l.actualURL, nil
+}
+
func getStringLabel(le adapter.LogEntry, key string) string {
v := le.Labels[key]
if v == nil {
diff --git a/adapter/apigeeReport_test.go b/adapter/apigeeReport_test.go
index 2f279d3..159913b 100644
--- a/adapter/apigeeReport_test.go
+++ b/adapter/apigeeReport_test.go
@@ -43,7 +43,7 @@
cfg := config.ReportParams{
Organization: "foo",
Environment: "test",
- CollectionURL: "http://" + mockServer.Address(),
+ CollectionURL: "http://" + mockServer.Address() + "/edgemicro",
Key: mock.ValidPublishKey,
Secret: "NOTTHISONE",
}
@@ -80,7 +80,7 @@
cfg := config.ReportParams{
Organization: "foo",
Environment: "test",
- CollectionURL: "http://" + mockServer.Address(),
+ CollectionURL: "http://" + mockServer.Address() + "/edgemicro",
Key: mock.ValidPublishKey,
Secret: mock.ValidPublishSecret,
}
diff --git a/common/types.go b/common/types.go
index 7d81c1c..e4bff48 100644
--- a/common/types.go
+++ b/common/types.go
@@ -16,6 +16,11 @@
package common
+type RegionResponse struct {
+ Region string `json:"region"`
+ Host string `json:"host"`
+}
+
type Attribute struct {
Name string `json:"name"`
Value string `json:"value"`
@@ -43,7 +48,7 @@
RequestURI string `json:"request_uri"`
RequestPath string `json:"request_path"`
RequestVerb string `json:"request_verb"`
- ClientIP string `json:"client_ip"`
+ ClientIP string `json:"client_ip,omitempty"`
UserAgent string `json:"useragent"`
APIProxyRevision int `json:"apiproxy_revision"`
ResponseStatusCode int `json:"response_status_code"`
@@ -51,7 +56,7 @@
DeveloperApp string `json:"developer_app,omitempty"`
AccessToken string `json:"access_token,omitempty"`
ClientID string `json:"client_id,omitempty"`
- APIProduct string `json:"api_product"`
+ APIProduct string `json:"api_product,omitempty"`
}
type AnalyticsRequest struct {
diff --git a/mock/mockserver.go b/mock/mockserver.go
index 608acf1..984d5e0 100644
--- a/mock/mockserver.go
+++ b/mock/mockserver.go
@@ -17,6 +17,7 @@
package mock
import (
+ "compress/gzip"
"encoding/json"
"fmt"
"net"
@@ -60,7 +61,8 @@
router.GET("/publicKey", ms.getPublicKey)
router.GET("/products", ms.getProducts)
router.POST("/verifyApiKey", ms.getAPIKey)
- router.POST("/axpublisher/organization/:org/environment/:env", ms.publishAnalytics)
+ router.POST("/edgemicro/axpublisher/organization/:org/environment/:env", ms.publishAnalytics)
+ router.GET("/edgemicro/region/organization/:org/environment/:env", ms.getRegion)
go func() {
http.Serve(l, router)
@@ -173,9 +175,18 @@
}
var req common.AnalyticsRequest
+ var err error
defer r.Body.Close()
- dec := json.NewDecoder(r.Body)
- err := dec.Decode(&req)
+
+ if r.Header.Get("content-encoding") == "" {
+ dec := json.NewDecoder(r.Body)
+ err = dec.Decode(&req)
+ } else if r.Header.Get("content-encoding") == "gzip" {
+ zr, _ := gzip.NewReader(r.Body)
+ dec := json.NewDecoder(zr)
+ err = dec.Decode(&req)
+ }
+
if err != nil {
// This is what the existing service does
sendFault(w, 500, "Failed to execute JavaCallout. not a JSON Object",
@@ -198,6 +209,23 @@
enc.Encode(resp)
}
+func (m *MockServer) getRegion(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+ user, pw, aok := r.BasicAuth()
+ if !aok || user != ValidPublishKey || pw != ValidPublishSecret {
+ w.WriteHeader(401)
+ return
+ }
+
+ rr := common.RegionResponse{
+ Region: "test",
+ Host: m.Address(),
+ }
+
+ w.Header().Set("content-type", "application/json")
+ enc := json.NewEncoder(w)
+ enc.Encode(&rr)
+}
+
func sendFault(w http.ResponseWriter, errorCode int, fault, code string) {
w.Header().Set("content-type", "application/json")
w.WriteHeader(errorCode)
diff --git a/mock/mockserver_test.go b/mock/mockserver_test.go
index bebf2b5..e37ee26 100644
--- a/mock/mockserver_test.go
+++ b/mock/mockserver_test.go
@@ -18,6 +18,7 @@
import (
"bytes"
+ "compress/gzip"
"crypto/x509"
"encoding/json"
"encoding/pem"
@@ -28,6 +29,8 @@
"testing"
"time"
+ "io"
+
"github.com/SermoDigital/jose/jws"
"github.com/apid/istioApigeeAdapter/common"
)
@@ -203,7 +206,7 @@
func TestAxPublishBadJSON(t *testing.T) {
resp, err := sendAnalytics([]byte("NOTJSON"), "application/json",
- ValidPublishKey, ValidPublishSecret)
+ ValidPublishKey, ValidPublishSecret, false)
if err != nil {
t.Fatalf("Network error sending request: %s", err)
}
@@ -215,7 +218,7 @@
func TestAxPublishNotJSON(t *testing.T) {
resp, err := sendAnalytics([]byte("{}"), "text/plain",
- ValidPublishKey, ValidPublishSecret)
+ ValidPublishKey, ValidPublishSecret, false)
if err != nil {
t.Fatalf("Network error sending request: %s", err)
}
@@ -241,7 +244,7 @@
func TestAxPublishEmpty(t *testing.T) {
resp, err := sendAnalytics([]byte("{}"), "application/json",
- ValidPublishKey, ValidPublishSecret)
+ ValidPublishKey, ValidPublishSecret, false)
if err != nil {
t.Fatalf("Network error sending request: %s", err)
}
@@ -267,7 +270,7 @@
func TestAxPublishWrongUser(t *testing.T) {
resp, err := sendAnalytics([]byte("{}"), "application/json",
- "NOTVALID", ValidPublishSecret)
+ "NOTVALID", ValidPublishSecret, false)
if err != nil {
t.Fatalf("Network error sending request: %s", err)
}
@@ -279,7 +282,7 @@
func TestAxPublishWrongPW(t *testing.T) {
resp, err := sendAnalytics([]byte("{}"), "application/json",
- ValidPublishKey, "NOPE")
+ ValidPublishKey, "NOPE", false)
if err != nil {
t.Fatalf("Network error sending request: %s", err)
}
@@ -291,7 +294,7 @@
func TestAxPublishNoAuth(t *testing.T) {
resp, err := http.Post(
- fmt.Sprintf("http://%s/axpublisher/organization/foo/environment/test", testMockServer.Address()),
+ fmt.Sprintf("http://%s/edgemicro/axpublisher/organization/foo/environment/test", testMockServer.Address()),
"application/json", bytes.NewReader([]byte("{}")))
if err != nil {
t.Fatalf("Network error sending request: %s", err)
@@ -302,33 +305,13 @@
}
}
-func TestAxPublishSuccess(t *testing.T) {
- now := time.Now().UnixNano() / 1000
- req := common.AnalyticsRequest{
- Records: []common.AnalyticsRecord{
- {
- ClientReceivedStartTimestamp: now,
- ClientReceivedEndTimestamp: now + 1,
- ClientSentStartTimestamp: now + 10,
- ClientSentEndTimestamp: now + 11,
- RecordType: "APIAnalytics",
- APIProxy: "helloworld",
- RequestURI: "http://hello/world",
- RequestPath: "/world",
- RequestVerb: "GET",
- ClientIP: "192.168.201.100",
- UserAgent: "Testing",
- APIProxyRevision: 1,
- ResponseStatusCode: 200,
- },
- },
- }
- requestBod, err := json.Marshal(&req)
+func TestAxPublishCompressed(t *testing.T) {
+ requestBod, err := json.Marshal(makeRequest(10))
if err != nil {
t.Fatalf("Error making JSON: %s", err)
}
resp, err := sendAnalytics(requestBod, "application/json",
- ValidPublishKey, ValidPublishSecret)
+ ValidPublishKey, ValidPublishSecret, true)
if err != nil {
t.Fatalf("Network error sending request: %s", err)
}
@@ -344,7 +327,7 @@
t.Fatalf("Error reading JSON response: %s", err)
}
- if respBody.Accepted != 1 {
+ if respBody.Accepted != 10 {
t.Fatalf("Only %d record accepted", respBody.Accepted)
}
if respBody.Rejected != 0 {
@@ -352,15 +335,106 @@
}
}
-func sendAnalytics(body []byte, contentType, user, pw string) (*http.Response, error) {
+func sendAnalytics(body []byte, contentType, user, pw string, compress bool) (*http.Response, error) {
+ var bod io.Reader
+
+ if compress {
+ bb := &bytes.Buffer{}
+ zw := gzip.NewWriter(bb)
+ zw.Write(body)
+ zw.Close()
+ bod = bb
+ } else {
+ bod = bytes.NewBuffer(body)
+ }
+
req, err := http.NewRequest(
"POST",
- fmt.Sprintf("http://%s/axpublisher/organization/foo/environment/test", testMockServer.Address()),
- bytes.NewBuffer(body))
+ fmt.Sprintf("http://%s/edgemicro/axpublisher/organization/foo/environment/test", testMockServer.Address()),
+ bod)
if err != nil {
return nil, err
}
req.Header.Set("content-type", contentType)
+ if compress {
+ req.Header.Set("content-encoding", "gzip")
+ }
req.SetBasicAuth(user, pw)
return http.DefaultClient.Do(req)
}
+
+func TestRegionCheck(t *testing.T) {
+ req, err := http.NewRequest(
+ "GET",
+ fmt.Sprintf("http://%s/edgemicro/region/organization/foo/environment/test", testMockServer.Address()),
+ nil)
+ if err != nil {
+ t.Fatal("Can't make request")
+ }
+ req.SetBasicAuth(ValidPublishKey, ValidPublishSecret)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ t.Fatalf("Got status code %d", resp.StatusCode)
+ }
+
+ var rr common.RegionResponse
+ dec := json.NewDecoder(resp.Body)
+ err = dec.Decode(&rr)
+ if err != nil {
+ t.Fatalf("Error reading JSON response: %s", err)
+ }
+
+ if rr.Host != testMockServer.Address() {
+ t.Fatalf("Invalid host response %s", rr.Host)
+ }
+}
+
+func TestRegionCheckNoAuth(t *testing.T) {
+ req, err := http.NewRequest(
+ "GET",
+ fmt.Sprintf("http://%s/edgemicro/region/organization/foo/environment/test", testMockServer.Address()),
+ nil)
+ if err != nil {
+ t.Fatal("Can't make request")
+ }
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 401 {
+ t.Fatalf("Got status code %d", resp.StatusCode)
+ }
+}
+
+func makeRequest(numRecords int) *common.AnalyticsRequest {
+ var recs []common.AnalyticsRecord
+
+ for i := 0; i < numRecords; i++ {
+ now := time.Now().UnixNano() / 1000000
+ nr := common.AnalyticsRecord{
+ ClientReceivedStartTimestamp: now,
+ ClientReceivedEndTimestamp: now + 1,
+ ClientSentStartTimestamp: now + 10,
+ ClientSentEndTimestamp: now + 11,
+ RecordType: "APIAnalytics",
+ APIProxy: "helloworld",
+ RequestURI: "http://hello/world",
+ RequestPath: "/world",
+ RequestVerb: "GET",
+ ClientIP: "192.168.201.100",
+ UserAgent: "Testing",
+ APIProxyRevision: 1,
+ ResponseStatusCode: 200,
+ }
+ recs = append(recs, nr)
+ }
+
+ return &common.AnalyticsRequest{
+ Records: recs,
+ }
+}
diff --git a/testdata/configroot/scopes/global/descriptors.yml b/testdata/configroot/scopes/global/descriptors.yml
index 845533f..d66e343 100644
--- a/testdata/configroot/scopes/global/descriptors.yml
+++ b/testdata/configroot/scopes/global/descriptors.yml
@@ -75,6 +75,10 @@
valueType: STRING
target.uid:
valueType: STRING
+ proxy.name:
+ valueType: STRING
+ proxy.revision:
+ valueType: INT64
# DEPRECATED, to be removed. Use request.useragent instead.
request.user-agent:
valueType: STRING
@@ -201,3 +205,6 @@
requestTime: 5
responseTime: 5
responseCode: 2
+ proxyName: 1
+ proxyRevision: 2
+
diff --git a/testdata/configroot/scopes/global/subjects/global/rules.yml b/testdata/configroot/scopes/global/subjects/global/rules.yml
index 61c71c4..32bae3f 100644
--- a/testdata/configroot/scopes/global/subjects/global/rules.yml
+++ b/testdata/configroot/scopes/global/subjects/global/rules.yml
@@ -13,20 +13,20 @@
- descriptorName: RequestCount
maxAmount: 5000
expiration: 1s
- - kind: access-logs
- params:
- logName: accesslog.default
- log:
- descriptorName: accesslog.common
- labels:
- originIp: source.ip
- sourceUser: source.uid
- timestamp: request.time
- method: request.method | "http"
- url: request.path
- protocol: request.scheme
- responseCode: response.code
- responseSize: response.size
+ #- kind: access-logs
+ # params:
+ # logName: accesslog.default
+ # log:
+ # descriptorName: accesslog.common
+ # labels:
+ # originIp: source.ip
+ # sourceUser: source.uid
+ # timestamp: request.time
+ # method: request.method | "http"
+ # url: request.path
+ # protocol: request.scheme
+ # responseCode: response.code
+ # responseSize: response.size
- kind: access-logs
adapter: apigeeAnalytics
params:
@@ -41,4 +41,6 @@
userAgent: request.useragent
requestTime: request.time
responseTime: response.time
- responseCode: response.code
+ responseCode: response.code | 200
+ proxyName: proxy.name | "istio"
+ proxyRevision: proxy.revision | 1