| /* |
| Copyright 2017 Google Inc. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package adapter |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "net/url" |
| "time" |
| |
| "github.com/apid/istioApigeeAdapter/adapter/config" |
| "github.com/apid/istioApigeeAdapter/common" |
| "istio.io/mixer/pkg/adapter" |
| ) |
| |
| const ( |
| defaultCollectURL = "https://edgemicroservices.apigee.net/edgemicro" |
| ) |
| |
| type ( |
| analyticsBuilder struct { |
| adapter.DefaultBuilder |
| } |
| |
| analyticsLogger struct { |
| env adapter.Env |
| collectionURL string |
| key string |
| secret string |
| } |
| ) |
| |
| func newReportBuilder() analyticsBuilder { |
| return analyticsBuilder{ |
| adapter.NewDefaultBuilder( |
| "apigeeReport", |
| "Report logs to apigee", |
| &config.ReportParams{}, |
| )} |
| } |
| |
| func (b analyticsBuilder) ValidateConfig(c adapter.Config) (ce *adapter.ConfigErrors) { |
| cfg := c.(*config.ReportParams) |
| if cfg.Organization == "" { |
| ce = ce.Appendf("organization", "Organization parameter must be specified") |
| } |
| if cfg.Environment == "" { |
| ce = ce.Appendf("environment", "Environment parameter must be specified") |
| } |
| if cfg.CollectionURL != "" { |
| _, err := url.Parse(cfg.CollectionURL) |
| if err != nil { |
| ce = ce.Appendf("collectionURL", "Invalid collection URL: %s", err) |
| } |
| } |
| if cfg.Key == "" { |
| ce = ce.Appendf("key", "Key must be specified") |
| } |
| if cfg.Secret == "" { |
| ce = ce.Appendf("secret", "Secret must be specified") |
| } |
| return |
| } |
| |
| func (b analyticsBuilder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) { |
| return newLogger(env, cfg) |
| } |
| |
| func (b analyticsBuilder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) { |
| return newLogger(env, cfg) |
| } |
| |
| func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) { |
| cfg := c.(*config.ReportParams) |
| var basePath string |
| |
| if cfg.CollectionURL == "" { |
| basePath = defaultCollectURL |
| } else { |
| basePath = cfg.CollectionURL |
| } |
| collectionURL := fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s", |
| basePath, cfg.Organization, cfg.Environment) |
| |
| al := &analyticsLogger{ |
| env: env, |
| collectionURL: collectionURL, |
| key: cfg.Key, |
| secret: cfg.Secret, |
| } |
| env.Logger().Infof("Created Apigee Report adapter to invoke \"%s\"", collectionURL) |
| return al, nil |
| } |
| |
| func (l *analyticsLogger) Log(entries []adapter.LogEntry) error { |
| return l.log(entries) |
| } |
| |
| func (l *analyticsLogger) LogAccess(entries []adapter.LogEntry) error { |
| return l.log(entries) |
| } |
| |
| func (l *analyticsLogger) log(entries []adapter.LogEntry) error { |
| var records []common.AnalyticsRecord |
| |
| for _, entry := range entries { |
| sourceIP := getStringLabel(entry, "sourceIP") |
| path := getStringLabel(entry, "urlPath") |
| host := getStringLabel(entry, "hostHeader") |
| method := getStringLabel(entry, "httpMethod") |
| userAgent := getStringLabel(entry, "userAgent") |
| // Convert nanos (Go) to millis (Java) |
| requestTime := getTimestampLabel(entry, "requestTime") / 1000 |
| responseTime := getTimestampLabel(entry, "responseTime") / 1000 |
| responseCode := getIntLabel(entry, "responseCode") |
| |
| r := common.AnalyticsRecord{ |
| ClientReceivedStartTimestamp: requestTime, |
| ClientReceivedEndTimestamp: requestTime + 1, |
| ClientSentStartTimestamp: responseTime, |
| ClientSentEndTimestamp: responseTime + 1, |
| // Missing: Target times |
| ClientIP: sourceIP, |
| RequestVerb: method, |
| UserAgent: userAgent, |
| ResponseStatusCode: responseCode, |
| // Technically wrong because of no scheme and host header |
| RequestURI: "http://" + host + path, |
| // Should this include query params? Perhaps not. |
| RequestPath: path, |
| // Hard-coded, but should come from config |
| APIProxy: "istio", |
| APIProxyRevision: 1, |
| RecordType: "APIAnalytics", |
| } |
| |
| records = append(records, r) |
| } |
| |
| ax := common.AnalyticsRequest{ |
| Records: records, |
| } |
| err := l.pushRecords(&ax) |
| if err != nil { |
| l.env.Logger().Errorf("Error pushing analytics: %s", err) |
| } |
| return err |
| } |
| |
| func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error { |
| jsonBody, err := json.Marshal(ax) |
| if err != nil { |
| return err |
| } |
| |
| req, err := http.NewRequest("POST", l.collectionURL, |
| bytes.NewBuffer(jsonBody)) |
| if err != nil { |
| return err |
| } |
| |
| req.Header.Set("content-type", "application/json") |
| req.SetBasicAuth(l.key, l.secret) |
| resp, err := http.DefaultClient.Do(req) |
| if err != nil { |
| return err |
| } |
| |
| defer resp.Body.Close() |
| if resp.StatusCode != 200 { |
| return fmt.Errorf("Error code %d from analytics back end", resp.StatusCode) |
| } |
| |
| dec := json.NewDecoder(resp.Body) |
| var axResponse common.AnalyticsResponse |
| err = dec.Decode(&axResponse) |
| if err != nil { |
| return err |
| } |
| if axResponse.Rejected > 0 { |
| return fmt.Errorf("%d out of %d analytics records rejected", |
| axResponse.Rejected, len(ax.Records)) |
| } |
| l.env.Logger().Infof("%d out of %d analytics records accepted", |
| axResponse.Accepted, len(ax.Records)) |
| return nil |
| } |
| |
| func getStringLabel(le adapter.LogEntry, key string) string { |
| v := le.Labels[key] |
| if v == nil { |
| return "" |
| } |
| switch v.(type) { |
| case string: |
| return v.(string) |
| default: |
| return fmt.Sprintf("%v", v) |
| } |
| } |
| |
| func getIntLabel(le adapter.LogEntry, key string) int { |
| v := le.Labels[key] |
| if v == nil { |
| return 0 |
| } |
| switch v.(type) { |
| case int: |
| return v.(int) |
| case int64: |
| return int(v.(int64)) |
| default: |
| return 0 |
| } |
| } |
| |
| func getTimestampLabel(le adapter.LogEntry, key string) int64 { |
| v := le.Labels[key] |
| if v == nil { |
| return 0 |
| } |
| switch v.(type) { |
| case time.Time: |
| return (v.(time.Time)).UnixNano() |
| case int64: |
| return v.(int64) |
| default: |
| return 0 |
| } |
| } |
| |
| func (l *analyticsLogger) Close() error { return nil } |