[XAPID-377] Added validation and enrichment(incomplete) for analytics payload in API
diff --git a/api.go b/api.go
index 6f29dce..a1710cd 100644
--- a/api.go
+++ b/api.go
@@ -1,10 +1,9 @@
package apidAnalytics
import (
- "database/sql"
- "encoding/json"
"github.com/30x/apid"
"net/http"
+ "strings"
)
var analyticsBasePath string
@@ -15,8 +14,8 @@
}
type dbError struct {
- reason string
- errorCode string
+ ErrorCode string `json:"errorCode"`
+ Reason string `json:"reason"`
}
type tenant struct {
@@ -32,66 +31,35 @@
func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+
db, _ := data.DB()
if db == nil {
- w.WriteHeader(http.StatusServiceUnavailable)
- w.Write([]byte("Still initializing service"))
+ writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely")
+ return
+ }
+
+ if !strings.EqualFold(r.Header.Get("Content-Type"),"application/json") {
+ writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE","Only supported content type is application/json")
return
}
vars := apid.API().Vars(r)
scopeuuid := vars["bundle_scope_uuid"]
- tenant, err := getTenantForScope(scopeuuid)
- if err.errorCode != "" {
- w.Header().Set("Content-Type", "application/json; charset=UTF-8")
- switch err.errorCode {
- case "SEARCH_INTERNAL_ERROR":
- w.WriteHeader(http.StatusInternalServerError)
- if err := json.NewEncoder(w).Encode(errResponse{"SEARCH_INTERNAL_ERROR", err.reason}); err != nil {
- panic(err)
- }
+ tenant, dbErr := getTenantForScope(scopeuuid)
+ if dbErr.ErrorCode != "" {
+ switch dbErr.ErrorCode {
+ case "INTERNAL_SEARCH_ERROR":
+ writeError(w, http.StatusInternalServerError, "INTERNAL_SEARCH_ERROR", dbErr.Reason)
case "UNKNOWN_SCOPE":
- w.WriteHeader(http.StatusBadRequest)
- if err := json.NewEncoder(w).Encode(errResponse{"UNKNOWN_SCOPE", err.reason}); err != nil {
- panic(err)
- }
+ writeError(w, http.StatusBadRequest, "UNKNOWN_SCOPE", dbErr.Reason)
}
} else {
- message := saveToFile(tenant)
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(message))
- }
-}
-
-func getTenantForScope(scopeuuid string) (tenant, dbError) {
- // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event
- var org, env string
- {
- db, err := apid.Data().DB()
- switch {
- case err != nil:
- reason := err.Error()
- errorCode := "SEARCH_INTERNAL_ERROR"
- return tenant{org, env}, dbError{reason, errorCode}
- }
-
- error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org)
-
- switch {
- case error == sql.ErrNoRows:
- reason := "No tenant found for this scopeuuid: " + scopeuuid
- errorCode := "UNKNOWN_SCOPE"
- return tenant{org, env}, dbError{reason, errorCode}
- case error != nil:
- reason := error.Error()
- errorCode := "SEARCH_INTERNAL_ERROR"
- return tenant{org, env}, dbError{reason, errorCode}
+ err := processPayload(tenant, scopeuuid, r);
+ if err.ErrorCode == "" {
+ w.WriteHeader(http.StatusOK)
+ } else {
+ writeError(w, http.StatusBadRequest, err.ErrorCode, err.Reason)
}
}
- return tenant{org, env}, dbError{}
-}
-
-func saveToFile(tenant tenant) string {
- message := "hey " + tenant.org + "~" + tenant.env
- return message
}
diff --git a/apidAnalytics_test.go b/apidAnalytics_test.go
index 97050f4..dcbf64c 100644
--- a/apidAnalytics_test.go
+++ b/apidAnalytics_test.go
@@ -9,7 +9,7 @@
Context("get tenant for valid scopeuuid", func() {
It("should return testorg and testenv", func() {
tenant, dbError := getTenantForScope("testid")
- Expect(dbError.reason).To(Equal(""))
+ Expect(dbError.Reason).To(Equal(""))
Expect(tenant.org).To(Equal("testorg"))
Expect(tenant.env).To(Equal("testenv"))
})
@@ -19,7 +19,7 @@
It("should return empty tenant and a db error", func() {
tenant, dbError := getTenantForScope("wrongid")
Expect(tenant.org).To(Equal(""))
- Expect(dbError.errorCode).To(Equal("UNKNOWN_SCOPE"))
+ Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE"))
})
})
})
\ No newline at end of file
diff --git a/helper.go b/helper.go
new file mode 100644
index 0000000..b092db1
--- /dev/null
+++ b/helper.go
@@ -0,0 +1,173 @@
+package apidAnalytics
+
+import (
+ "database/sql"
+ "encoding/json"
+ "github.com/30x/apid"
+ "net/http"
+ "io"
+ "io/ioutil"
+ "strings"
+ "compress/gzip"
+)
+
+type developerInfo struct {
+ apiProduct string
+ developerApp string
+ developerEmail string
+ developer string
+}
+
+func getTenantForScope(scopeuuid string) (tenant, dbError) {
+ // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event
+ var org, env string
+ {
+ db, err := apid.Data().DB()
+ switch {
+ case err != nil:
+ reason := err.Error()
+ errorCode := "INTERNAL_SEARCH_ERROR"
+ return tenant{org, env}, dbError{errorCode, reason}
+ }
+
+ error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org)
+
+ switch {
+ case error == sql.ErrNoRows:
+ reason := "No tenant found for this scopeuuid: " + scopeuuid
+ errorCode := "UNKNOWN_SCOPE"
+ return tenant{org, env}, dbError{errorCode, reason}
+ case error != nil:
+ reason := error.Error()
+ errorCode := "INTERNAL_SEARCH_ERROR"
+ return tenant{org, env}, dbError{errorCode, reason}
+ }
+ }
+ return tenant{org, env}, dbError{}
+}
+
+
+func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
+ var gzipEncoded bool
+ if r.Header.Get("Content-Encoding") != "" {
+ if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") {
+ return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"}
+ } else {
+ gzipEncoded = true
+ }
+ }
+
+ var reader io.ReadCloser
+ var err error
+ if gzipEncoded {
+ reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
+ if err != nil {
+ return errResponse{"BAD_DATA", "Gzip data cannot be read"}
+ }
+ } else {
+ reader = r.Body
+ }
+
+ body, _ := ioutil.ReadAll(reader)
+ errMessage := validateAndEnrich(tenant, scopeuuid, body)
+ if errMessage.ErrorCode != "" {
+ return errMessage
+ }
+ return errResponse{}
+}
+
+func validateAndEnrich(tenant tenant, scopeuuid string, body []byte) errResponse {
+ var raw map[string]interface{}
+ json.Unmarshal(body, &raw)
+ if records := raw["records"]; records != nil {
+ for _, eachRecord := range records.([]interface{}) {
+ recordMap := eachRecord.(map[string]interface{})
+ valid, err := validate(recordMap)
+ if valid {
+ enrich(recordMap, scopeuuid, tenant)
+ log.Debugf("Raw records : %v ", eachRecord)
+ } else {
+ return err // Even if there is one bad record, then reject entire batch
+ }
+ }
+ // TODO: add the batch of records to a channel for consumption
+ } else {
+ return errResponse{"NO_RECORDS", "No analytics records in the payload"}
+ }
+ return errResponse{}
+}
+
+func validate(recordMap map[string]interface{}) (bool, errResponse) {
+ elems := []string{"client_received_start_timestamp"}
+ for _, elem := range elems {
+ if recordMap[elem] == nil {
+ return false, errResponse{"MISSING_FIELD", "Missing field: " + elem}
+ }
+ }
+
+ crst, exists1 := recordMap["client_received_start_timestamp"]
+ cret, exists2 := recordMap["client_received_end_timestamp"]
+ if exists1 && exists2 {
+ if crst.(int64) > cret.(int64) {
+ return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"}
+
+ }
+ }
+ // api key is required to find other info
+ _, exists3 := recordMap["client_id"]
+ if !exists3 {
+ return false, errResponse{"BAD_DATA", "client_id cannot be null"}
+ }
+ return true, errResponse{}
+}
+
+func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) {
+ recordMap["organization"] = tenant.org
+ recordMap["environment"] = tenant.env
+ apiKey := recordMap["client_id"].(string)
+ devInfo := getDeveloperInfo(scopeuuid, apiKey)
+ recordMap["api_product"] = devInfo.apiProduct
+ recordMap["developer_app"] = devInfo.developerApp
+ recordMap["developer_email"] = devInfo.developerEmail
+ recordMap["developer"] = devInfo.developer
+}
+
+// if info not found then dont set it
+func getDeveloperInfo(scopeuuid string, apiKey string) developerInfo {
+ // TODO: create a cache during init and refresh it on update event
+ var apiProduct, developerApp, developerEmail, developer string
+ {
+ db, err := apid.Data().DB()
+ switch {
+ case err != nil:
+ return developerInfo{}
+ }
+
+ // TODO: query needs to change (wont work, it is just a placeholder)
+ error := db.QueryRow("SELECT apiProduct, developerApp, developerEmail, developer FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&apiProduct, &developerApp, &developerEmail, &developer)
+
+ switch {
+ case error == sql.ErrNoRows:
+ return developerInfo{}
+ case error != nil:
+ return developerInfo{}
+ }
+ }
+ return developerInfo{apiProduct, developerApp, developerEmail, developer}
+ // For local testing
+ //return developerInfo{"test_product", "test_app", "test@test.com", "test"}
+}
+
+func writeError(w http.ResponseWriter, status int, code string, reason string) {
+ w.WriteHeader(status)
+ e := errResponse{
+ ErrorCode: code,
+ Reason: reason,
+ }
+ bytes, err := json.Marshal(e)
+ if err != nil {
+ log.Errorf("unable to marshal errorResponse: %v", err)
+ } else {
+ w.Write(bytes)
+ }
+}