blob: 7ae0b8f3c353175fd0df12e3170120a76d14df44 [file] [log] [blame]
package apidAnalytics
import (
"encoding/json"
"net/http"
"io"
"io/ioutil"
"strings"
"compress/gzip"
)
type developerInfo struct {
apiProduct string
developerApp string
developerEmail string
developer string
}
func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
var gzipEncoded bool
if r.Header.Get("Content-Encoding") != "" {
if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") {
return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"}
} else {
gzipEncoded = true
}
}
var reader io.ReadCloser
var err error
if gzipEncoded {
reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
if err != nil {
return errResponse{"BAD_DATA", "Gzip data cannot be read"}
}
} else {
reader = r.Body
}
body, _ := ioutil.ReadAll(reader)
errMessage := validateEnrichPublish(tenant, scopeuuid, body)
if errMessage.ErrorCode != "" {
return errMessage
}
return errResponse{}
}
func validateEnrichPublish(tenant tenant, scopeuuid string, body []byte) errResponse {
var raw map[string]interface{}
json.Unmarshal(body, &raw)
if records := raw["records"]; records != nil {
for _, eachRecord := range records.([]interface{}) {
recordMap := eachRecord.(map[string]interface{})
valid, err := validate(recordMap)
if valid {
enrich(recordMap, scopeuuid, tenant)
log.Debugf("Raw records : %v ", eachRecord)
} else {
return err // Even if there is one bad record, then reject entire batch
}
}
publishToChannel(records.([]interface{}))
} else {
return errResponse{"NO_RECORDS", "No analytics records in the payload"}
}
return errResponse{}
}
func validate(recordMap map[string]interface{}) (bool, errResponse) {
elems := []string{"client_received_start_timestamp"}
for _, elem := range elems {
if recordMap[elem] == nil {
return false, errResponse{"MISSING_FIELD", "Missing field: " + elem}
}
}
crst, exists1 := recordMap["client_received_start_timestamp"]
cret, exists2 := recordMap["client_received_end_timestamp"]
if exists1 && exists2 {
if crst.(int64) > cret.(int64) {
return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"}
}
}
return true, errResponse{}
}
func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) {
if recordMap["organization"] == "" {
recordMap["organization"] = tenant.org
}
if recordMap["environment"] == "" {
recordMap["environment"] = tenant.env
}
apiKey, exists := recordMap["client_id"]
// apiKey doesnt exist then ignore adding developer fields
if exists {
apiKey := apiKey.(string)
devInfo := getDeveloperInfo(tenant.tenantId, apiKey)
log.Debugf("developerInfo = %v", devInfo)
if recordMap["api_product"] == "" {
recordMap["api_product"] = devInfo.apiProduct
}
if recordMap["developer_app"] == "" {
recordMap["developer_app"] = devInfo.developerApp
}
if recordMap["developer_email"] == "" {
recordMap["developer_email"] = devInfo.developerEmail
}
if recordMap["developer"] == "" {
recordMap["developer"] = devInfo.developer
}
}
}
func publishToChannel(records []interface{}) {
// TODO: add the batch of records to a channel for consumption
return
}
func writeError(w http.ResponseWriter, status int, code string, reason string) {
w.WriteHeader(status)
e := errResponse{
ErrorCode: code,
Reason: reason,
}
bytes, err := json.Marshal(e)
if err != nil {
log.Errorf("unable to marshal errorResponse: %v", err)
} else {
w.Write(bytes)
}
}