[XAPID-377] Added validation and enrichment(incomplete) for analytics payload in API
diff --git a/api.go b/api.go index 6f29dce..a1710cd 100644 --- a/api.go +++ b/api.go
@@ -1,10 +1,9 @@ package apidAnalytics import ( - "database/sql" - "encoding/json" "github.com/30x/apid" "net/http" + "strings" ) var analyticsBasePath string @@ -15,8 +14,8 @@ } type dbError struct { - reason string - errorCode string + ErrorCode string `json:"errorCode"` + Reason string `json:"reason"` } type tenant struct { @@ -32,66 +31,35 @@ func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + db, _ := data.DB() if db == nil { - w.WriteHeader(http.StatusServiceUnavailable) - w.Write([]byte("Still initializing service")) + writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") + return + } + + if !strings.EqualFold(r.Header.Get("Content-Type"),"application/json") { + writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE","Only supported content type is application/json") return } vars := apid.API().Vars(r) scopeuuid := vars["bundle_scope_uuid"] - tenant, err := getTenantForScope(scopeuuid) - if err.errorCode != "" { - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - switch err.errorCode { - case "SEARCH_INTERNAL_ERROR": - w.WriteHeader(http.StatusInternalServerError) - if err := json.NewEncoder(w).Encode(errResponse{"SEARCH_INTERNAL_ERROR", err.reason}); err != nil { - panic(err) - } + tenant, dbErr := getTenantForScope(scopeuuid) + if dbErr.ErrorCode != "" { + switch dbErr.ErrorCode { + case "INTERNAL_SEARCH_ERROR": + writeError(w, http.StatusInternalServerError, "INTERNAL_SEARCH_ERROR", dbErr.Reason) case "UNKNOWN_SCOPE": - w.WriteHeader(http.StatusBadRequest) - if err := json.NewEncoder(w).Encode(errResponse{"UNKNOWN_SCOPE", err.reason}); err != nil { - panic(err) - } + writeError(w, http.StatusBadRequest, "UNKNOWN_SCOPE", dbErr.Reason) } } else { - message := saveToFile(tenant) - w.WriteHeader(http.StatusOK) - w.Write([]byte(message)) - } -} - -func getTenantForScope(scopeuuid string) (tenant, dbError) { - // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event - var org, env string - { - db, err := apid.Data().DB() - switch { - case err != nil: - reason := err.Error() - errorCode := "SEARCH_INTERNAL_ERROR" - return tenant{org, env}, dbError{reason, errorCode} - } - - error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org) - - switch { - case error == sql.ErrNoRows: - reason := "No tenant found for this scopeuuid: " + scopeuuid - errorCode := "UNKNOWN_SCOPE" - return tenant{org, env}, dbError{reason, errorCode} - case error != nil: - reason := error.Error() - errorCode := "SEARCH_INTERNAL_ERROR" - return tenant{org, env}, dbError{reason, errorCode} + err := processPayload(tenant, scopeuuid, r); + if err.ErrorCode == "" { + w.WriteHeader(http.StatusOK) + } else { + writeError(w, http.StatusBadRequest, err.ErrorCode, err.Reason) } } - return tenant{org, env}, dbError{} -} - -func saveToFile(tenant tenant) string { - message := "hey " + tenant.org + "~" + tenant.env - return message }
diff --git a/apidAnalytics_test.go b/apidAnalytics_test.go index 97050f4..dcbf64c 100644 --- a/apidAnalytics_test.go +++ b/apidAnalytics_test.go
@@ -9,7 +9,7 @@ Context("get tenant for valid scopeuuid", func() { It("should return testorg and testenv", func() { tenant, dbError := getTenantForScope("testid") - Expect(dbError.reason).To(Equal("")) + Expect(dbError.Reason).To(Equal("")) Expect(tenant.org).To(Equal("testorg")) Expect(tenant.env).To(Equal("testenv")) }) @@ -19,7 +19,7 @@ It("should return empty tenant and a db error", func() { tenant, dbError := getTenantForScope("wrongid") Expect(tenant.org).To(Equal("")) - Expect(dbError.errorCode).To(Equal("UNKNOWN_SCOPE")) + Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) }) }) }) \ No newline at end of file
diff --git a/helper.go b/helper.go new file mode 100644 index 0000000..b092db1 --- /dev/null +++ b/helper.go
@@ -0,0 +1,173 @@ +package apidAnalytics + +import ( + "database/sql" + "encoding/json" + "github.com/30x/apid" + "net/http" + "io" + "io/ioutil" + "strings" + "compress/gzip" +) + +type developerInfo struct { + apiProduct string + developerApp string + developerEmail string + developer string +} + +func getTenantForScope(scopeuuid string) (tenant, dbError) { + // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event + var org, env string + { + db, err := apid.Data().DB() + switch { + case err != nil: + reason := err.Error() + errorCode := "INTERNAL_SEARCH_ERROR" + return tenant{org, env}, dbError{errorCode, reason} + } + + error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org) + + switch { + case error == sql.ErrNoRows: + reason := "No tenant found for this scopeuuid: " + scopeuuid + errorCode := "UNKNOWN_SCOPE" + return tenant{org, env}, dbError{errorCode, reason} + case error != nil: + reason := error.Error() + errorCode := "INTERNAL_SEARCH_ERROR" + return tenant{org, env}, dbError{errorCode, reason} + } + } + return tenant{org, env}, dbError{} +} + + +func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { + var gzipEncoded bool + if r.Header.Get("Content-Encoding") != "" { + if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") { + return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"} + } else { + gzipEncoded = true + } + } + + var reader io.ReadCloser + var err error + if gzipEncoded { + reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data + if err != nil { + return errResponse{"BAD_DATA", "Gzip data cannot be read"} + } + } else { + reader = r.Body + } + + body, _ := ioutil.ReadAll(reader) + errMessage := validateAndEnrich(tenant, scopeuuid, body) + if errMessage.ErrorCode != "" { + return errMessage + } + return errResponse{} +} + +func validateAndEnrich(tenant tenant, scopeuuid string, body []byte) errResponse { + var raw map[string]interface{} + json.Unmarshal(body, &raw) + if records := raw["records"]; records != nil { + for _, eachRecord := range records.([]interface{}) { + recordMap := eachRecord.(map[string]interface{}) + valid, err := validate(recordMap) + if valid { + enrich(recordMap, scopeuuid, tenant) + log.Debugf("Raw records : %v ", eachRecord) + } else { + return err // Even if there is one bad record, then reject entire batch + } + } + // TODO: add the batch of records to a channel for consumption + } else { + return errResponse{"NO_RECORDS", "No analytics records in the payload"} + } + return errResponse{} +} + +func validate(recordMap map[string]interface{}) (bool, errResponse) { + elems := []string{"client_received_start_timestamp"} + for _, elem := range elems { + if recordMap[elem] == nil { + return false, errResponse{"MISSING_FIELD", "Missing field: " + elem} + } + } + + crst, exists1 := recordMap["client_received_start_timestamp"] + cret, exists2 := recordMap["client_received_end_timestamp"] + if exists1 && exists2 { + if crst.(int64) > cret.(int64) { + return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"} + + } + } + // api key is required to find other info + _, exists3 := recordMap["client_id"] + if !exists3 { + return false, errResponse{"BAD_DATA", "client_id cannot be null"} + } + return true, errResponse{} +} + +func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) { + recordMap["organization"] = tenant.org + recordMap["environment"] = tenant.env + apiKey := recordMap["client_id"].(string) + devInfo := getDeveloperInfo(scopeuuid, apiKey) + recordMap["api_product"] = devInfo.apiProduct + recordMap["developer_app"] = devInfo.developerApp + recordMap["developer_email"] = devInfo.developerEmail + recordMap["developer"] = devInfo.developer +} + +// if info not found then dont set it +func getDeveloperInfo(scopeuuid string, apiKey string) developerInfo { + // TODO: create a cache during init and refresh it on update event + var apiProduct, developerApp, developerEmail, developer string + { + db, err := apid.Data().DB() + switch { + case err != nil: + return developerInfo{} + } + + // TODO: query needs to change (wont work, it is just a placeholder) + error := db.QueryRow("SELECT apiProduct, developerApp, developerEmail, developer FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&apiProduct, &developerApp, &developerEmail, &developer) + + switch { + case error == sql.ErrNoRows: + return developerInfo{} + case error != nil: + return developerInfo{} + } + } + return developerInfo{apiProduct, developerApp, developerEmail, developer} + // For local testing + //return developerInfo{"test_product", "test_app", "test@test.com", "test"} +} + +func writeError(w http.ResponseWriter, status int, code string, reason string) { + w.WriteHeader(status) + e := errResponse{ + ErrorCode: code, + Reason: reason, + } + bytes, err := json.Marshal(e) + if err != nil { + log.Errorf("unable to marshal errorResponse: %v", err) + } else { + w.Write(bytes) + } +}