blob: fc2ae2f1813e328d1f614a7a3d30846d70845e88 [file] [log] [blame]
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter
import (
"bytes"
"compress/gzip"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/apid/istioApigeeAdapter/adapter/config"
"github.com/apid/istioApigeeAdapter/common"
"istio.io/mixer/pkg/adapter"
)
// regionLookupURL is the base URL of the web service that getPushURL calls
// to discover the analytics host when no collection URL is configured.
const regionLookupURL = "https://edgemicroservices.apigee.net/edgemicro"

// nanosPerMillisecond is the divisor that turns a nanosecond timestamp into
// a millisecond timestamp. It is left untyped so it can divide int64 values.
const nanosPerMillisecond = 1000 * 1000
// analyticsBuilder is the builder for the Apigee analytics adapter. It embeds
// adapter.DefaultBuilder and adds configuration validation plus construction
// of the logging aspects.
type analyticsBuilder struct {
	adapter.DefaultBuilder
}

// analyticsLogger converts log entries into analytics records and pushes
// them over HTTP.
type analyticsLogger struct {
	// env supplies the logger used for diagnostics.
	env adapter.Env

	// Values copied from the adapter configuration by newLogger. key and
	// secret are sent as HTTP basic-auth credentials.
	collectionURL string
	organization  string
	environment   string
	key           string
	secret        string

	// actualURL caches the push URL once getPushURL has resolved it.
	// latch is locked by getPushURL while it reads or sets actualURL.
	actualURL string
	latch     *sync.Mutex
}
// newReportBuilder returns an analyticsBuilder wrapping a DefaultBuilder that
// is constructed with the identifier "apigeeReport", a short description, and
// an empty config.ReportParams (presumably the default configuration —
// confirm against adapter.NewDefaultBuilder).
func newReportBuilder() analyticsBuilder {
	base := adapter.NewDefaultBuilder(
		"apigeeReport",
		"Report logs to apigee",
		&config.ReportParams{},
	)
	return analyticsBuilder{DefaultBuilder: base}
}
// ValidateConfig checks the supplied configuration, which must be a
// *config.ReportParams (any other type makes the type assertion panic).
//
// Organization, Environment, Key and Secret must be non-empty. CollectionURL
// is optional, but when present it must be accepted by url.Parse. Every
// violation is appended to the returned error set; the result is whatever
// Appendf produced, or the zero value when nothing was appended.
func (b analyticsBuilder) ValidateConfig(c adapter.Config) (ce *adapter.ConfigErrors) {
	params := c.(*config.ReportParams)

	if params.Organization == "" {
		ce = ce.Appendf("organization", "Organization parameter must be specified")
	}
	if params.Environment == "" {
		ce = ce.Appendf("environment", "Environment parameter must be specified")
	}

	// The collection URL may be left empty; only a non-empty value is parsed.
	if raw := params.CollectionURL; raw != "" {
		if _, err := url.Parse(raw); err != nil {
			ce = ce.Appendf("collectionURL", "Invalid collection URL: %s", err)
		}
	}

	if params.Key == "" {
		ce = ce.Appendf("key", "Key must be specified")
	}
	if params.Secret == "" {
		ce = ce.Appendf("secret", "Secret must be specified")
	}

	return ce
}
// NewApplicationLogsAspect builds an analyticsLogger from env and cfg via
// newLogger and returns it as the application-logs aspect.
func (b analyticsBuilder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) {
	logger, err := newLogger(env, cfg)
	return logger, err
}
// NewAccessLogsAspect builds an analyticsLogger from env and cfg via
// newLogger and returns it as the access-logs aspect.
func (b analyticsBuilder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) {
	logger, err := newLogger(env, cfg)
	return logger, err
}
// newLogger creates an analyticsLogger from the given environment and
// configuration. c must be a *config.ReportParams; any other type makes the
// type assertion panic. The returned error is always nil.
func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) {
	params := c.(*config.ReportParams)

	logger := new(analyticsLogger)
	logger.env = env
	logger.organization = params.Organization
	logger.environment = params.Environment
	logger.collectionURL = params.CollectionURL
	logger.key = params.Key
	logger.secret = params.Secret
	// actualURL stays empty until getPushURL resolves it under this mutex.
	logger.latch = new(sync.Mutex)

	return logger, nil
}
// Log hands the entries to l.log and returns its result. It is presumably the
// adapter.ApplicationLogsAspect entry point (NewApplicationLogsAspect returns
// this type) — confirm against the interface definition.
func (l *analyticsLogger) Log(entries []adapter.LogEntry) error {
	err := l.log(entries)
	return err
}
// LogAccess hands the entries to l.log and returns its result. It is
// presumably the adapter.AccessLogsAspect entry point (NewAccessLogsAspect
// returns this type) — confirm against the interface definition.
func (l *analyticsLogger) LogAccess(entries []adapter.LogEntry) error {
	err := l.log(entries)
	return err
}
// log converts every log entry into an analytics record and pushes the whole
// batch with pushRecords.
//
// An empty batch is a no-op and returns nil: there is nothing to report, so
// no request is sent. When the push fails, the error is written to the
// environment's logger and also returned to the caller.
func (l *analyticsLogger) log(entries []adapter.LogEntry) error {
	if len(entries) == 0 {
		return nil
	}

	// The final length is known up front, so allocate the slice once.
	records := make([]common.AnalyticsRecord, 0, len(entries))
	for _, entry := range entries {
		records = append(records, recordFromEntry(entry))
	}

	err := l.pushRecords(&common.AnalyticsRequest{Records: records})
	if err != nil {
		l.env.Logger().Errorf("Error pushing analytics: %s", err)
	}
	return err
}

// recordFromEntry builds the analytics record for a single log entry from the
// entry's labels. Missing labels yield empty strings or zeros.
func recordFromEntry(entry adapter.LogEntry) common.AnalyticsRecord {
	path := getStringLabel(entry, "urlPath")
	host := getStringLabel(entry, "hostHeader")

	// Convert nanos (Go) to millis (Java).
	requestTime := getTimestampLabel(entry, "requestTime") / nanosPerMillisecond
	responseTime := getTimestampLabel(entry, "responseTime") / nanosPerMillisecond

	return common.AnalyticsRecord{
		ClientReceivedStartTimestamp: requestTime,
		// Historically the end must be after the start for the analytics
		// back end, so each end timestamp is the start plus one millisecond.
		ClientReceivedEndTimestamp: requestTime + 1,
		ClientSentStartTimestamp:   responseTime,
		ClientSentEndTimestamp:     responseTime + 1,
		// Missing: target times.
		ClientIP:           getStringLabel(entry, "sourceIP"),
		RequestVerb:        getStringLabel(entry, "httpMethod"),
		UserAgent:          getStringLabel(entry, "userAgent"),
		ResponseStatusCode: getIntLabel(entry, "responseCode"),
		// Technically wrong: the scheme is assumed to be http and the host
		// is taken from the host header.
		RequestURI: "http://" + host + path,
		// The request path is the URL path with any query string removed.
		RequestPath:      strings.Split(path, "?")[0],
		APIProxy:         getStringLabel(entry, "proxyName"),
		APIProxyRevision: getIntLabel(entry, "proxyRevision"),
		RecordType:       "APIAnalytics",
	}
}
// pushRecords sends the analytics request to the push URL as gzip-compressed
// JSON in an HTTP POST authenticated with the logger's key and secret.
//
// It returns an error when the push URL cannot be resolved, the payload
// cannot be encoded, the request fails or times out, the response status is
// not 200, the response body is not valid JSON, or the back end reports
// rejected records. On success the accepted count is logged and nil returned.
func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error {
	// Upper bound for the whole exchange so a stalled back end cannot block
	// the caller indefinitely.
	const pushTimeout = 30 * time.Second

	pushURL, err := l.getPushURL()
	if err != nil {
		return err
	}

	payload := &bytes.Buffer{}
	zw := gzip.NewWriter(payload)
	if err = json.NewEncoder(zw).Encode(ax); err != nil {
		return err
	}
	// Close flushes the remaining compressed data and writes the gzip
	// footer; if it fails the payload is incomplete and must not be sent.
	if err = zw.Close(); err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, pushURL, payload)
	if err != nil {
		return err
	}
	req.Header.Set("content-type", "application/json")
	req.Header.Set("content-encoding", "gzip")
	req.SetBasicAuth(l.key, l.secret)

	// http.DefaultClient has no timeout by default. Work on a copy so that
	// its transport and other settings are kept while the shared client is
	// left unmodified; an explicitly configured timeout is respected.
	client := *http.DefaultClient
	if client.Timeout == 0 {
		client.Timeout = pushTimeout
	}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Error code %d from analytics back end", resp.StatusCode)
	}

	var axResponse common.AnalyticsResponse
	if err = json.NewDecoder(resp.Body).Decode(&axResponse); err != nil {
		return err
	}
	if axResponse.Rejected > 0 {
		return fmt.Errorf("%d out of %d analytics records rejected",
			axResponse.Rejected, len(ax.Records))
	}

	l.env.Logger().Infof("%d out of %d analytics records accepted",
		axResponse.Accepted, len(ax.Records))
	return nil
}
// getPushURL returns the URL that analytics records are posted to, resolving
// it on first use and caching it in l.actualURL. The mutex l.latch is held
// for the whole call, including any HTTP lookup.
//
// When a collection URL is configured, the push URL is derived from it
// directly. Otherwise the region lookup service is queried with the logger's
// key and secret, and the push URL is built from the host it returns.
// Nothing is cached when the lookup fails, so a later call tries again.
func (l *analyticsLogger) getPushURL() (string, error) {
	// Upper bound for the region lookup: the mutex is held while it runs, so
	// a stalled lookup would otherwise block every other caller indefinitely.
	const lookupTimeout = 30 * time.Second

	l.latch.Lock()
	defer l.latch.Unlock()

	if l.actualURL != "" {
		return l.actualURL, nil
	}

	if l.collectionURL != "" {
		l.actualURL = fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s",
			l.collectionURL, l.organization, l.environment)
		l.env.Logger().Infof("Sending analytics data to %s", l.actualURL)
		return l.actualURL, nil
	}

	// Default config for Apigee cloud customers -- call web service to get correct URL
	lookupURL := fmt.Sprintf("%s/region/organization/%s/environment/%s",
		regionLookupURL, l.organization, l.environment)
	req, err := http.NewRequest(http.MethodGet, lookupURL, nil)
	if err != nil {
		return "", err
	}
	req.SetBasicAuth(l.key, l.secret)

	// http.DefaultClient has no timeout by default. Work on a copy so that
	// its transport and other settings are kept while the shared client is
	// left unmodified; an explicitly configured timeout is respected.
	client := *http.DefaultClient
	if client.Timeout == 0 {
		client.Timeout = lookupTimeout
	}
	resp, err := client.Do(req)
	if err != nil {
		l.env.Logger().Errorf("Error %s getting region for analytics data from %s",
			err, lookupURL)
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		msg := fmt.Sprintf("Error %d getting region for analytics data from %s",
			resp.StatusCode, lookupURL)
		// msg embeds the lookup URL, so it is passed as an argument and not
		// as the format string; a '%' in it would otherwise be misread.
		l.env.Logger().Errorf("%s", msg)
		return "", errors.New(msg)
	}

	var region common.RegionResponse
	if err = json.NewDecoder(resp.Body).Decode(&region); err != nil {
		l.env.Logger().Errorf("Error parsing JSON from region service: %s", err)
		return "", err
	}
	// Without a host the URL below would be unusable, and caching it would
	// make every later push fail. Report the problem and cache nothing.
	if region.Host == "" {
		msg := fmt.Sprintf("Error getting region for analytics data from %s: response contained no host",
			lookupURL)
		l.env.Logger().Errorf("%s", msg)
		return "", errors.New(msg)
	}

	l.actualURL = fmt.Sprintf("https://%s/edgemicro/axpublisher/organization/%s/environment/%s",
		region.Host, l.organization, l.environment)
	l.env.Logger().Infof("Using region \"%s\". Publishing to %s",
		region.Region, l.actualURL)
	return l.actualURL, nil
}
// getStringLabel returns the label stored under key as a string. A missing
// or nil label yields "", a string label is returned unchanged, and any other
// value is rendered with the "%v" format.
func getStringLabel(le adapter.LogEntry, key string) string {
	switch label := le.Labels[key].(type) {
	case nil:
		return ""
	case string:
		return label
	default:
		return fmt.Sprintf("%v", label)
	}
}
// getIntLabel returns the label stored under key as an int. Only int and
// int64 labels are recognised (int64 is converted to int); a missing label
// or a label of any other type yields 0.
func getIntLabel(le adapter.LogEntry, key string) int {
	switch label := le.Labels[key].(type) {
	case int:
		return label
	case int64:
		return int(label)
	}
	return 0
}
// getTimestampLabel returns the label stored under key as an int64
// timestamp. A time.Time label is converted with UnixNano and an int64 label
// is returned unchanged. A missing label, a zero time.Time, or a label of
// any other type yields 0.
func getTimestampLabel(le adapter.LogEntry, key string) int64 {
	switch label := le.Labels[key].(type) {
	case time.Time:
		// UnixNano is undefined for instants that do not fit in an int64
		// of nanoseconds, which includes the zero time.Time. Treat an
		// unset time like a missing label.
		if label.IsZero() {
			return 0
		}
		return label.UnixNano()
	case int64:
		return label
	default:
		return 0
	}
}
// Close does no work and always returns nil.
func (l *analyticsLogger) Close() error {
	return nil
}