Add report code with a mock server and tests. Add adapter config to test config.
diff --git a/README.md b/README.md
index 851eb64..13dd03c 100644
--- a/README.md
+++ b/README.md
@@ -27,7 +27,7 @@
Now, launch the Mixer binary:
- $MIXER_HOME/bazel-bin/cmd/server/mixs --logtostderr --configStoreURL fs://$THIS_ROOT/testdata/configroot
+ $MIXER_HOME/bazel-bin/cmd/server/mixs server --logtostderr --configStoreURL fs://$THIS_ROOT/testdata/configroot
(In other words the "config store URL" needs to be the absolute path of the "testdata/configroot"
directory of this repo. If you get this wrong then the mixer will silently do nothing
@@ -35,7 +35,7 @@
Once that's running, here's a sample command:
- $MIXER_HOME/bazel-bin/cmd/client/mixc check -a target.serivce=fault.svc.cluster.local \
+ $MIXER_HOME/bazel-bin/cmd/client/mixc check -a target.service=fault.svc.cluster.local \
--stringmap_attributes request.headers=apikey:SOME_VALID_API_KEY
That will send a "check" RPC to the mixer, which will respond "OK" if and only if the
diff --git a/adapter/BUILD b/adapter/BUILD
index 9e57dc2..66aabee 100644
--- a/adapter/BUILD
+++ b/adapter/BUILD
@@ -20,7 +20,10 @@
go_test(
name = "tests",
size = "medium",
- srcs = ["apigeeKeyChecker_test.go"],
+ srcs = [
+ "apigeeKeyChecker_test.go",
+ "apigeeReport_test.go",
+ ],
library = ":go_default_library",
deps = [
"//mock:go_default_library",
diff --git a/adapter/apigee.go b/adapter/apigee.go
index c11818e..6fe3853 100644
--- a/adapter/apigee.go
+++ b/adapter/apigee.go
@@ -25,5 +25,6 @@
// aspect of the Apigee functionality.
func Register(r adapter.Registrar) {
r.RegisterListsBuilder(newKeyCheckBuilder())
+ //r.RegisterApplicationLogsBuilder(newReportBuilder())
r.RegisterAccessLogsBuilder(newReportBuilder())
}
diff --git a/adapter/apigeeKeyChecker.go b/adapter/apigeeKeyChecker.go
index 6785f6f..38e3dff 100644
--- a/adapter/apigeeKeyChecker.go
+++ b/adapter/apigeeKeyChecker.go
@@ -29,9 +29,10 @@
)
const (
- checkName = "apigeeKeyChecker"
- checkDesc = "Verify an API key from a parameter"
- verifyKeyPath = "/verifyApiKey"
+ checkName = "apigeeKeyChecker"
+ checkDesc = "Verify an API key from a parameter"
+ verifyKeyPath = "/verifyApiKey"
+ defaultVerifyURL = "https://%s-%s.apigee.net/edgemicro-auth"
)
var checkConf = &config.VerifyKeyParams{}
@@ -73,8 +74,7 @@
var basePath string
if cfg.VerificationURL == "" {
- basePath = fmt.Sprintf("https://%s-%s.apigee.net/edgemicro-auth",
- cfg.Organization, cfg.Environment)
+ basePath = fmt.Sprintf(defaultVerifyURL, cfg.Organization, cfg.Environment)
} else {
basePath = cfg.VerificationURL
}
diff --git a/adapter/apigeeReport.go b/adapter/apigeeReport.go
index 2500b57..ae4f297 100644
--- a/adapter/apigeeReport.go
+++ b/adapter/apigeeReport.go
@@ -20,128 +20,229 @@
"bytes"
"encoding/json"
"fmt"
- "io"
"net/http"
- "os"
+ "net/url"
+ "time"
"github.com/apid/istioApigeeAdapter/adapter/config"
- me "github.com/hashicorp/go-multierror"
+ "github.com/apid/istioApigeeAdapter/common"
"istio.io/mixer/pkg/adapter"
)
+const (
+ defaultCollectURL = "https://edgemicroservices.apigee.net/edgemicro"
+)
+
type (
- builder struct{ adapter.DefaultBuilder }
-
- logger struct{ logStream io.Writer }
-
- analyticsRecord struct {
- ClientReceivedStartTimestamp int `json:"client_received_start_timestamp"`
- ClientReceivedEndTimestamp int `json:"client_received_end_timestamp"`
- RecordType string `json:"recordType"`
- ApiProxy string `json:"apiproxy"`
- RequestUri string `json:"request_uri"`
- RequestPath string `json:"request_path"`
- RequestVerb string `json:"request_verb"`
- ClientIp string `json:"client_ip"`
- UserAgent string `json:"useragent"`
- ApiProxyRevision string `json:"apiproxy_revision"`
- ResponseStatusCode int `json:"response_status_code"`
- ClientSentStartTimestamp int `json:"client_sent_start_timestamp"`
- ClientSentEndTimestamp int `json:"client_sent_end_timestamp"`
- DeveloperEmail string `json:"developer_email,omitempty"`
- DeveloperApp string `json:"developer_app"`
- AccessToken string `json:"access_token,omitempty"`
- ClientId string `json:"client_id,omitempty"`
- ApiProduct string `json:"api_product"`
+ analyticsBuilder struct {
+ adapter.DefaultBuilder
}
- analyticsRecordCollection struct {
- Records []analyticsRecord `json:"records"`
- }
-
- edgemicroKeys struct {
- Key string
- Secret string
+ analyticsLogger struct {
+ env adapter.Env
+ collectionURL string
+ key string
+ secret string
}
)
-func newReportBuilder() builder {
- return builder{adapter.NewDefaultBuilder(
- "apigeeReport",
- "Report logs to apigee",
- &config.ReportParams{},
- )}
+func newReportBuilder() analyticsBuilder {
+ return analyticsBuilder{
+ adapter.NewDefaultBuilder(
+ "apigeeReport",
+ "Report logs to apigee",
+ &config.ReportParams{},
+ )}
}
-func (builder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) {
- return newLogger(cfg)
-}
-
-func (builder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) {
- return newLogger(cfg)
-}
-
-func newLogger(cfg adapter.Config) (*logger, error) {
- c := cfg.(*config.ReportParams)
-
- w := os.Stderr
- if c.LogStream == config.STDOUT {
- w = os.Stdout
+func (b analyticsBuilder) ValidateConfig(c adapter.Config) (ce *adapter.ConfigErrors) {
+ cfg := c.(*config.ReportParams)
+ if cfg.Organization == "" {
+ ce = ce.Appendf("organization", "Organization parameter must be specified")
}
-
- return &logger{w}, nil
-}
-
-func (l *logger) Log(entries []adapter.LogEntry) error {
- return l.log(entries)
-}
-
-func (l *logger) LogAccess(entries []adapter.LogEntry) error {
- fmt.Println("*** here")
- return l.log(entries)
-}
-
-func (l *logger) log(entries []adapter.LogEntry) error {
- var recordsCollection []analyticsRecord
-
- var errors *me.Error
- for _, entry := range entries {
-
- if err := writeJSON(l.logStream, entry); err != nil {
- errors = me.Append(errors, err)
- }
-
- ax_data := analyticsRecord{
- ClientReceivedStartTimestamp: 33,
- DeveloperApp: "microgateway-demo",
- }
-
- recordsCollection = append(recordsCollection, ax_data)
-
+ if cfg.Environment == "" {
+ ce = ce.Appendf("environment", "Environment parameter must be specified")
}
-
- res2B, _ := json.Marshal(recordsCollection)
- fmt.Println(string(res2B))
-
- return errors.ErrorOrNil()
+ if cfg.CollectionURL != "" {
+ _, err := url.Parse(cfg.CollectionURL)
+ if err != nil {
+ ce = ce.Appendf("collectionURL", "Invalid collection URL: %s", err)
+ }
+ }
+ if cfg.Key == "" {
+ ce = ce.Appendf("key", "Key must be specified")
+ }
+ if cfg.Secret == "" {
+ ce = ce.Appendf("secret", "Secret must be specified")
+ }
+ return
}
-func (l *logger) Close() error { return nil }
-
-func writeJSON(w io.Writer, le interface{}) error {
- return json.NewEncoder(w).Encode(le)
+func (b analyticsBuilder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) {
+ return newLogger(env, cfg)
}
-func sendAnalyticsRecords(collection analyticsRecordCollection, keys edgemicroKeys, uri string) bool {
- serializedBody, _ := json.Marshal(&collection)
- req, _ := http.NewRequest("POST", uri, bytes.NewBuffer(serializedBody))
- req.SetBasicAuth(keys.Key, keys.Secret)
- req.Header.Add("content-type", "application/json")
- client := &http.Client{}
- resp, _ := client.Do(req)
- if resp.StatusCode != 200 {
- return false
+func (b analyticsBuilder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) {
+ return newLogger(env, cfg)
+}
+
+func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) {
+ cfg := c.(*config.ReportParams)
+ var basePath string
+
+ if cfg.CollectionURL == "" {
+ basePath = defaultCollectURL
} else {
- return true
+ basePath = cfg.CollectionURL
+ }
+ collectionURL := fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s",
+ basePath, cfg.Organization, cfg.Environment)
+
+ al := &analyticsLogger{
+ env: env,
+ collectionURL: collectionURL,
+ key: cfg.Key,
+ secret: cfg.Secret,
+ }
+ env.Logger().Infof("Created Apigee Report adapter to invoke \"%s\"", collectionURL)
+ return al, nil
+}
+
+func (l *analyticsLogger) Log(entries []adapter.LogEntry) error {
+ return l.log(entries)
+}
+
+func (l *analyticsLogger) LogAccess(entries []adapter.LogEntry) error {
+ return l.log(entries)
+}
+
+func (l *analyticsLogger) log(entries []adapter.LogEntry) error {
+ var records []common.AnalyticsRecord
+
+ for _, entry := range entries {
+ sourceIP := getStringLabel(entry, "sourceIP")
+ path := getStringLabel(entry, "urlPath")
+ host := getStringLabel(entry, "hostHeader")
+ method := getStringLabel(entry, "httpMethod")
+ userAgent := getStringLabel(entry, "userAgent")
+ // Convert nanos (Go) to millis (Java)
+ requestTime := getTimestampLabel(entry, "requestTime") / 1000
+ responseTime := getTimestampLabel(entry, "responseTime") / 1000
+ responseCode := getIntLabel(entry, "responseCode")
+
+ r := common.AnalyticsRecord{
+ ClientReceivedStartTimestamp: requestTime,
+ ClientReceivedEndTimestamp: requestTime + 1,
+ ClientSentStartTimestamp: responseTime,
+ ClientSentEndTimestamp: responseTime + 1,
+ // Missing: Target times
+ ClientIP: sourceIP,
+ RequestVerb: method,
+ UserAgent: userAgent,
+ ResponseStatusCode: responseCode,
+ // Technically wrong because of no scheme and host header
+ RequestURI: "http://" + host + path,
+ // Should this include query params? Perhaps not.
+ RequestPath: path,
+ // Hard-coded, but should come from config
+ APIProxy: "istio",
+ APIProxyRevision: 1,
+ RecordType: "APIAnalytics",
+ }
+
+ records = append(records, r)
+ }
+
+ ax := common.AnalyticsRequest{
+ Records: records,
+ }
+ err := l.pushRecords(&ax)
+ if err != nil {
+ l.env.Logger().Errorf("Error pushing analytics: %s", err)
+ }
+ return err
+}
+
+func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error {
+ jsonBody, err := json.Marshal(ax)
+ if err != nil {
+ return err
+ }
+
+ req, err := http.NewRequest("POST", l.collectionURL,
+ bytes.NewBuffer(jsonBody))
+ if err != nil {
+ return err
+ }
+
+ req.Header.Set("content-type", "application/json")
+ req.SetBasicAuth(l.key, l.secret)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ return fmt.Errorf("Error code %d from analytics back end", resp.StatusCode)
+ }
+
+ dec := json.NewDecoder(resp.Body)
+ var axResponse common.AnalyticsResponse
+ err = dec.Decode(&axResponse)
+ if err != nil {
+ return err
+ }
+ if axResponse.Rejected > 0 {
+ return fmt.Errorf("%d out of %d analytics records rejected",
+ axResponse.Rejected, len(ax.Records))
+ }
+ l.env.Logger().Infof("%d out of %d analytics records accepted",
+ axResponse.Accepted, len(ax.Records))
+ return nil
+}
+
+func getStringLabel(le adapter.LogEntry, key string) string {
+ v := le.Labels[key]
+ if v == nil {
+ return ""
+ }
+ switch v.(type) {
+ case string:
+ return v.(string)
+ default:
+ return fmt.Sprintf("%v", v)
}
}
+
+func getIntLabel(le adapter.LogEntry, key string) int {
+ v := le.Labels[key]
+ if v == nil {
+ return 0
+ }
+ switch v.(type) {
+ case int:
+ return v.(int)
+ case int64:
+ return int(v.(int64))
+ default:
+ return 0
+ }
+}
+
+func getTimestampLabel(le adapter.LogEntry, key string) int64 {
+ v := le.Labels[key]
+ if v == nil {
+ return 0
+ }
+ switch v.(type) {
+ case time.Time:
+ return (v.(time.Time)).UnixNano()
+ case int64:
+ return v.(int64)
+ default:
+ return 0
+ }
+}
+
+func (l *analyticsLogger) Close() error { return nil }
diff --git a/adapter/apigeeReport_test.go b/adapter/apigeeReport_test.go
new file mode 100644
index 0000000..2f279d3
--- /dev/null
+++ b/adapter/apigeeReport_test.go
@@ -0,0 +1,114 @@
+/*
+Copyright 2017 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package adapter
+
+import (
+ "testing"
+ "time"
+
+ "github.com/apid/istioApigeeAdapter/adapter/config"
+ "github.com/apid/istioApigeeAdapter/mock"
+ "istio.io/mixer/pkg/adapter"
+)
+
+func TestAxMissingOrg(t *testing.T) {
+ cfg := config.ReportParams{
+ CollectionURL: "http://" + mockServer.Address(),
+ Key: mock.ValidPublishKey,
+ Secret: mock.ValidPublishSecret,
+ }
+
+ rp := newReportBuilder()
+ ce := rp.ValidateConfig(&cfg)
+ if ce == nil {
+ t.Fatalf("Expected invalid config")
+ }
+}
+
+func TestAxWrongSecret(t *testing.T) {
+ cfg := config.ReportParams{
+ Organization: "foo",
+ Environment: "test",
+ CollectionURL: "http://" + mockServer.Address(),
+ Key: mock.ValidPublishKey,
+ Secret: "NOTTHISONE",
+ }
+
+ rp := newReportBuilder()
+ ce := rp.ValidateConfig(&cfg)
+ if ce != nil {
+ t.Fatalf("Expected valid config and got %s", ce)
+ }
+
+ aspect, err := rp.NewAccessLogsAspect(mockEnv, &cfg)
+ if err != nil {
+ t.Fatalf("Error creating aspect: %s", err)
+ }
+
+ err = aspect.LogAccess(
+ []adapter.LogEntry{
+ {
+ LogName: "test",
+ Timestamp: time.Now().String(),
+ Severity: adapter.Info,
+ Labels: map[string]interface{}{
+ "sourceIP": "1.2.3.4",
+ },
+ },
+ },
+ )
+ if err == nil {
+ t.Fatalf("Expected push error and got none")
+ }
+}
+
+func TestAxValidConfig(t *testing.T) {
+ cfg := config.ReportParams{
+ Organization: "foo",
+ Environment: "test",
+ CollectionURL: "http://" + mockServer.Address(),
+ Key: mock.ValidPublishKey,
+ Secret: mock.ValidPublishSecret,
+ }
+
+ rp := newReportBuilder()
+ ce := rp.ValidateConfig(&cfg)
+ if ce != nil {
+ t.Fatalf("Expected valid config and got %s", ce)
+ }
+
+ aspect, err := rp.NewAccessLogsAspect(mockEnv, &cfg)
+ if err != nil {
+ t.Fatalf("Error creating aspect: %s", err)
+ }
+
+ err = aspect.LogAccess(
+ []adapter.LogEntry{
+ {
+ LogName: "test",
+ Timestamp: time.Now().String(),
+ Severity: adapter.Info,
+ Labels: map[string]interface{}{
+ "sourceIP": "1.2.3.4",
+ },
+ },
+ },
+ )
+ if err != nil {
+ t.Fatalf("Error pushing analytics: %s", err)
+ }
+}
diff --git a/adapter/config/config.proto b/adapter/config/config.proto
index 4d85bc7..b069fbc 100644
--- a/adapter/config/config.proto
+++ b/adapter/config/config.proto
@@ -30,24 +30,16 @@
}
message ReportParams {
-
- // Stream is used to select between different logs output sinks.
- enum Stream {
- // STDERR refers to os.Stderr.
- STDERR = 0;
- // STDOUT refers to os.Stdout.
- STDOUT = 1;
- }
-
- // Selects which standard stream to write to for log entries.
- // STDERR is the default Stream.
- Stream log_stream = 1;
-
- string organization = 2;
-
- string environment = 3;
-
- string key = 4;
-
- string secret = 5;
+ // The name of the Apigee organization -- required
+ string organization = 1;
+ // The name of the Apigee environment -- required
+ string environment = 2;
+ // A URL to use to contact the collection service. Will be
+ // constructed from the organization and environment name if not
+ // specified, and assumes that the service runs at production "apigee.net".
+ string collectionURL = 3;
+ // The key required to collect analytics, from "edgemicro configure"
+ string key = 4;
+ // The secret required to collect analytics, from "edgemicro configure"
+ string secret = 5;
}
diff --git a/common/types.go b/common/types.go
index ba362ba..7d81c1c 100644
--- a/common/types.go
+++ b/common/types.go
@@ -29,6 +29,40 @@
Token string `json:"token"`
}
+type AnalyticsRecord struct {
+ ClientReceivedStartTimestamp int64 `json:"client_received_start_timestamp"`
+ ClientReceivedEndTimestamp int64 `json:"client_received_end_timestamp"`
+ ClientSentStartTimestamp int64 `json:"client_sent_start_timestamp"`
+ ClientSentEndTimestamp int64 `json:"client_sent_end_timestamp"`
+ TargetReceivedStartTimestamp int64 `json:"target_received_start_timestamp,omitempty"`
+ TargetReceivedEndTimestamp int64 `json:"target_received_end_timestamp,omitempty"`
+ TargetSentStartTimestamp int64 `json:"target_sent_start_timestamp,omitempty"`
+ TargetSentEndTimestamp int64 `json:"target_sent_end_timestamp,omitempty"`
+ RecordType string `json:"recordType"`
+ APIProxy string `json:"apiproxy"`
+ RequestURI string `json:"request_uri"`
+ RequestPath string `json:"request_path"`
+ RequestVerb string `json:"request_verb"`
+ ClientIP string `json:"client_ip"`
+ UserAgent string `json:"useragent"`
+ APIProxyRevision int `json:"apiproxy_revision"`
+ ResponseStatusCode int `json:"response_status_code"`
+ DeveloperEmail string `json:"developer_email,omitempty"`
+ DeveloperApp string `json:"developer_app,omitempty"`
+ AccessToken string `json:"access_token,omitempty"`
+ ClientID string `json:"client_id,omitempty"`
+ APIProduct string `json:"api_product"`
+}
+
+type AnalyticsRequest struct {
+ Records []AnalyticsRecord `json:"records"`
+}
+
+type AnalyticsResponse struct {
+ Accepted int `json:"accepted"`
+ Rejected int `json:"rejected"`
+}
+
type APIProduct struct {
APIResources []string `json:"apiResources"`
ApprovalType string `json:"approvalType"`
diff --git a/mock/mockserver.go b/mock/mockserver.go
index 8ab7737..608acf1 100644
--- a/mock/mockserver.go
+++ b/mock/mockserver.go
@@ -34,11 +34,14 @@
var mockInit = &sync.Once{}
const (
- ValidAPIKey1 = "12345"
+ ValidAPIKey1 = "12345"
+ ValidPublishKey = "aaaaaa"
+ ValidPublishSecret = "bbbbbb"
)
type MockServer struct {
- listener net.Listener
+ listener net.Listener
+ analyticsRecords []common.AnalyticsRecord
}
func StartMockServer(port int) (*MockServer, error) {
@@ -57,6 +60,7 @@
router.GET("/publicKey", ms.getPublicKey)
router.GET("/products", ms.getProducts)
router.POST("/verifyApiKey", ms.getAPIKey)
+ router.POST("/axpublisher/organization/:org/environment/:env", ms.publishAnalytics)
go func() {
http.Serve(l, router)
@@ -73,6 +77,10 @@
m.listener.Close()
}
+func (m *MockServer) GetAnalyticsRecords() []common.AnalyticsRecord {
+ return m.analyticsRecords
+}
+
func (m *MockServer) getPublicKey(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
w.Header().Set("content-type", "text/plain")
w.Write(mockCertPEM)
@@ -153,6 +161,43 @@
enc.Encode(&tok)
}
+func (m *MockServer) publishAnalytics(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
+ user, pw, aok := r.BasicAuth()
+ if !aok || user != ValidPublishKey || pw != ValidPublishSecret {
+ w.WriteHeader(401)
+ return
+ }
+ if r.Header.Get("content-type") != "application/json" {
+ sendAnalyticsResponse(w, 0, 1)
+ return
+ }
+
+ var req common.AnalyticsRequest
+ defer r.Body.Close()
+ dec := json.NewDecoder(r.Body)
+ err := dec.Decode(&req)
+ if err != nil {
+ // This is what the existing service does
+ sendFault(w, 500, "Failed to execute JavaCallout. not a JSON Object",
+ "steps.javacallout.ExecutionError")
+ return
+ }
+
+ // For now we just accept it all. We'll add some validation in the future.
+ m.analyticsRecords = append(m.analyticsRecords, req.Records...)
+ sendAnalyticsResponse(w, len(req.Records), 0)
+}
+
+func sendAnalyticsResponse(w http.ResponseWriter, accepted, rejected int) {
+ resp := &common.AnalyticsResponse{
+ Accepted: accepted,
+ Rejected: rejected,
+ }
+ w.Header().Set("content-type", "application/json")
+ enc := json.NewEncoder(w)
+ enc.Encode(resp)
+}
+
func sendFault(w http.ResponseWriter, errorCode int, fault, code string) {
w.Header().Set("content-type", "application/json")
w.WriteHeader(errorCode)
diff --git a/mock/mockserver_test.go b/mock/mockserver_test.go
index 862383a..bebf2b5 100644
--- a/mock/mockserver_test.go
+++ b/mock/mockserver_test.go
@@ -26,6 +26,7 @@
"net/http"
"os"
"testing"
+ "time"
"github.com/SermoDigital/jose/jws"
"github.com/apid/istioApigeeAdapter/common"
@@ -199,3 +200,167 @@
t.Fatalf("Error parsing JWT: %s", err)
}
}
+
+func TestAxPublishBadJSON(t *testing.T) {
+ resp, err := sendAnalytics([]byte("NOTJSON"), "application/json",
+ ValidPublishKey, ValidPublishSecret)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 500 {
+ t.Fatalf("Got status code %d and expected 500", resp.StatusCode)
+ }
+}
+
+func TestAxPublishNotJSON(t *testing.T) {
+ resp, err := sendAnalytics([]byte("{}"), "text/plain",
+ ValidPublishKey, ValidPublishSecret)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ t.Fatalf("Got status code %d and expected 200", resp.StatusCode)
+ }
+
+ var respBody common.AnalyticsResponse
+ dec := json.NewDecoder(resp.Body)
+ err = dec.Decode(&respBody)
+ if err != nil {
+ t.Fatalf("Error reading JSON response: %s", err)
+ }
+
+ if respBody.Accepted != 0 {
+ t.Fatalf("%d records were accepted", respBody.Accepted)
+ }
+ if respBody.Rejected != 1 {
+ t.Fatalf("%d records were rejected", respBody.Rejected)
+ }
+}
+
+func TestAxPublishEmpty(t *testing.T) {
+ resp, err := sendAnalytics([]byte("{}"), "application/json",
+ ValidPublishKey, ValidPublishSecret)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ t.Fatalf("Got status code %d and expected 200", resp.StatusCode)
+ }
+
+ var respBody common.AnalyticsResponse
+ dec := json.NewDecoder(resp.Body)
+ err = dec.Decode(&respBody)
+ if err != nil {
+ t.Fatalf("Error reading JSON response: %s", err)
+ }
+
+ if respBody.Accepted != 0 {
+ t.Fatalf("%d records were accepted", respBody.Accepted)
+ }
+ if respBody.Rejected != 0 {
+ t.Fatalf("%d records were rejected", respBody.Rejected)
+ }
+}
+
+func TestAxPublishWrongUser(t *testing.T) {
+ resp, err := sendAnalytics([]byte("{}"), "application/json",
+ "NOTVALID", ValidPublishSecret)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 401 {
+ t.Fatalf("Got status code %d and expected 401", resp.StatusCode)
+ }
+}
+
+func TestAxPublishWrongPW(t *testing.T) {
+ resp, err := sendAnalytics([]byte("{}"), "application/json",
+ ValidPublishKey, "NOPE")
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 401 {
+ t.Fatalf("Got status code %d and expected 401", resp.StatusCode)
+ }
+}
+
+func TestAxPublishNoAuth(t *testing.T) {
+ resp, err := http.Post(
+ fmt.Sprintf("http://%s/axpublisher/organization/foo/environment/test", testMockServer.Address()),
+ "application/json", bytes.NewReader([]byte("{}")))
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 401 {
+ t.Fatalf("Got status code %d and expected 401", resp.StatusCode)
+ }
+}
+
+func TestAxPublishSuccess(t *testing.T) {
+ now := time.Now().UnixNano() / 1000
+ req := common.AnalyticsRequest{
+ Records: []common.AnalyticsRecord{
+ {
+ ClientReceivedStartTimestamp: now,
+ ClientReceivedEndTimestamp: now + 1,
+ ClientSentStartTimestamp: now + 10,
+ ClientSentEndTimestamp: now + 11,
+ RecordType: "APIAnalytics",
+ APIProxy: "helloworld",
+ RequestURI: "http://hello/world",
+ RequestPath: "/world",
+ RequestVerb: "GET",
+ ClientIP: "192.168.201.100",
+ UserAgent: "Testing",
+ APIProxyRevision: 1,
+ ResponseStatusCode: 200,
+ },
+ },
+ }
+ requestBod, err := json.Marshal(&req)
+ if err != nil {
+ t.Fatalf("Error making JSON: %s", err)
+ }
+ resp, err := sendAnalytics(requestBod, "application/json",
+ ValidPublishKey, ValidPublishSecret)
+ if err != nil {
+ t.Fatalf("Network error sending request: %s", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ t.Fatalf("Got status code %d", resp.StatusCode)
+ }
+
+ var respBody common.AnalyticsResponse
+ dec := json.NewDecoder(resp.Body)
+ err = dec.Decode(&respBody)
+ if err != nil {
+ t.Fatalf("Error reading JSON response: %s", err)
+ }
+
+ if respBody.Accepted != 1 {
+ t.Fatalf("Only %d record accepted", respBody.Accepted)
+ }
+ if respBody.Rejected != 0 {
+ t.Fatalf("%d records were rejected", respBody.Rejected)
+ }
+}
+
+func sendAnalytics(body []byte, contentType, user, pw string) (*http.Response, error) {
+ req, err := http.NewRequest(
+ "POST",
+ fmt.Sprintf("http://%s/axpublisher/organization/foo/environment/test", testMockServer.Address()),
+ bytes.NewBuffer(body))
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Set("content-type", contentType)
+ req.SetBasicAuth(user, pw)
+ return http.DefaultClient.Do(req)
+}
diff --git a/testdata/configroot/scopes/global/adapters.yml b/testdata/configroot/scopes/global/adapters.yml
index f4f2a68..cd4639f 100644
--- a/testdata/configroot/scopes/global/adapters.yml
+++ b/testdata/configroot/scopes/global/adapters.yml
@@ -18,8 +18,25 @@
kind: lists
impl: apigeeKeyChecker
params:
- organization: gregbrail
+ organization: YOUR_ORG_HERE
environment: prod
# Add verificationURL to use a different service or URL
- #verificationURL: https://gregbrail-prod.apigee.net/edgemicro-auth
-
+ #verificationURL: https://YOUR_ORG_HERE-prod.apigee.net/edgemicro-auth
+ - name: default
+ kind: attributes
+ impl: kubernetes
+ params:
+ # when running from mixer root, use the following config after adding a
+ # symbolic link to a kubernetes config file via:
+ #
+ # $ ln -s ~/.kube/config adapter/kubernetes/kubeconfig
+ #
+ kubeconfig_path: "adapter/kubernetes/kubeconfig"
+ - name: apigeeAnalytics
+ kind: access-logs
+ impl: apigeeReport
+ params:
+ organization: YOUR_ORG_HERE
+ environment: prod
+ key: KEY_FROM_EDGEMICRO_CONFIGURE
+ secret: SECRET_FROM_EDGEMICRO_CONFIGURE
diff --git a/testdata/configroot/scopes/global/descriptors.yml b/testdata/configroot/scopes/global/descriptors.yml
index 341513e..845533f 100644
--- a/testdata/configroot/scopes/global/descriptors.yml
+++ b/testdata/configroot/scopes/global/descriptors.yml
@@ -189,3 +189,15 @@
responseSize: 2 # INT64
referer: 1 # STRING
userAgent: 1 # STRING
+ - name: accesslog.apigee
+ payload_format: TEXT
+ log_template: 'UNUSED'
+ labels:
+ sourceIP: 6 # IP_ADDRESS
+ urlPath: 1
+ hostHeader: 1
+ httpMethod: 1
+ userAgent: 1
+ requestTime: 5
+ responseTime: 5
+ responseCode: 2
diff --git a/testdata/configroot/scopes/global/subjects/global/rules.yml b/testdata/configroot/scopes/global/subjects/global/rules.yml
index f38981a..61c71c4 100644
--- a/testdata/configroot/scopes/global/subjects/global/rules.yml
+++ b/testdata/configroot/scopes/global/subjects/global/rules.yml
@@ -13,47 +13,32 @@
- descriptorName: RequestCount
maxAmount: 5000
expiration: 1s
- - kind: metrics
- adapter: prometheus
- params:
- metrics:
- - descriptor_name: request_count
- # we want to increment this counter by 1 for each unique (source, target, service, method, response_code) tuple
- value: "1"
- labels:
- source: source.labels["app"] | "unknown"
- target: target.service | "unknown"
- service: target.labels["app"] | "unknown"
- method: request.path | "unknown"
- response_code: response.code | 200
- - descriptor_name: request_duration
- value: response.latency | response.duration | "0ms"
- labels:
- source: source.labels["app"] | "unknown"
- target: target.service | "unknown"
- service: target.labels["app"] | "unknown"
- method: request.path | "unknown"
- response_code: response.code | 200
- kind: access-logs
params:
- logName: access_log
+ logName: accesslog.default
log:
- descriptor_name: accesslog.common
- template_expressions:
- originIp: origin.ip
- sourceUser: origin.user
- timestamp: request.time
- method: request.method
- url: request.path
- protocol: request.scheme
- responseCode: response.code
- responseSize: response.size
+ descriptorName: accesslog.common
labels:
- originIp: origin.ip
- sourceUser: origin.user
- timestamp: request.time
- method: request.method
- url: request.path
- protocol: request.scheme
- responseCode: response.code
- responseSize: response.size
+ originIp: source.ip
+ sourceUser: source.uid
+ timestamp: request.time
+ method: request.method | "http"
+ url: request.path
+ protocol: request.scheme
+ responseCode: response.code
+ responseSize: response.size
+ - kind: access-logs
+ adapter: apigeeAnalytics
+ params:
+ logName: accesslog.apigee
+ log:
+ descriptorName: accesslog.apigee
+ labels:
+ sourceIP: source.ip
+ urlPath: request.path
+ hostHeader: request.host
+ httpMethod: request.method
+ userAgent: request.useragent
+ requestTime: request.time
+ responseTime: response.time
+ responseCode: response.code