| /* |
| Copyright 2017 Google Inc. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package adapter |
| |
| import ( |
| "bytes" |
| "compress/gzip" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "net/http" |
| "net/url" |
| "strings" |
| "sync" |
| "time" |
| |
| "github.com/apid/istioApigeeAdapter/adapter/config" |
| "github.com/apid/istioApigeeAdapter/common" |
| "istio.io/mixer/pkg/adapter" |
| ) |
| |
| const ( |
| regionLookupURL = "https://edgemicroservices.apigee.net/edgemicro" |
| nanosPerMillisecond = 1000000 |
| reportName = "apigeeReport" |
| reportDesc = "Report API call data to Apigee" |
| ) |
| |
| type ( |
| analyticsBuilder struct { |
| adapter.DefaultBuilder |
| } |
| |
| analyticsLogger struct { |
| env adapter.Env |
| collectionURL string |
| organization string |
| environment string |
| actualURL string |
| key string |
| secret string |
| latch *sync.Mutex |
| } |
| ) |
| |
| func newReportBuilder() analyticsBuilder { |
| return analyticsBuilder{ |
| adapter.NewDefaultBuilder( |
| reportName, |
| reportDesc, |
| &config.ReportParams{}, |
| )} |
| } |
| |
| func (b analyticsBuilder) ValidateConfig(c adapter.Config) (ce *adapter.ConfigErrors) { |
| cfg := c.(*config.ReportParams) |
| if cfg.Organization == "" { |
| ce = ce.Appendf("organization", "Organization parameter must be specified") |
| } |
| if cfg.Environment == "" { |
| ce = ce.Appendf("environment", "Environment parameter must be specified") |
| } |
| if cfg.CollectionURL != "" { |
| _, err := url.Parse(cfg.CollectionURL) |
| if err != nil { |
| ce = ce.Appendf("collectionURL", "Invalid collection URL: %s", err) |
| } |
| } |
| if cfg.Key == "" { |
| ce = ce.Appendf("key", "Key must be specified") |
| } |
| if cfg.Secret == "" { |
| ce = ce.Appendf("secret", "Secret must be specified") |
| } |
| return |
| } |
| |
| func (b analyticsBuilder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) { |
| return newLogger(env, cfg) |
| } |
| |
| func (b analyticsBuilder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) { |
| return newLogger(env, cfg) |
| } |
| |
| func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) { |
| cfg := c.(*config.ReportParams) |
| al := &analyticsLogger{ |
| env: env, |
| organization: cfg.Organization, |
| environment: cfg.Environment, |
| collectionURL: cfg.CollectionURL, |
| key: cfg.Key, |
| secret: cfg.Secret, |
| latch: &sync.Mutex{}, |
| } |
| return al, nil |
| } |
| |
| func (l *analyticsLogger) Log(entries []adapter.LogEntry) error { |
| return l.log(entries) |
| } |
| |
| func (l *analyticsLogger) LogAccess(entries []adapter.LogEntry) error { |
| return l.log(entries) |
| } |
| |
| func (l *analyticsLogger) log(entries []adapter.LogEntry) error { |
| var records []common.AnalyticsRecord |
| |
| for _, entry := range entries { |
| path := getStringLabel(entry, "urlPath") |
| host := getStringLabel(entry, "hostHeader") |
| // Convert nanos (Go) to millis (Java) |
| requestTime := getTimestampLabel(entry, "requestTime") / nanosPerMillisecond |
| responseTime := getTimestampLabel(entry, "responseTime") / nanosPerMillisecond |
| |
| r := common.AnalyticsRecord{ |
| ClientReceivedStartTimestamp: requestTime, |
| // historically we want to make sure end is after start due to various AX stuff |
| ClientReceivedEndTimestamp: requestTime + 1, |
| ClientSentStartTimestamp: responseTime, |
| ClientSentEndTimestamp: responseTime + 1, |
| // Missing: Target times |
| ClientIP: getStringLabel(entry, "sourceIP"), |
| RequestVerb: getStringLabel(entry, "httpMethod"), |
| UserAgent: getStringLabel(entry, "userAgent"), |
| ResponseStatusCode: getIntLabel(entry, "responseCode"), |
| // Technically wrong because of no scheme and host header |
| RequestURI: "http://" + host + path, |
| RequestPath: strings.Split(path, "?")[0], |
| APIProxy: getStringLabel(entry, "proxyName"), |
| APIProxyRevision: getIntLabel(entry, "proxyRevision"), |
| RecordType: "APIAnalytics", |
| DeveloperApp: getStringLabel(entry, "applicationName"), |
| ClientID: getStringLabel(entry, "clientID"), |
| } |
| |
| records = append(records, r) |
| } |
| |
| ax := common.AnalyticsRequest{ |
| Records: records, |
| } |
| err := l.pushRecords(&ax) |
| if err != nil { |
| l.env.Logger().Errorf("Error pushing analytics: %s", err) |
| } |
| return err |
| } |
| |
| func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error { |
| pushURL, err := l.getPushURL() |
| if err != nil { |
| return err |
| } |
| |
| bb := &bytes.Buffer{} |
| zw := gzip.NewWriter(bb) |
| jw := json.NewEncoder(zw) |
| err = jw.Encode(ax) |
| if err != nil { |
| return err |
| } |
| zw.Close() |
| |
| req, err := http.NewRequest("POST", pushURL, bb) |
| if err != nil { |
| return err |
| } |
| |
| req.Header.Set("content-type", "application/json") |
| req.Header.Set("content-encoding", "gzip") |
| req.SetBasicAuth(l.key, l.secret) |
| resp, err := http.DefaultClient.Do(req) |
| if err != nil { |
| return err |
| } |
| |
| defer resp.Body.Close() |
| if resp.StatusCode != 200 { |
| return fmt.Errorf("Error code %d from analytics back end", resp.StatusCode) |
| } |
| |
| dec := json.NewDecoder(resp.Body) |
| var axResponse common.AnalyticsResponse |
| err = dec.Decode(&axResponse) |
| if err != nil { |
| return err |
| } |
| if axResponse.Rejected > 0 { |
| return fmt.Errorf("%d out of %d analytics records rejected", |
| axResponse.Rejected, len(ax.Records)) |
| } |
| l.env.Logger().Infof("%d out of %d analytics records accepted", |
| axResponse.Accepted, len(ax.Records)) |
| return nil |
| } |
| |
| func (l *analyticsLogger) getPushURL() (string, error) { |
| l.latch.Lock() |
| defer l.latch.Unlock() |
| |
| if l.actualURL != "" { |
| return l.actualURL, nil |
| } |
| |
| if l.collectionURL != "" { |
| l.actualURL = fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s", |
| l.collectionURL, l.organization, l.environment) |
| l.env.Logger().Infof("Sending analytics data to %s", l.actualURL) |
| return l.actualURL, nil |
| } |
| |
| // Default config for Apigee cloud customers -- call web service to get correct URL |
| lookupURL := fmt.Sprintf("%s/region/organization/%s/environment/%s", |
| regionLookupURL, l.organization, l.environment) |
| req, err := http.NewRequest("GET", lookupURL, nil) |
| if err != nil { |
| return "", err |
| } |
| req.SetBasicAuth(l.key, l.secret) |
| resp, err := http.DefaultClient.Do(req) |
| if err != nil { |
| l.env.Logger().Errorf("Error %s getting region for analytics data from %s", |
| err, lookupURL) |
| return "", err |
| } |
| defer resp.Body.Close() |
| if resp.StatusCode != 200 { |
| msg := fmt.Sprintf("Error %d getting region for analytics data from %s", |
| resp.StatusCode, lookupURL) |
| l.env.Logger().Errorf(msg) |
| return "", errors.New(msg) |
| } |
| |
| var region common.RegionResponse |
| jr := json.NewDecoder(resp.Body) |
| err = jr.Decode(®ion) |
| if err != nil { |
| l.env.Logger().Errorf("Error parsing JSON from region service: %s", err) |
| return "", err |
| } |
| |
| l.actualURL = fmt.Sprintf("https://%s/edgemicro/axpublisher/organization/%s/environment/%s", |
| region.Host, l.organization, l.environment) |
| l.env.Logger().Infof("Using region \"%s\". Publishing to %s", |
| region.Region, l.actualURL) |
| return l.actualURL, nil |
| } |
| |
| func getStringLabel(le adapter.LogEntry, key string) string { |
| return getString(le.Labels, key) |
| } |
| |
| func getString(m map[string]interface{}, key string) string { |
| v := m[key] |
| if v == nil { |
| return "" |
| } |
| switch v.(type) { |
| case string: |
| return v.(string) |
| default: |
| return fmt.Sprintf("%v", v) |
| } |
| } |
| |
| func getIntLabel(le adapter.LogEntry, key string) int { |
| return getInt(le.Labels, key) |
| } |
| |
| func getInt(m map[string]interface{}, key string) int { |
| v := m[key] |
| if v == nil { |
| return 0 |
| } |
| switch v.(type) { |
| case int: |
| return v.(int) |
| case int64: |
| return int(v.(int64)) |
| default: |
| return 0 |
| } |
| } |
| |
| func getTimestampLabel(le adapter.LogEntry, key string) int64 { |
| return getTimestamp(le.Labels, key) |
| } |
| |
| func getTimestamp(m map[string]interface{}, key string) int64 { |
| v := m[key] |
| if v == nil { |
| return 0 |
| } |
| switch v.(type) { |
| case time.Time: |
| return (v.(time.Time)).UnixNano() |
| case int64: |
| return v.(int64) |
| default: |
| return 0 |
| } |
| } |
| |
| func (l *analyticsLogger) Close() error { return nil } |