// blob: 67e97258fff147f603db5ebe08bcbbf0bfca57cb [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidAnalytics
import (
"compress/gzip"
"encoding/json"
"io"
"net/http"
"strings"
)
/*
Implements all the helper methods needed to process the POST /analytics payload
and send it to the internal buffer channel
*/
// developerInfo groups four string attributes: the API product, the developer
// app, the developer email and the developer.
// NOTE(review): not referenced in the visible part of this file; presumably
// populated elsewhere in the package - confirm before relying on it.
type developerInfo struct {
	ApiProduct, DeveloperApp, DeveloperEmail, Developer string
}
// axRecords is one batch of analytics records together with the tenant they
// were submitted for. validateEnrichPublish builds a value of this type and
// sends it on internalBuffer.
type axRecords struct {
// Tenant is the org/env pair the batch was published for
Tenant tenant
// Records is an array of multiple analytics records
Records []interface{}
}
// tenant identifies the owner of a payload by organization (Org) and
// environment (Env); getTenantFromPayload fills it from the "organization"
// and "environment" fields of the request body.
type tenant struct {
	Org, Env string
}
/*
getJsonBody decodes the request body into a generic JSON object.

The body is read as-is when no Content-Encoding header is set. If the header is
set it must be gzip (compared case-insensitively), in which case the body is
decompressed before decoding; any other value is rejected with
UNSUPPORTED_CONTENT_ENCODING. Numbers are decoded as json.Number rather than
float64.

On success the decoded map and an empty errResponse are returned. On failure
the map is nil and the errResponse carries BAD_DATA (unreadable gzip stream or
invalid JSON) or UNSUPPORTED_CONTENT_ENCODING.
*/
func getJsonBody(r *http.Request) (map[string]interface{}, errResponse) {
	var reader io.Reader = r.Body

	if encoding := r.Header.Get("Content-Encoding"); encoding != "" {
		if !strings.EqualFold(encoding, "gzip") {
			return nil, errResponse{
				ErrorCode: "UNSUPPORTED_CONTENT_ENCODING",
				Reason:    "Only supported content encoding is gzip"}
		}

		gzipReader, err := gzip.NewReader(r.Body) // reader for gzip encoded data
		if err != nil {
			return nil, errResponse{
				ErrorCode: "BAD_DATA",
				Reason:    "Gzip Encoded data cannot be read"}
		}
		// gzip.NewReader leaves closing the returned Reader to the caller.
		// Closing it does not close the underlying request body.
		defer gzipReader.Close()
		reader = gzipReader
	}

	var raw map[string]interface{}
	decoder := json.NewDecoder(reader) // Decode payload to JSON data
	decoder.UseNumber()                // keep numbers as json.Number instead of float64
	if err := decoder.Decode(&raw); err != nil {
		return nil, errResponse{ErrorCode: "BAD_DATA",
			Reason: "Not a valid JSON payload"}
	}
	return raw, errResponse{}
}
/*
getTenantFromPayload builds the tenant from the 2 required payload fields -
organization and environment.

Fields are checked in that order and the first failure is returned:
  - MISSING_FIELD when the field is absent, null or an empty string
  - BAD_DATA when the field is present but is not a JSON string

On success the tenant and an empty errResponse are returned.
*/
func getTenantFromPayload(raw map[string]interface{}) (tenant, errResponse) {
	elems := []string{"organization", "environment"}
	values := make([]string, 0, len(elems))

	for _, elem := range elems {
		// A type switch instead of a bare assertion: the payload is
		// client-supplied, so a number or object here must not panic.
		switch value := raw[elem].(type) {
		case nil:
			return tenant{}, errResponse{
				ErrorCode: "MISSING_FIELD",
				Reason:    "Missing Required field: " + elem}
		case string:
			if value == "" {
				return tenant{}, errResponse{
					ErrorCode: "MISSING_FIELD",
					Reason:    "Missing Required field: " + elem}
			}
			values = append(values, value)
		default:
			return tenant{}, errResponse{
				ErrorCode: "BAD_DATA",
				Reason:    elem + " should be a string"}
		}
	}

	return tenant{Org: values[0], Env: values[1]}, errResponse{}
}
/*
validateEnrichPublish validates every record under the "records" key of raw,
enriches each valid record with the tenant and sends the whole batch on
internalBuffer.

The batch is rejected, and nothing is sent, when:
  - "records" is missing, null or an empty list (NO_RECORDS)
  - "records" is not a list, or an element is not a json object (BAD_DATA)
  - validate rejects any record (its errResponse is returned unchanged)

An empty errResponse is returned once the batch has been handed to the channel.
*/
func validateEnrichPublish(tenant tenant, raw map[string]interface{}) errResponse {
	payload := raw["records"]
	if payload == nil {
		return errResponse{
			ErrorCode: "NO_RECORDS",
			Reason:    "No analytics records in the payload"}
	}

	batch, isList := payload.([]interface{})
	if !isList {
		return errResponse{
			ErrorCode: "BAD_DATA",
			Reason:    "records should be a list of analytics records"}
	}
	if len(batch) == 0 {
		return errResponse{
			ErrorCode: "NO_RECORDS",
			Reason:    "No analytics records in the payload"}
	}

	// Walk the batch once, validating and enriching record by record.
	for _, item := range batch {
		record, isObject := item.(map[string]interface{})
		if !isObject {
			return errResponse{
				ErrorCode: "BAD_DATA",
				Reason:    "Each Analytics record in records should be a json object"}
		}

		isValid, failure := validate(record)
		if !isValid {
			// A single bad record rejects the entire batch.
			return failure
		}
		enrich(record, tenant)
	}

	// The send blocks until the channel accepts the batch.
	internalBuffer <- axRecords{Tenant: tenant, Records: batch}
	return errResponse{}
}
/*
validate does basic validation on each analytics message
1. client_received_start_timestamp, client_received_end_timestamp should exist
2. client_received_start_timestamp, client_received_end_timestamp should be a number (json.Number)
3. neither timestamp may be 0
4. client_received_start_timestamp should not be > client_received_end_timestamp
   (equal values are accepted)

The timestamps are compared by numeric value, not as text. It returns true and
an empty errResponse for a valid record, otherwise false and the errResponse
for the first check that failed.
*/
func validate(recordMap map[string]interface{}) (bool, errResponse) {
	const (
		startKey = "client_received_start_timestamp"
		endKey   = "client_received_end_timestamp"
	)

	for _, elem := range []string{startKey, endKey} {
		if recordMap[elem] == nil {
			return false, errResponse{
				ErrorCode: "MISSING_FIELD",
				Reason:    "Missing Required field: " + elem}
		}
	}

	notNumber := errResponse{
		ErrorCode: "BAD_DATA",
		Reason: "client_received_start_timestamp and " +
			"client_received_end_timestamp has to be number"}

	crst, isNumber1 := recordMap[startKey].(json.Number)
	cret, isNumber2 := recordMap[endKey].(json.Number)
	if !isNumber1 || !isNumber2 {
		return false, notNumber
	}

	// json.Number is a string type, so == and > on it compare text
	// ("9" > "10"). Parse first and compare the numeric values.
	hasZero, startAfterEnd, err := compareTimestamps(crst, cret)
	switch {
	case err != nil:
		return false, notNumber
	case hasZero:
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp or " +
				"client_received_end_timestamp cannot be 0"}
	case startAfterEnd:
		return false, errResponse{
			ErrorCode: "BAD_DATA",
			Reason: "client_received_start_timestamp " +
				"> client_received_end_timestamp"}
	}
	return true, errResponse{}
}

// compareTimestamps parses start and end and reports whether either one is
// zero and whether start is strictly greater than end. Integers are compared
// as int64 to keep full precision; anything else falls back to float64. A
// non-nil error means at least one value could not be parsed as a number.
func compareTimestamps(start, end json.Number) (hasZero bool, startAfterEnd bool, err error) {
	if s, errStart := start.Int64(); errStart == nil {
		if e, errEnd := end.Int64(); errEnd == nil {
			return s == 0 || e == 0, s > e, nil
		}
	}

	s, err := start.Float64()
	if err != nil {
		return false, false, err
	}
	e, err := end.Float64()
	if err != nil {
		return false, false, err
	}
	return s == 0 || e == 0, s > e, nil
}
/*
enrich stamps a record with the tenant's org and env. Any "organization" or
"environment" value already present in the record is replaced by the tenant
information.
*/
func enrich(recordMap map[string]interface{}, tenant tenant) {
	org, env := tenant.Org, tenant.Env

	// Unconditional assignment: the tenant always wins over the record.
	recordMap["environment"] = env
	recordMap["organization"] = org
}
/*
writeError sends an error reply: it writes status as the HTTP status code and
then a JSON-encoded errResponse built from code and reason as the body. If the
errResponse cannot be marshalled the failure is logged and no body is written
(the status code has already been sent by then).
*/
func writeError(w http.ResponseWriter, status int, code string, reason string) {
	w.WriteHeader(status)

	payload, err := json.Marshal(errResponse{ErrorCode: code, Reason: reason})
	if err != nil {
		log.Errorf("unable to marshal errorResponse: %v", err)
		return
	}
	w.Write(payload)
}