[XAPID-377] Refactored code and implemented working upload to S3 functionality
diff --git a/api.go b/api.go
index a34018d..a35bea5 100644
--- a/api.go
+++ b/api.go
@@ -19,9 +19,9 @@
}
type tenant struct {
- org string
- env string
- tenantId string
+ Org string
+ Env string
+ TenantId string
}
func initAPI(services apid.Services) {
@@ -40,7 +40,7 @@
return
}
- db = getDB() // When snapshot isnt processed
+ db = getDB() // When snapshot isnt processed
if db == nil {
writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely")
return
diff --git a/api_helper.go b/api_helper.go
index ccda6d5..e89d1d9 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -11,10 +11,10 @@
type developerInfo struct {
- apiProduct string
- developerApp string
- developerEmail string
- developer string
+ ApiProduct string
+ DeveloperApp string
+ DeveloperEmail string
+ Developer string
}
func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
@@ -48,7 +48,10 @@
func validateEnrichPublish(tenant tenant, scopeuuid string, body []byte) errResponse {
var raw map[string]interface{}
- json.Unmarshal(body, &raw)
+ err := json.Unmarshal(body, &raw)
+ if err != nil {
+ return errResponse{"BAD_DATA", "Not a valid JSON payload"}
+ }
if records := raw["records"]; records != nil {
for _, eachRecord := range records.([]interface{}) {
recordMap := eachRecord.(map[string]interface{})
@@ -56,7 +59,7 @@
if valid {
enrich(recordMap, scopeuuid, tenant)
// TODO: Remove log
- log.Debugf("Raw records : %v ", eachRecord)
+ log.Debugf("Raw records : %v ", recordMap)
} else {
return err // Even if there is one bad record, then reject entire batch
}
@@ -81,44 +84,50 @@
if exists1 && exists2 {
if crst.(int64) > cret.(int64) {
return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"}
-
}
}
return true, errResponse{}
}
func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) {
- if recordMap["organization"] == "" {
- recordMap["organization"] = tenant.org
+ org, orgExists := recordMap["organization"]
+ if !orgExists || org.(string) == "" {
+ recordMap["organization"] = tenant.Org
+ }
+ env, envExists := recordMap["environment"]
+ if !envExists || env.(string) == "" {
+ recordMap["environment"] = tenant.Env
}
- if recordMap["environment"] == "" {
- recordMap["environment"] = tenant.env
- }
+
apiKey, exists := recordMap["client_id"]
// apiKey doesnt exist then ignore adding developer fields
if exists {
apiKey := apiKey.(string)
- devInfo := getDeveloperInfo(tenant.tenantId, apiKey)
+ devInfo := getDeveloperInfo(tenant.TenantId, apiKey)
// TODO: Remove log
- log.Debugf("developerInfo = %v", devInfo)
- if recordMap["api_product"] == "" {
- recordMap["api_product"] = devInfo.apiProduct
+ _, exists := recordMap["api_product"]
+ if !exists {
+ recordMap["api_product"] = devInfo.ApiProduct
}
- if recordMap["developer_app"] == "" {
- recordMap["developer_app"] = devInfo.developerApp
+ _, exists = recordMap["developer_app"]
+ if !exists {
+ recordMap["developer_app"] = devInfo.DeveloperApp
}
- if recordMap["developer_email"] == "" {
- recordMap["developer_email"] = devInfo.developerEmail
+ _, exists = recordMap["developer_email"]
+ if !exists {
+ recordMap["developer_email"] = devInfo.DeveloperEmail
}
- if recordMap["developer"] == "" {
- recordMap["developer"] = devInfo.developer
+ _, exists = recordMap["developer"]
+ if !exists {
+ recordMap["developer"] = devInfo.Developer
}
}
}
func publishToChannel(records []interface{}) {
// TODO: add the batch of records to a channel for consumption
+ log.Debugf("records on channel : %v", records)
return
}
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
index da8d20d..347d694 100644
--- a/apidAnalytics_suite_test.go
+++ b/apidAnalytics_suite_test.go
@@ -34,6 +34,7 @@
config.Set("data_path", testTempDir)
config.Set(uapServerBase, "http://localhost:9000") // dummy value
+ config.Set(useCaching, false)
db, err := apid.Data().DB()
Expect(err).NotTo(HaveOccurred())
diff --git a/common_helper.go b/common_helper.go
index dd2c629..e64252a 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -24,7 +24,7 @@
defer rows.Close()
for rows.Next() {
rows.Scan(&env, &org, &tenantId, &id);
- tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId}
+ tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId}
}
}
log.Debugf("Count of datadscopes in the cache: %d", len(tenantCache))
@@ -59,7 +59,7 @@
dev := getValuesIgnoringNull(developer)
devEmail := getValuesIgnoringNull(developerEmail)
- developerInfoCache[keyForMap] = developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev}
+ developerInfoCache[keyForMap] = developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev}
}
}
log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache))
@@ -67,7 +67,6 @@
}
func getTenantForScope(scopeuuid string) (tenant, dbError) {
-
if (config.GetBool(useCaching)) {
_, exists := tenantCache[scopeuuid]
if !exists {
@@ -99,13 +98,15 @@
return tenant{}, dbError{errorCode, reason}
}
- return tenant{org: org, env:env, tenantId: tenantId}, dbError{}
+ return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{}
}
+ // TODO: local testing
+ //return tenant{Org: "testorg", Env:"testenv", TenantId: "tenantid"}, dbError{}
}
func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
if (config.GetBool(useCaching)) {
- keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
+ keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
_, exists := developerInfoCache[keyForMap]
if !exists {
log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey)
@@ -122,7 +123,7 @@
"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
"INNER JOIN APP AS a ON a.id = mp.app_id " +
"INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
- "where mp.tenant_id = " + tenantId + " and mp.appcred_id = " + apiKey + ";"
+ "where mp.tenant_id = \"" + tenantId + "\" and mp.appcred_id = \"" + apiKey + "\";"
error := db.QueryRow(sSql).Scan(&apiProduct, &developerApp, &developer, &developerEmail)
switch {
@@ -139,8 +140,11 @@
dev := getValuesIgnoringNull(developer)
devEmail := getValuesIgnoringNull(developerEmail)
- return developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev}
+ return developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev}
}
+ // TODO: local testing
+ // return developerInfo{ApiProduct: "testproduct", DeveloperApp: "testapp", DeveloperEmail: "testdeveloper@test.com", Developer: "testdeveloper"}
+
}
func getValuesIgnoringNull(sqlValue sql.NullString) string {
diff --git a/common_helper_test.go b/common_helper_test.go
index 46f1b6c..126602d 100644
--- a/common_helper_test.go
+++ b/common_helper_test.go
@@ -11,16 +11,16 @@
It("should return testorg and testenv", func() {
tenant, dbError := getTenantForScope("testid")
Expect(dbError.Reason).To(Equal(""))
- Expect(tenant.org).To(Equal("testorg"))
- Expect(tenant.env).To(Equal("testenv"))
- Expect(tenant.tenantId).To(Equal("tenantid"))
+ Expect(tenant.Org).To(Equal("testorg"))
+ Expect(tenant.Env).To(Equal("testenv"))
+ Expect(tenant.TenantId).To(Equal("tenantid"))
})
})
Context("get tenant for invalid scopeuuid", func() {
It("should return empty tenant and a db error", func() {
tenant, dbError := getTenantForScope("wrongid")
- Expect(tenant.org).To(Equal(""))
+ Expect(tenant.Org).To(Equal(""))
Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE"))
})
})
@@ -30,21 +30,20 @@
Context("get developerInfo for valid tenantId and apikey", func() {
It("should return all right data", func() {
developerInfo := getDeveloperInfo("tenantid","testapikey")
- Expect(developerInfo.apiProduct).To(Equal("testproduct"))
- Expect(developerInfo.developer).To(Equal("testdeveloper"))
- Expect(developerInfo.developerEmail).To(Equal("testdeveloper@test.com"))
- Expect(developerInfo.developerApp).To(Equal("testapp"))
+ Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
+ Expect(developerInfo.Developer).To(Equal("testdeveloper"))
+ Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com"))
+ Expect(developerInfo.DeveloperApp).To(Equal("testapp"))
})
})
Context("get developerInfo for invalid tenantId and apikey", func() {
It("should return all right data", func() {
developerInfo := getDeveloperInfo("wrongid","wrongapikey")
- Expect(developerInfo.apiProduct).To(Equal(""))
- Expect(developerInfo.developer).To(Equal(""))
- Expect(developerInfo.developerEmail).To(Equal(""))
- Expect(developerInfo.developerApp).To(Equal(""))
-
+ Expect(developerInfo.ApiProduct).To(Equal(""))
+ Expect(developerInfo.Developer).To(Equal(""))
+ Expect(developerInfo.DeveloperEmail).To(Equal(""))
+ Expect(developerInfo.DeveloperApp).To(Equal(""))
})
})
})
\ No newline at end of file
diff --git a/listener.go b/listener.go
index ea9278a..d7a8509 100644
--- a/listener.go
+++ b/listener.go
@@ -74,7 +74,7 @@
ele.Get("scope", &tenantid)
ele.Get("org", &org)
ele.Get("env", &env)
- tenantCache[scopeuuid] = tenant{org: org, env: env, tenantId: tenantid}
+ tenantCache[scopeuuid] = tenant{Org: org, Env: env, TenantId: tenantid}
}
case common.Delete:
rows = append(rows, payload.NewRow)
diff --git a/uploader.go b/uploader.go
index 92039e8..78c68c2 100644
--- a/uploader.go
+++ b/uploader.go
@@ -7,7 +7,6 @@
"path/filepath"
"io/ioutil"
"net/http"
- "compress/gzip"
"fmt"
"time"
)
@@ -20,6 +19,10 @@
var token string
+var client *http.Client = &http.Client{
+ Timeout: time.Duration(60 * time.Second), // default timeout of 60 seconds while connecting to s3/GCS
+ }
+
func addHeaders(req *http.Request) {
req.Header.Add("Authorization", "Bearer " + token)
}
@@ -56,13 +59,11 @@
return false, err
} else {
log.Debugf("signed URL : %s", signedUrl)
- return true, nil
- //return uploadFileToDatastore(completeFilePath, signedUrl)
+ return uploadFileToDatastore(completeFilePath, signedUrl)
}
}
func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
- client := &http.Client{}
//uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
// localTesting
@@ -105,36 +106,36 @@
func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
// read gzip file that needs to be uploaded
- f, err := os.Open(completeFilePath)
+ file, err := os.Open(completeFilePath)
if err != nil {
return false, err
}
- defer f.Close()
- reader, err := gzip.NewReader(f)
- if err != nil {
- return false, fmt.Errorf("Cannot create reader on gzip file %v", err)
- }
+ defer file.Close()
- client := &http.Client{}
- req, err := http.NewRequest("PUT", signedUrl, reader)
+ req, err := http.NewRequest("PUT", signedUrl, file)
if err != nil {
- return false, fmt.Errorf("Parsing URL failed due to %s",err.Error())
+ return false, fmt.Errorf("Parsing URL failed due to %v", err)
}
req.Header.Set("Expect", "100-continue")
req.Header.Set("Content-Type", "application/x-gzip")
+ fileStats, err := file.Stat()
+ if err != nil {
+ return false, fmt.Errorf("Could not get content length for file: %v", err)
+ }
+ req.ContentLength = fileStats.Size()
+
resp, err := client.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close()
+
if(resp.StatusCode == 200) {
- // TODO: Remove
- log.Debugf("response: %v", resp)
return true, nil
} else {
- return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %s ", resp.Status)
+ return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %v ", resp.Status)
}
}