Add report code with a mock server and tests. Add adapter config to test config.
diff --git a/README.md b/README.md index 851eb64..13dd03c 100644 --- a/README.md +++ b/README.md
@@ -27,7 +27,7 @@ Now, launch the Mixer binary: - $MIXER_HOME/bazel-bin/cmd/server/mixs --logtostderr --configStoreURL fs://$THIS_ROOT/testdata/configroot + $MIXER_HOME/bazel-bin/cmd/server/mixs server --logtostderr --configStoreURL fs://$THIS_ROOT/testdata/configroot (In other words the "config store URL" needs to be the absolute path of the "testdata/configroot" directory of this repo. If you get this wrong then the mixer will silently do nothing @@ -35,7 +35,7 @@ Once that's running, here's a sample command: - $MIXER_HOME/bazel-bin/cmd/client/mixc check -a target.serivce=fault.svc.cluster.local \ + $MIXER_HOME/bazel-bin/cmd/client/mixc check -a target.service=fault.svc.cluster.local \ --stringmap_attributes request.headers=apikey:SOME_VALID_API_KEY That will send a "check" RPC to the mixer, which will respond "OK" if and only if the
diff --git a/adapter/BUILD b/adapter/BUILD index 9e57dc2..66aabee 100644 --- a/adapter/BUILD +++ b/adapter/BUILD
@@ -20,7 +20,10 @@ go_test( name = "tests", size = "medium", - srcs = ["apigeeKeyChecker_test.go"], + srcs = [ + "apigeeKeyChecker_test.go", + "apigeeReport_test.go", + ], library = ":go_default_library", deps = [ "//mock:go_default_library",
diff --git a/adapter/apigee.go b/adapter/apigee.go index c11818e..6fe3853 100644 --- a/adapter/apigee.go +++ b/adapter/apigee.go
@@ -25,5 +25,6 @@ // aspect of the Apigee functionality. func Register(r adapter.Registrar) { r.RegisterListsBuilder(newKeyCheckBuilder()) + //r.RegisterApplicationLogsBuilder(newReportBuilder()) r.RegisterAccessLogsBuilder(newReportBuilder()) }
diff --git a/adapter/apigeeKeyChecker.go b/adapter/apigeeKeyChecker.go index 6785f6f..38e3dff 100644 --- a/adapter/apigeeKeyChecker.go +++ b/adapter/apigeeKeyChecker.go
@@ -29,9 +29,10 @@ ) const ( - checkName = "apigeeKeyChecker" - checkDesc = "Verify an API key from a parameter" - verifyKeyPath = "/verifyApiKey" + checkName = "apigeeKeyChecker" + checkDesc = "Verify an API key from a parameter" + verifyKeyPath = "/verifyApiKey" + defaultVerifyURL = "https://%s-%s.apigee.net/edgemicro-auth" ) var checkConf = &config.VerifyKeyParams{} @@ -73,8 +74,7 @@ var basePath string if cfg.VerificationURL == "" { - basePath = fmt.Sprintf("https://%s-%s.apigee.net/edgemicro-auth", - cfg.Organization, cfg.Environment) + basePath = fmt.Sprintf(defaultVerifyURL, cfg.Organization, cfg.Environment) } else { basePath = cfg.VerificationURL }
diff --git a/adapter/apigeeReport.go b/adapter/apigeeReport.go index 2500b57..ae4f297 100644 --- a/adapter/apigeeReport.go +++ b/adapter/apigeeReport.go
@@ -20,128 +20,229 @@ "bytes" "encoding/json" "fmt" - "io" "net/http" - "os" + "net/url" + "time" "github.com/apid/istioApigeeAdapter/adapter/config" - me "github.com/hashicorp/go-multierror" + "github.com/apid/istioApigeeAdapter/common" "istio.io/mixer/pkg/adapter" ) +const ( + defaultCollectURL = "https://edgemicroservices.apigee.net/edgemicro" +) + type ( - builder struct{ adapter.DefaultBuilder } - - logger struct{ logStream io.Writer } - - analyticsRecord struct { - ClientReceivedStartTimestamp int `json:"client_received_start_timestamp"` - ClientReceivedEndTimestamp int `json:"client_received_end_timestamp"` - RecordType string `json:"recordType"` - ApiProxy string `json:"apiproxy"` - RequestUri string `json:"request_uri"` - RequestPath string `json:"request_path"` - RequestVerb string `json:"request_verb"` - ClientIp string `json:"client_ip"` - UserAgent string `json:"useragent"` - ApiProxyRevision string `json:"apiproxy_revision"` - ResponseStatusCode int `json:"response_status_code"` - ClientSentStartTimestamp int `json:"client_sent_start_timestamp"` - ClientSentEndTimestamp int `json:"client_sent_end_timestamp"` - DeveloperEmail string `json:"developer_email,omitempty"` - DeveloperApp string `json:"developer_app"` - AccessToken string `json:"access_token,omitempty"` - ClientId string `json:"client_id,omitempty"` - ApiProduct string `json:"api_product"` + analyticsBuilder struct { + adapter.DefaultBuilder } - analyticsRecordCollection struct { - Records []analyticsRecord `json:"records"` - } - - edgemicroKeys struct { - Key string - Secret string + analyticsLogger struct { + env adapter.Env + collectionURL string + key string + secret string } ) -func newReportBuilder() builder { - return builder{adapter.NewDefaultBuilder( - "apigeeReport", - "Report logs to apigee", - &config.ReportParams{}, - )} +func newReportBuilder() analyticsBuilder { + return analyticsBuilder{ + adapter.NewDefaultBuilder( + "apigeeReport", + "Report logs to apigee", + &config.ReportParams{}, + )} } -func (builder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) { - return newLogger(cfg) -} - -func (builder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) { - return newLogger(cfg) -} - -func newLogger(cfg adapter.Config) (*logger, error) { - c := cfg.(*config.ReportParams) - - w := os.Stderr - if c.LogStream == config.STDOUT { - w = os.Stdout +func (b analyticsBuilder) ValidateConfig(c adapter.Config) (ce *adapter.ConfigErrors) { + cfg := c.(*config.ReportParams) + if cfg.Organization == "" { + ce = ce.Appendf("organization", "Organization parameter must be specified") } - - return &logger{w}, nil -} - -func (l *logger) Log(entries []adapter.LogEntry) error { - return l.log(entries) -} - -func (l *logger) LogAccess(entries []adapter.LogEntry) error { - fmt.Println("*** here") - return l.log(entries) -} - -func (l *logger) log(entries []adapter.LogEntry) error { - var recordsCollection []analyticsRecord - - var errors *me.Error - for _, entry := range entries { - - if err := writeJSON(l.logStream, entry); err != nil { - errors = me.Append(errors, err) - } - - ax_data := analyticsRecord{ - ClientReceivedStartTimestamp: 33, - DeveloperApp: "microgateway-demo", - } - - recordsCollection = append(recordsCollection, ax_data) - + if cfg.Environment == "" { + ce = ce.Appendf("environment", "Environment parameter must be specified") } - - res2B, _ := json.Marshal(recordsCollection) - fmt.Println(string(res2B)) - - return errors.ErrorOrNil() + if cfg.CollectionURL != "" { + _, err := url.Parse(cfg.CollectionURL) + if err != nil { + ce = ce.Appendf("collectionURL", "Invalid collection URL: %s", err) + } + } + if cfg.Key == "" { + ce = ce.Appendf("key", "Key must be specified") + } + if cfg.Secret == "" { + ce = ce.Appendf("secret", "Secret must be specified") + } + return } -func (l *logger) Close() error { return nil } - -func writeJSON(w io.Writer, le interface{}) error { - return json.NewEncoder(w).Encode(le) +func (b analyticsBuilder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) { + return newLogger(env, cfg) } -func sendAnalyticsRecords(collection analyticsRecordCollection, keys edgemicroKeys, uri string) bool { - serializedBody, _ := json.Marshal(&collection) - req, _ := http.NewRequest("POST", uri, bytes.NewBuffer(serializedBody)) - req.SetBasicAuth(keys.Key, keys.Secret) - req.Header.Add("content-type", "application/json") - client := &http.Client{} - resp, _ := client.Do(req) - if resp.StatusCode != 200 { - return false +func (b analyticsBuilder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) { + return newLogger(env, cfg) +} + +func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) { + cfg := c.(*config.ReportParams) + var basePath string + + if cfg.CollectionURL == "" { + basePath = defaultCollectURL } else { - return true + basePath = cfg.CollectionURL + } + collectionURL := fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s", + basePath, cfg.Organization, cfg.Environment) + + al := &analyticsLogger{ + env: env, + collectionURL: collectionURL, + key: cfg.Key, + secret: cfg.Secret, + } + env.Logger().Infof("Created Apigee Report adapter to invoke \"%s\"", collectionURL) + return al, nil +} + +func (l *analyticsLogger) Log(entries []adapter.LogEntry) error { + return l.log(entries) +} + +func (l *analyticsLogger) LogAccess(entries []adapter.LogEntry) error { + return l.log(entries) +} + +func (l *analyticsLogger) log(entries []adapter.LogEntry) error { + var records []common.AnalyticsRecord + + for _, entry := range entries { + sourceIP := getStringLabel(entry, "sourceIP") + path := getStringLabel(entry, "urlPath") + host := getStringLabel(entry, "hostHeader") + method := getStringLabel(entry, "httpMethod") + userAgent := getStringLabel(entry, "userAgent") + // Convert nanos (Go) to millis (Java) + requestTime := getTimestampLabel(entry, "requestTime") / 1000 + responseTime := getTimestampLabel(entry, "responseTime") / 1000 + responseCode := getIntLabel(entry, "responseCode") + + r := common.AnalyticsRecord{ + ClientReceivedStartTimestamp: requestTime, + ClientReceivedEndTimestamp: requestTime + 1, + ClientSentStartTimestamp: responseTime, + ClientSentEndTimestamp: responseTime + 1, + // Missing: Target times + ClientIP: sourceIP, + RequestVerb: method, + UserAgent: userAgent, + ResponseStatusCode: responseCode, + // Technically wrong because of no scheme and host header + RequestURI: "http://" + host + path, + // Should this include query params? Perhaps not. + RequestPath: path, + // Hard-coded, but should come from config + APIProxy: "istio", + APIProxyRevision: 1, + RecordType: "APIAnalytics", + } + + records = append(records, r) + } + + ax := common.AnalyticsRequest{ + Records: records, + } + err := l.pushRecords(&ax) + if err != nil { + l.env.Logger().Errorf("Error pushing analytics: %s", err) + } + return err +} + +func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error { + jsonBody, err := json.Marshal(ax) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", l.collectionURL, + bytes.NewBuffer(jsonBody)) + if err != nil { + return err + } + + req.Header.Set("content-type", "application/json") + req.SetBasicAuth(l.key, l.secret) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("Error code %d from analytics back end", resp.StatusCode) + } + + dec := json.NewDecoder(resp.Body) + var axResponse common.AnalyticsResponse + err = dec.Decode(&axResponse) + if err != nil { + return err + } + if axResponse.Rejected > 0 { + return fmt.Errorf("%d out of %d analytics records rejected", + axResponse.Rejected, len(ax.Records)) + } + l.env.Logger().Infof("%d out of %d analytics records accepted", + axResponse.Accepted, len(ax.Records)) + return nil +} + +func getStringLabel(le adapter.LogEntry, key string) string { + v := le.Labels[key] + if v == nil { + return "" + } + switch v.(type) { + case string: + return v.(string) + default: + return fmt.Sprintf("%v", v) } } + +func getIntLabel(le adapter.LogEntry, key string) int { + v := le.Labels[key] + if v == nil { + return 0 + } + switch v.(type) { + case int: + return v.(int) + case int64: + return int(v.(int64)) + default: + return 0 + } +} + +func getTimestampLabel(le adapter.LogEntry, key string) int64 { + v := le.Labels[key] + if v == nil { + return 0 + } + switch v.(type) { + case time.Time: + return (v.(time.Time)).UnixNano() + case int64: + return v.(int64) + default: + return 0 + } +} + +func (l *analyticsLogger) Close() error { return nil }
diff --git a/adapter/apigeeReport_test.go b/adapter/apigeeReport_test.go new file mode 100644 index 0000000..2f279d3 --- /dev/null +++ b/adapter/apigeeReport_test.go
@@ -0,0 +1,114 @@ +/* +Copyright 2017 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package adapter + +import ( + "testing" + "time" + + "github.com/apid/istioApigeeAdapter/adapter/config" + "github.com/apid/istioApigeeAdapter/mock" + "istio.io/mixer/pkg/adapter" +) + +func TestAxMissingOrg(t *testing.T) { + cfg := config.ReportParams{ + CollectionURL: "http://" + mockServer.Address(), + Key: mock.ValidPublishKey, + Secret: mock.ValidPublishSecret, + } + + rp := newReportBuilder() + ce := rp.ValidateConfig(&cfg) + if ce == nil { + t.Fatalf("Expected invalid config") + } +} + +func TestAxWrongSecret(t *testing.T) { + cfg := config.ReportParams{ + Organization: "foo", + Environment: "test", + CollectionURL: "http://" + mockServer.Address(), + Key: mock.ValidPublishKey, + Secret: "NOTTHISONE", + } + + rp := newReportBuilder() + ce := rp.ValidateConfig(&cfg) + if ce != nil { + t.Fatalf("Expected valid config and got %s", ce) + } + + aspect, err := rp.NewAccessLogsAspect(mockEnv, &cfg) + if err != nil { + t.Fatalf("Error creating aspect: %s", err) + } + + err = aspect.LogAccess( + []adapter.LogEntry{ + { + LogName: "test", + Timestamp: time.Now().String(), + Severity: adapter.Info, + Labels: map[string]interface{}{ + "sourceIP": "1.2.3.4", + }, + }, + }, + ) + if err == nil { + t.Fatalf("Expected push error and got none") + } +} + +func TestAxValidConfig(t *testing.T) { + cfg := config.ReportParams{ + Organization: "foo", + Environment: "test", + CollectionURL: "http://" + mockServer.Address(), + Key: mock.ValidPublishKey, + Secret: mock.ValidPublishSecret, + } + + rp := newReportBuilder() + ce := rp.ValidateConfig(&cfg) + if ce != nil { + t.Fatalf("Expected valid config and got %s", ce) + } + + aspect, err := rp.NewAccessLogsAspect(mockEnv, &cfg) + if err != nil { + t.Fatalf("Error creating aspect: %s", err) + } + + err = aspect.LogAccess( + []adapter.LogEntry{ + { + LogName: "test", + Timestamp: time.Now().String(), + Severity: adapter.Info, + Labels: map[string]interface{}{ + "sourceIP": "1.2.3.4", + }, + }, + }, + ) + if err != nil { + t.Fatalf("Error pushing analytics: %s", err) + } +}
diff --git a/adapter/config/config.proto b/adapter/config/config.proto index 4d85bc7..b069fbc 100644 --- a/adapter/config/config.proto +++ b/adapter/config/config.proto
@@ -30,24 +30,16 @@ } message ReportParams { - - // Stream is used to select between different logs output sinks. - enum Stream { - // STDERR refers to os.Stderr. - STDERR = 0; - // STDOUT refers to os.Stdout. - STDOUT = 1; - } - - // Selects which standard stream to write to for log entries. - // STDERR is the default Stream. - Stream log_stream = 1; - - string organization = 2; - - string environment = 3; - - string key = 4; - - string secret = 5; + // The name of the Apigee organization -- required + string organization = 1; + // The name of the Apigee environment -- required + string environment = 2; + // A URL to use to contact the collection service. Will be + // constructed from the organization and environment name if not + // specified, and assumes that the service runs at production "apigee.net". + string collectionURL = 3; + // The key required to collect analytics, from "edgemicro configure" + string key = 4; + // The secret required to collect analytics, from "edgemicro configure" + string secret = 5; }
diff --git a/common/types.go b/common/types.go index ba362ba..7d81c1c 100644 --- a/common/types.go +++ b/common/types.go
@@ -29,6 +29,40 @@ Token string `json:"token"` } +type AnalyticsRecord struct { + ClientReceivedStartTimestamp int64 `json:"client_received_start_timestamp"` + ClientReceivedEndTimestamp int64 `json:"client_received_end_timestamp"` + ClientSentStartTimestamp int64 `json:"client_sent_start_timestamp"` + ClientSentEndTimestamp int64 `json:"client_sent_end_timestamp"` + TargetReceivedStartTimestamp int64 `json:"target_received_start_timestamp,omitempty"` + TargetReceivedEndTimestamp int64 `json:"target_received_end_timestamp,omitempty"` + TargetSentStartTimestamp int64 `json:"target_sent_start_timestamp,omitempty"` + TargetSentEndTimestamp int64 `json:"target_sent_end_timestamp,omitempty"` + RecordType string `json:"recordType"` + APIProxy string `json:"apiproxy"` + RequestURI string `json:"request_uri"` + RequestPath string `json:"request_path"` + RequestVerb string `json:"request_verb"` + ClientIP string `json:"client_ip"` + UserAgent string `json:"useragent"` + APIProxyRevision int `json:"apiproxy_revision"` + ResponseStatusCode int `json:"response_status_code"` + DeveloperEmail string `json:"developer_email,omitempty"` + DeveloperApp string `json:"developer_app,omitempty"` + AccessToken string `json:"access_token,omitempty"` + ClientID string `json:"client_id,omitempty"` + APIProduct string `json:"api_product"` +} + +type AnalyticsRequest struct { + Records []AnalyticsRecord `json:"records"` +} + +type AnalyticsResponse struct { + Accepted int `json:"accepted"` + Rejected int `json:"rejected"` +} + type APIProduct struct { APIResources []string `json:"apiResources"` ApprovalType string `json:"approvalType"`
diff --git a/mock/mockserver.go b/mock/mockserver.go index 8ab7737..608acf1 100644 --- a/mock/mockserver.go +++ b/mock/mockserver.go
@@ -34,11 +34,14 @@ var mockInit = &sync.Once{} const ( - ValidAPIKey1 = "12345" + ValidAPIKey1 = "12345" + ValidPublishKey = "aaaaaa" + ValidPublishSecret = "bbbbbb" ) type MockServer struct { - listener net.Listener + listener net.Listener + analyticsRecords []common.AnalyticsRecord } func StartMockServer(port int) (*MockServer, error) { @@ -57,6 +60,7 @@ router.GET("/publicKey", ms.getPublicKey) router.GET("/products", ms.getProducts) router.POST("/verifyApiKey", ms.getAPIKey) + router.POST("/axpublisher/organization/:org/environment/:env", ms.publishAnalytics) go func() { http.Serve(l, router) @@ -73,6 +77,10 @@ m.listener.Close() } +func (m *MockServer) GetAnalyticsRecords() []common.AnalyticsRecord { + return m.analyticsRecords +} + func (m *MockServer) getPublicKey(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { w.Header().Set("content-type", "text/plain") w.Write(mockCertPEM) @@ -153,6 +161,43 @@ enc.Encode(&tok) } +func (m *MockServer) publishAnalytics(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + user, pw, aok := r.BasicAuth() + if !aok || user != ValidPublishKey || pw != ValidPublishSecret { + w.WriteHeader(401) + return + } + if r.Header.Get("content-type") != "application/json" { + sendAnalyticsResponse(w, 0, 1) + return + } + + var req common.AnalyticsRequest + defer r.Body.Close() + dec := json.NewDecoder(r.Body) + err := dec.Decode(&req) + if err != nil { + // This is what the existing service does + sendFault(w, 500, "Failed to execute JavaCallout. not a JSON Object", + "steps.javacallout.ExecutionError") + return + } + + // For now we just accept it all. We'll add some validation in the future. + m.analyticsRecords = append(m.analyticsRecords, req.Records...) + sendAnalyticsResponse(w, len(req.Records), 0) +} + +func sendAnalyticsResponse(w http.ResponseWriter, accepted, rejected int) { + resp := &common.AnalyticsResponse{ + Accepted: accepted, + Rejected: rejected, + } + w.Header().Set("content-type", "application/json") + enc := json.NewEncoder(w) + enc.Encode(resp) +} + func sendFault(w http.ResponseWriter, errorCode int, fault, code string) { w.Header().Set("content-type", "application/json") w.WriteHeader(errorCode)
diff --git a/mock/mockserver_test.go b/mock/mockserver_test.go index 862383a..bebf2b5 100644 --- a/mock/mockserver_test.go +++ b/mock/mockserver_test.go
@@ -26,6 +26,7 @@ "net/http" "os" "testing" + "time" "github.com/SermoDigital/jose/jws" "github.com/apid/istioApigeeAdapter/common" @@ -199,3 +200,167 @@ t.Fatalf("Error parsing JWT: %s", err) } } + +func TestAxPublishBadJSON(t *testing.T) { + resp, err := sendAnalytics([]byte("NOTJSON"), "application/json", + ValidPublishKey, ValidPublishSecret) + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 500 { + t.Fatalf("Got status code %d and expected 500", resp.StatusCode) + } +} + +func TestAxPublishNotJSON(t *testing.T) { + resp, err := sendAnalytics([]byte("{}"), "text/plain", + ValidPublishKey, ValidPublishSecret) + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("Got status code %d and expected 200", resp.StatusCode) + } + + var respBody common.AnalyticsResponse + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&respBody) + if err != nil { + t.Fatalf("Error reading JSON response: %s", err) + } + + if respBody.Accepted != 0 { + t.Fatalf("%d records were accepted", respBody.Accepted) + } + if respBody.Rejected != 1 { + t.Fatalf("%d records were rejected", respBody.Rejected) + } +} + +func TestAxPublishEmpty(t *testing.T) { + resp, err := sendAnalytics([]byte("{}"), "application/json", + ValidPublishKey, ValidPublishSecret) + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("Got status code %d and expected 200", resp.StatusCode) + } + + var respBody common.AnalyticsResponse + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&respBody) + if err != nil { + t.Fatalf("Error reading JSON response: %s", err) + } + + if respBody.Accepted != 0 { + t.Fatalf("%d records were accepted", respBody.Accepted) + } + if respBody.Rejected != 0 { + t.Fatalf("%d records were rejected", respBody.Rejected) + } +} + +func TestAxPublishWrongUser(t *testing.T) { + resp, err := sendAnalytics([]byte("{}"), "application/json", + "NOTVALID", ValidPublishSecret) + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 401 { + t.Fatalf("Got status code %d and expected 401", resp.StatusCode) + } +} + +func TestAxPublishWrongPW(t *testing.T) { + resp, err := sendAnalytics([]byte("{}"), "application/json", + ValidPublishKey, "NOPE") + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 401 { + t.Fatalf("Got status code %d and expected 401", resp.StatusCode) + } +} + +func TestAxPublishNoAuth(t *testing.T) { + resp, err := http.Post( + fmt.Sprintf("http://%s/axpublisher/organization/foo/environment/test", testMockServer.Address()), + "application/json", bytes.NewReader([]byte("{}"))) + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 401 { + t.Fatalf("Got status code %d and expected 401", resp.StatusCode) + } +} + +func TestAxPublishSuccess(t *testing.T) { + now := time.Now().UnixNano() / 1000 + req := common.AnalyticsRequest{ + Records: []common.AnalyticsRecord{ + { + ClientReceivedStartTimestamp: now, + ClientReceivedEndTimestamp: now + 1, + ClientSentStartTimestamp: now + 10, + ClientSentEndTimestamp: now + 11, + RecordType: "APIAnalytics", + APIProxy: "helloworld", + RequestURI: "http://hello/world", + RequestPath: "/world", + RequestVerb: "GET", + ClientIP: "192.168.201.100", + UserAgent: "Testing", + APIProxyRevision: 1, + ResponseStatusCode: 200, + }, + }, + } + requestBod, err := json.Marshal(&req) + if err != nil { + t.Fatalf("Error making JSON: %s", err) + } + resp, err := sendAnalytics(requestBod, "application/json", + ValidPublishKey, ValidPublishSecret) + if err != nil { + t.Fatalf("Network error sending request: %s", err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Fatalf("Got status code %d", resp.StatusCode) + } + + var respBody common.AnalyticsResponse + dec := json.NewDecoder(resp.Body) + err = dec.Decode(&respBody) + if err != nil { + t.Fatalf("Error reading JSON response: %s", err) + } + + if respBody.Accepted != 1 { + t.Fatalf("Only %d record accepted", respBody.Accepted) + } + if respBody.Rejected != 0 { + t.Fatalf("%d records were rejected", respBody.Rejected) + } +} + +func sendAnalytics(body []byte, contentType, user, pw string) (*http.Response, error) { + req, err := http.NewRequest( + "POST", + fmt.Sprintf("http://%s/axpublisher/organization/foo/environment/test", testMockServer.Address()), + bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + req.Header.Set("content-type", contentType) + req.SetBasicAuth(user, pw) + return http.DefaultClient.Do(req) +}
diff --git a/testdata/configroot/scopes/global/adapters.yml b/testdata/configroot/scopes/global/adapters.yml index f4f2a68..cd4639f 100644 --- a/testdata/configroot/scopes/global/adapters.yml +++ b/testdata/configroot/scopes/global/adapters.yml
@@ -18,8 +18,25 @@ kind: lists impl: apigeeKeyChecker params: - organization: gregbrail + organization: YOUR_ORG_HERE environment: prod # Add verificationURL to use a different service or URL - #verificationURL: https://gregbrail-prod.apigee.net/edgemicro-auth - + #verificationURL: https://YOUR_ORG_HERE-prod.apigee.net/edgemicro-auth + - name: default + kind: attributes + impl: kubernetes + params: + # when running from mixer root, use the following config after adding a + # symbolic link to a kubernetes config file via: + # + # $ ln -s ~/.kube/config adapter/kubernetes/kubeconfig + # + kubeconfig_path: "adapter/kubernetes/kubeconfig" + - name: apigeeAnalytics + kind: access-logs + impl: apigeeReport + params: + organization: YOUR_ORG_HERE + environment: prod + key: KEY_FROM_EDGEMICRO_CONFIGURE + secret: SECRET_FROM_EDGEMICRO_CONFIGURE
diff --git a/testdata/configroot/scopes/global/descriptors.yml b/testdata/configroot/scopes/global/descriptors.yml index 341513e..845533f 100644 --- a/testdata/configroot/scopes/global/descriptors.yml +++ b/testdata/configroot/scopes/global/descriptors.yml
@@ -189,3 +189,15 @@ responseSize: 2 # INT64 referer: 1 # STRING userAgent: 1 # STRING + - name: accesslog.apigee + payload_format: TEXT + log_template: 'UNUSED' + labels: + sourceIP: 6 # IP_ADDRESS + urlPath: 1 + hostHeader: 1 + httpMethod: 1 + userAgent: 1 + requestTime: 5 + responseTime: 5 + responseCode: 2
diff --git a/testdata/configroot/scopes/global/subjects/global/rules.yml b/testdata/configroot/scopes/global/subjects/global/rules.yml index f38981a..61c71c4 100644 --- a/testdata/configroot/scopes/global/subjects/global/rules.yml +++ b/testdata/configroot/scopes/global/subjects/global/rules.yml
@@ -13,47 +13,32 @@ - descriptorName: RequestCount maxAmount: 5000 expiration: 1s - - kind: metrics - adapter: prometheus - params: - metrics: - - descriptor_name: request_count - # we want to increment this counter by 1 for each unique (source, target, service, method, response_code) tuple - value: "1" - labels: - source: source.labels["app"] | "unknown" - target: target.service | "unknown" - service: target.labels["app"] | "unknown" - method: request.path | "unknown" - response_code: response.code | 200 - - descriptor_name: request_duration - value: response.latency | response.duration | "0ms" - labels: - source: source.labels["app"] | "unknown" - target: target.service | "unknown" - service: target.labels["app"] | "unknown" - method: request.path | "unknown" - response_code: response.code | 200 - kind: access-logs params: - logName: access_log + logName: accesslog.default log: - descriptor_name: accesslog.common - template_expressions: - originIp: origin.ip - sourceUser: origin.user - timestamp: request.time - method: request.method - url: request.path - protocol: request.scheme - responseCode: response.code - responseSize: response.size + descriptorName: accesslog.common labels: - originIp: origin.ip - sourceUser: origin.user - timestamp: request.time - method: request.method - url: request.path - protocol: request.scheme - responseCode: response.code - responseSize: response.size + originIp: source.ip + sourceUser: source.uid + timestamp: request.time + method: request.method | "http" + url: request.path + protocol: request.scheme + responseCode: response.code + responseSize: response.size + - kind: access-logs + adapter: apigeeAnalytics + params: + logName: accesslog.apigee + log: + descriptorName: accesslog.apigee + labels: + sourceIP: source.ip + urlPath: request.path + hostHeader: request.host + httpMethod: request.method + userAgent: request.useragent + requestTime: request.time + responseTime: response.time + responseCode: response.code