[XAPID-377] Refactored code and implemented working upload to S3 functionality
diff --git a/api.go b/api.go index a34018d..a35bea5 100644 --- a/api.go +++ b/api.go
@@ -19,9 +19,9 @@ } type tenant struct { - org string - env string - tenantId string + Org string + Env string + TenantId string } func initAPI(services apid.Services) { @@ -40,7 +40,7 @@ return } - db = getDB() // When snapshot isnt processed + db = getDB() // When snapshot isnt processed if db == nil { writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") return
diff --git a/api_helper.go b/api_helper.go index ccda6d5..e89d1d9 100644 --- a/api_helper.go +++ b/api_helper.go
@@ -11,10 +11,10 @@ type developerInfo struct { - apiProduct string - developerApp string - developerEmail string - developer string + ApiProduct string + DeveloperApp string + DeveloperEmail string + Developer string } func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { @@ -48,7 +48,10 @@ func validateEnrichPublish(tenant tenant, scopeuuid string, body []byte) errResponse { var raw map[string]interface{} - json.Unmarshal(body, &raw) + err := json.Unmarshal(body, &raw) + if err != nil { + return errResponse{"BAD_DATA", "Not a valid JSON payload"} + } if records := raw["records"]; records != nil { for _, eachRecord := range records.([]interface{}) { recordMap := eachRecord.(map[string]interface{}) @@ -56,7 +59,7 @@ if valid { enrich(recordMap, scopeuuid, tenant) // TODO: Remove log - log.Debugf("Raw records : %v ", eachRecord) + log.Debugf("Raw records : %v ", recordMap) } else { return err // Even if there is one bad record, then reject entire batch } @@ -81,44 +84,50 @@ if exists1 && exists2 { if crst.(int64) > cret.(int64) { return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"} - } } return true, errResponse{} } func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) { - if recordMap["organization"] == "" { - recordMap["organization"] = tenant.org + org, orgExists := recordMap["organization"] + if !orgExists || org.(string) == "" { + recordMap["organization"] = tenant.Org + } + env, envExists := recordMap["environment"] + if !envExists || env.(string) == "" { + recordMap["environment"] = tenant.Env } - if recordMap["environment"] == "" { - recordMap["environment"] = tenant.env - } + apiKey, exists := recordMap["client_id"] // apiKey doesnt exist then ignore adding developer fields if exists { apiKey := apiKey.(string) - devInfo := getDeveloperInfo(tenant.tenantId, apiKey) + devInfo := getDeveloperInfo(tenant.TenantId, apiKey) // TODO: Remove log - log.Debugf("developerInfo = %v", devInfo) - if recordMap["api_product"] == "" { - recordMap["api_product"] = devInfo.apiProduct + _, exists := recordMap["api_product"] + if !exists { + recordMap["api_product"] = devInfo.ApiProduct } - if recordMap["developer_app"] == "" { - recordMap["developer_app"] = devInfo.developerApp + _, exists = recordMap["developer_app"] + if !exists { + recordMap["developer_app"] = devInfo.DeveloperApp } - if recordMap["developer_email"] == "" { - recordMap["developer_email"] = devInfo.developerEmail + _, exists = recordMap["developer_email"] + if !exists { + recordMap["developer_email"] = devInfo.DeveloperEmail } - if recordMap["developer"] == "" { - recordMap["developer"] = devInfo.developer + _, exists = recordMap["developer"] + if !exists { + recordMap["developer"] = devInfo.Developer } } } func publishToChannel(records []interface{}) { // TODO: add the batch of records to a channel for consumption + log.Debugf("records on channel : %v", records) return }
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go index da8d20d..347d694 100644 --- a/apidAnalytics_suite_test.go +++ b/apidAnalytics_suite_test.go
@@ -34,6 +34,7 @@ config.Set("data_path", testTempDir) config.Set(uapServerBase, "http://localhost:9000") // dummy value + config.Set(useCaching, false) db, err := apid.Data().DB() Expect(err).NotTo(HaveOccurred())
diff --git a/common_helper.go b/common_helper.go index dd2c629..e64252a 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -24,7 +24,7 @@ defer rows.Close() for rows.Next() { rows.Scan(&env, &org, &tenantId, &id); - tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId} + tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId} } } log.Debugf("Count of datadscopes in the cache: %d", len(tenantCache)) @@ -59,7 +59,7 @@ dev := getValuesIgnoringNull(developer) devEmail := getValuesIgnoringNull(developerEmail) - developerInfoCache[keyForMap] = developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev} + developerInfoCache[keyForMap] = developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev} } } log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache)) @@ -67,7 +67,6 @@ } func getTenantForScope(scopeuuid string) (tenant, dbError) { - if (config.GetBool(useCaching)) { _, exists := tenantCache[scopeuuid] if !exists { @@ -99,13 +98,15 @@ return tenant{}, dbError{errorCode, reason} } - return tenant{org: org, env:env, tenantId: tenantId}, dbError{} + return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{} } + // TODO: local testing + //return tenant{Org: "testorg", Env:"testenv", TenantId: "tenantid"}, dbError{} } func getDeveloperInfo(tenantId string, apiKey string) developerInfo { if (config.GetBool(useCaching)) { - keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) + keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) _, exists := developerInfoCache[keyForMap] if !exists { log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) @@ -122,7 +123,7 @@ "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " + "INNER JOIN APP AS a ON a.id = mp.app_id " + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " + - "where mp.tenant_id = " + tenantId + " and mp.appcred_id = " + apiKey + ";" + "where mp.tenant_id = \"" + tenantId + "\" and mp.appcred_id = \"" + apiKey + "\";" error := db.QueryRow(sSql).Scan(&apiProduct, &developerApp, &developer, &developerEmail) switch { @@ -139,8 +140,11 @@ dev := getValuesIgnoringNull(developer) devEmail := getValuesIgnoringNull(developerEmail) - return developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev} + return developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev} } + // TODO: local testing + // return developerInfo{ApiProduct: "testproduct", DeveloperApp: "testapp", DeveloperEmail: "testdeveloper@test.com", Developer: "testdeveloper"} + } func getValuesIgnoringNull(sqlValue sql.NullString) string {
diff --git a/common_helper_test.go b/common_helper_test.go index 46f1b6c..126602d 100644 --- a/common_helper_test.go +++ b/common_helper_test.go
@@ -11,16 +11,16 @@ It("should return testorg and testenv", func() { tenant, dbError := getTenantForScope("testid") Expect(dbError.Reason).To(Equal("")) - Expect(tenant.org).To(Equal("testorg")) - Expect(tenant.env).To(Equal("testenv")) - Expect(tenant.tenantId).To(Equal("tenantid")) + Expect(tenant.Org).To(Equal("testorg")) + Expect(tenant.Env).To(Equal("testenv")) + Expect(tenant.TenantId).To(Equal("tenantid")) }) }) Context("get tenant for invalid scopeuuid", func() { It("should return empty tenant and a db error", func() { tenant, dbError := getTenantForScope("wrongid") - Expect(tenant.org).To(Equal("")) + Expect(tenant.Org).To(Equal("")) Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) }) }) @@ -30,21 +30,20 @@ Context("get developerInfo for valid tenantId and apikey", func() { It("should return all right data", func() { developerInfo := getDeveloperInfo("tenantid","testapikey") - Expect(developerInfo.apiProduct).To(Equal("testproduct")) - Expect(developerInfo.developer).To(Equal("testdeveloper")) - Expect(developerInfo.developerEmail).To(Equal("testdeveloper@test.com")) - Expect(developerInfo.developerApp).To(Equal("testapp")) + Expect(developerInfo.ApiProduct).To(Equal("testproduct")) + Expect(developerInfo.Developer).To(Equal("testdeveloper")) + Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com")) + Expect(developerInfo.DeveloperApp).To(Equal("testapp")) }) }) Context("get developerInfo for invalid tenantId and apikey", func() { It("should return all right data", func() { developerInfo := getDeveloperInfo("wrongid","wrongapikey") - Expect(developerInfo.apiProduct).To(Equal("")) - Expect(developerInfo.developer).To(Equal("")) - Expect(developerInfo.developerEmail).To(Equal("")) - Expect(developerInfo.developerApp).To(Equal("")) - + Expect(developerInfo.ApiProduct).To(Equal("")) + Expect(developerInfo.Developer).To(Equal("")) + Expect(developerInfo.DeveloperEmail).To(Equal("")) + Expect(developerInfo.DeveloperApp).To(Equal("")) }) }) }) \ No newline at end of file
diff --git a/listener.go b/listener.go index ea9278a..d7a8509 100644 --- a/listener.go +++ b/listener.go
@@ -74,7 +74,7 @@ ele.Get("scope", &tenantid) ele.Get("org", &org) ele.Get("env", &env) - tenantCache[scopeuuid] = tenant{org: org, env: env, tenantId: tenantid} + tenantCache[scopeuuid] = tenant{Org: org, Env: env, TenantId: tenantid} } case common.Delete: rows = append(rows, payload.NewRow)
diff --git a/uploader.go b/uploader.go index 92039e8..78c68c2 100644 --- a/uploader.go +++ b/uploader.go
@@ -7,7 +7,6 @@ "path/filepath" "io/ioutil" "net/http" - "compress/gzip" "fmt" "time" ) @@ -20,6 +19,10 @@ var token string +var client *http.Client = &http.Client{ + Timeout: time.Duration(60 * time.Second), // default timeout of 60 seconds while connecting to s3/GCS + } + func addHeaders(req *http.Request) { req.Header.Add("Authorization", "Bearer " + token) } @@ -56,13 +59,11 @@ return false, err } else { log.Debugf("signed URL : %s", signedUrl) - return true, nil - //return uploadFileToDatastore(completeFilePath, signedUrl) + return uploadFileToDatastore(completeFilePath, signedUrl) } } func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) { - client := &http.Client{} //uapCollectionUrl := config.GetString(uapServerBase) + "/analytics" // localTesting @@ -105,36 +106,36 @@ func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) { // read gzip file that needs to be uploaded - f, err := os.Open(completeFilePath) + file, err := os.Open(completeFilePath) if err != nil { return false, err } - defer f.Close() - reader, err := gzip.NewReader(f) - if err != nil { - return false, fmt.Errorf("Cannot create reader on gzip file %v", err) - } + defer file.Close() - client := &http.Client{} - req, err := http.NewRequest("PUT", signedUrl, reader) + req, err := http.NewRequest("PUT", signedUrl, file) if err != nil { - return false, fmt.Errorf("Parsing URL failed due to %s",err.Error()) + return false, fmt.Errorf("Parsing URL failed due to %v", err) } req.Header.Set("Expect", "100-continue") req.Header.Set("Content-Type", "application/x-gzip") + fileStats, err := file.Stat() + if err != nil { + return false, fmt.Errorf("Could not get content length for file: %v", err) + } + req.ContentLength = fileStats.Size() + resp, err := client.Do(req) if err != nil { return false, err } defer resp.Body.Close() + if(resp.StatusCode == 200) { - // TODO: Remove - log.Debugf("response: %v", resp) return true, nil } else { - return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %s ", resp.Status) + return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %v ", resp.Status) } }