blob: ae4f297150c7bfa504c68647ff820d88b027ae7c [file] [log] [blame]
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package adapter
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"
"github.com/apid/istioApigeeAdapter/adapter/config"
"github.com/apid/istioApigeeAdapter/common"
"istio.io/mixer/pkg/adapter"
)
// defaultCollectURL is the base URL used for the analytics collection
// endpoint when the adapter configuration does not supply a CollectionURL.
const defaultCollectURL = "https://edgemicroservices.apigee.net/edgemicro"
// analyticsBuilder is the builder for the Apigee report adapter. It embeds
// adapter.DefaultBuilder and adds config validation and aspect constructors.
type analyticsBuilder struct {
	adapter.DefaultBuilder
}

// analyticsLogger turns log entries into analytics records and posts them
// to collectionURL, authenticating with key and secret via HTTP basic auth.
type analyticsLogger struct {
	// env supplies the logger used for the adapter's own diagnostics.
	env adapter.Env
	// collectionURL is the fully-built endpoint that records are POSTed to.
	collectionURL string
	// key and secret are the basic-auth credentials sent with each push.
	key    string
	secret string
}
// newReportBuilder returns an analyticsBuilder wrapping a default builder
// registered under the name "apigeeReport" with an empty ReportParams as its
// configuration prototype.
func newReportBuilder() analyticsBuilder {
	base := adapter.NewDefaultBuilder(
		"apigeeReport",
		"Report logs to apigee",
		&config.ReportParams{},
	)
	return analyticsBuilder{DefaultBuilder: base}
}
// ValidateConfig checks a *config.ReportParams and accumulates one config
// error per problem found. Organization, Environment, Key and Secret are
// required; CollectionURL is optional but, when present, must be accepted by
// url.Parse. The result is nil when no check appended an error.
//
// c must be a *config.ReportParams; any other type makes the assertion panic.
func (b analyticsBuilder) ValidateConfig(c adapter.Config) (ce *adapter.ConfigErrors) {
	params := c.(*config.ReportParams)

	if len(params.Organization) == 0 {
		ce = ce.Appendf("organization", "Organization parameter must be specified")
	}
	if len(params.Environment) == 0 {
		ce = ce.Appendf("environment", "Environment parameter must be specified")
	}
	// The collection URL may be omitted, in which case a default is used
	// when the logger is created; only a supplied value is parsed.
	if rawURL := params.CollectionURL; len(rawURL) > 0 {
		if _, parseErr := url.Parse(rawURL); parseErr != nil {
			ce = ce.Appendf("collectionURL", "Invalid collection URL: %s", parseErr)
		}
	}
	if len(params.Key) == 0 {
		ce = ce.Appendf("key", "Key must be specified")
	}
	if len(params.Secret) == 0 {
		ce = ce.Appendf("secret", "Secret must be specified")
	}
	return ce
}
// NewApplicationLogsAspect builds an analyticsLogger from cfg and returns it
// as the application-logs aspect, together with any error from newLogger.
func (b analyticsBuilder) NewApplicationLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.ApplicationLogsAspect, error) {
	logger, err := newLogger(env, cfg)
	return logger, err
}
// NewAccessLogsAspect builds an analyticsLogger from cfg and returns it as
// the access-logs aspect, together with any error from newLogger.
func (b analyticsBuilder) NewAccessLogsAspect(env adapter.Env, cfg adapter.Config) (adapter.AccessLogsAspect, error) {
	logger, err := newLogger(env, cfg)
	return logger, err
}
// newLogger creates an analyticsLogger from a *config.ReportParams.
//
// The collection endpoint is
// "<base>/axpublisher/organization/<org>/environment/<env>", where <base> is
// the configured CollectionURL or defaultCollectURL when none is set. The
// resulting URL is logged at info level. The returned error is always nil.
//
// c must be a *config.ReportParams; any other type makes the assertion panic.
func newLogger(env adapter.Env, c adapter.Config) (*analyticsLogger, error) {
	params := c.(*config.ReportParams)

	// Start from the configured base and fall back to the default.
	base := params.CollectionURL
	if base == "" {
		base = defaultCollectURL
	}

	target := fmt.Sprintf("%s/axpublisher/organization/%s/environment/%s",
		base, params.Organization, params.Environment)
	env.Logger().Infof("Created Apigee Report adapter to invoke \"%s\"", target)

	return &analyticsLogger{
		env:           env,
		collectionURL: target,
		key:           params.Key,
		secret:        params.Secret,
	}, nil
}
// Log hands entries to l.log and returns its result unchanged.
// NOTE(review): presumably this is the ApplicationLogsAspect entry point —
// confirm against the adapter interface.
func (l *analyticsLogger) Log(entries []adapter.LogEntry) error {
	err := l.log(entries)
	return err
}
// LogAccess hands entries to l.log and returns its result unchanged.
// NOTE(review): presumably this is the AccessLogsAspect entry point —
// confirm against the adapter interface.
func (l *analyticsLogger) LogAccess(entries []adapter.LogEntry) error {
	err := l.log(entries)
	return err
}
// log converts every entry into a common.AnalyticsRecord and pushes the whole
// batch to the collection endpoint in a single request.
//
// Each record is filled from the entry's labels: "sourceIP", "urlPath",
// "hostHeader", "httpMethod", "userAgent", "requestTime", "responseTime" and
// "responseCode". Labels that are missing or of an unexpected type fall back
// to the zero value produced by the get*Label helpers.
//
// Any error from pushRecords is logged through the adapter environment and
// then returned to the caller.
func (l *analyticsLogger) log(entries []adapter.LogEntry) error {
	// getTimestampLabel yields nanoseconds (time.Time.UnixNano), while the
	// analytics record carries milliseconds, so divide by 1e6 — not 1e3,
	// which would produce microseconds.
	const nanosPerMilli = int64(time.Millisecond / time.Nanosecond)

	var records []common.AnalyticsRecord

	for _, entry := range entries {
		sourceIP := getStringLabel(entry, "sourceIP")
		path := getStringLabel(entry, "urlPath")
		host := getStringLabel(entry, "hostHeader")
		method := getStringLabel(entry, "httpMethod")
		userAgent := getStringLabel(entry, "userAgent")
		// Convert nanos (Go) to millis (Java)
		requestTime := getTimestampLabel(entry, "requestTime") / nanosPerMilli
		responseTime := getTimestampLabel(entry, "responseTime") / nanosPerMilli
		responseCode := getIntLabel(entry, "responseCode")

		r := common.AnalyticsRecord{
			// Only one instant is known per phase, so each end timestamp
			// is the start plus one millisecond.
			ClientReceivedStartTimestamp: requestTime,
			ClientReceivedEndTimestamp:   requestTime + 1,
			ClientSentStartTimestamp:     responseTime,
			ClientSentEndTimestamp:       responseTime + 1,
			// Missing: Target times
			ClientIP:           sourceIP,
			RequestVerb:        method,
			UserAgent:          userAgent,
			ResponseStatusCode: responseCode,
			// Technically wrong because of no scheme and host header
			RequestURI: "http://" + host + path,
			// Should this include query params? Perhaps not.
			RequestPath: path,
			// Hard-coded, but should come from config
			APIProxy:         "istio",
			APIProxyRevision: 1,
			RecordType:       "APIAnalytics",
		}
		records = append(records, r)
	}

	ax := common.AnalyticsRequest{
		Records: records,
	}

	err := l.pushRecords(&ax)
	if err != nil {
		l.env.Logger().Errorf("Error pushing analytics: %s", err)
	}
	return err
}
// analyticsPushTimeout bounds a whole push request (connect, send, and
// reading the response). The value is a conservative choice; tune as needed.
const analyticsPushTimeout = 30 * time.Second

// analyticsHTTPClient is used for every push. Unlike http.DefaultClient it
// has a timeout, so an unresponsive collection endpoint cannot block the
// caller indefinitely. Its nil Transport means http.DefaultTransport is
// still used.
var analyticsHTTPClient = &http.Client{Timeout: analyticsPushTimeout}

// pushRecords JSON-encodes ax and POSTs it to l.collectionURL using HTTP
// basic auth with the logger's key and secret.
//
// It returns an error when the request cannot be built or sent (including a
// timeout), when the response status is not 200, when the response body is
// not decodable as a common.AnalyticsResponse, or when the back end reports
// one or more rejected records. On success the accepted count is logged at
// info level and nil is returned.
func (l *analyticsLogger) pushRecords(ax *common.AnalyticsRequest) error {
	jsonBody, err := json.Marshal(ax)
	if err != nil {
		return err
	}

	req, err := http.NewRequest(http.MethodPost, l.collectionURL,
		bytes.NewReader(jsonBody))
	if err != nil {
		return err
	}
	req.Header.Set("content-type", "application/json")
	req.SetBasicAuth(l.key, l.secret)

	resp, err := analyticsHTTPClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Error code %d from analytics back end", resp.StatusCode)
	}

	var axResponse common.AnalyticsResponse
	if err = json.NewDecoder(resp.Body).Decode(&axResponse); err != nil {
		return err
	}

	// A partially rejected batch is reported as a failure of the whole push.
	if axResponse.Rejected > 0 {
		return fmt.Errorf("%d out of %d analytics records rejected",
			axResponse.Rejected, len(ax.Records))
	}

	l.env.Logger().Infof("%d out of %d analytics records accepted",
		axResponse.Accepted, len(ax.Records))
	return nil
}
// getStringLabel returns the label stored under key as a string. A missing
// or nil label yields "", a string label is returned as-is, and any other
// value is rendered with the "%v" format verb.
func getStringLabel(le adapter.LogEntry, key string) string {
	switch val := le.Labels[key].(type) {
	case nil:
		return ""
	case string:
		return val
	default:
		return fmt.Sprintf("%v", val)
	}
}
// getIntLabel returns the label stored under key as an int. Only int and
// int64 labels are recognised (int64 is converted with int(...)); a missing
// label or a value of any other type yields 0.
func getIntLabel(le adapter.LogEntry, key string) int {
	switch num := le.Labels[key].(type) {
	case int:
		return num
	case int64:
		return int(num)
	default:
		// Covers both an absent label and an unsupported type.
		return 0
	}
}
// getTimestampLabel returns the label stored under key as an int64
// timestamp. A time.Time label is converted with UnixNano, an int64 label is
// returned unchanged, and a missing label or any other type yields 0.
func getTimestampLabel(le adapter.LogEntry, key string) int64 {
	switch ts := le.Labels[key].(type) {
	case time.Time:
		return ts.UnixNano()
	case int64:
		// NOTE(review): the unit of a raw int64 label is not visible
		// here; callers treat the result as nanoseconds — confirm.
		return ts
	default:
		// Covers both an absent label and an unsupported type.
		return 0
	}
}
// Close is a no-op that always returns nil.
func (l *analyticsLogger) Close() error {
	return nil
}