[XAPID-377] Add API calls to get a signed URL from UAP and upload files to the datastore
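
This adds the two-step upload flow to the upload manager. Roughly:

    GET {uapServerBase}/analytics?tenant=<org~env>&relativeFilePath=<partition>/<file>&contentType=application/x-gzip
        -> 200 {"url": "<signed datastore URL>"}
    PUT <signed datastore URL>   (Content-Type: application/x-gzip, body = gzipped file)

Folders that exceed maxRetries are parked in the failed directory and, after the
next successful upload, moved back to staging in batches of
retryFailedDirBatchSize. Datascope and developer-info caches are now built after
the first snapshot is received (listener.go) rather than at plugin init.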
diff --git a/common_helper.go b/common_helper.go
index 6d320dc..0eea2c1 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -27,7 +27,7 @@
tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId}
}
}
- log.Debugf("Found scopes : %d", len(tenantCache))
+ log.Debugf("Couch of datadscopes in the cache: %d", len(tenantCache))
return nil
}
@@ -62,6 +62,7 @@
developerInfoCache[keyForMap] = developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev}
}
}
+ log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache))
return nil
}
diff --git a/init.go b/init.go
index 5b40590..08880fa 100644
--- a/init.go
+++ b/init.go
@@ -31,6 +31,7 @@
useCachingDefault = true
maxRetries = 3
+ retryFailedDirBatchSize = 10
)
// keep track of the services that this plugin will use
@@ -48,7 +49,6 @@
localAnalyticsStagingDir string
localAnalyticsFailedDir string
localAnalyticsRecoveredDir string
- uapEndpoint string
)
// apid.RegisterPlugin() is required to be called in init()
@@ -82,12 +82,13 @@
return pluginData, fmt.Errorf("Missing required config value: %s: ", err)
}
+	// TODO: remove later
+ //config.SetDefault(uapServerBase,"https://edgex-internal-test.e2e.apigee.net/edgex")
for _, key := range []string{uapServerBase} {
if !config.IsSet(key) {
return pluginData, fmt.Errorf("Missing required config value: %s", key)
}
}
- uapEndpoint = uapServerBase + "/analytics"
directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
err = createDirectories(directories)
@@ -103,22 +104,6 @@
// TODO: perform crash recovery
initUploadManager()
- if (config.GetBool(useCaching)) {
- err = createTenantCache()
- if err != nil {
- return pluginData, fmt.Errorf("Could not create tenant cache %s: ", err)
- }
- log.Debug("Created a local cache for datasope information")
-
- err = createDeveloperInfoCache()
- if err != nil {
- return pluginData, fmt.Errorf("Could not creata developer info cache %s: ", err)
- }
- log.Debug("Created a local cache for developer and app information")
- } else {
- log.Debug("Will not be caching any info and make a DB call for every analytics msg")
- }
-
initAPI(services)
log.Debug("end init for apidAnalytics plugin")
diff --git a/listener.go b/listener.go
index 251fe5e..46c67c6 100644
--- a/listener.go
+++ b/listener.go
@@ -35,6 +35,27 @@
log.Panicf("Unable to access database: %v", err)
}
setDB(db)
+
+	// After the first snapshot is received, create the local caches
+	if config.GetBool(useCaching) {
+		err = createTenantCache()
+		if err != nil {
+			log.Debugf("Could not create a local cache for datascope info: %s", err.Error())
+		} else {
+			log.Debug("Created a local cache for datascope information")
+		}
+		err = createDeveloperInfoCache()
+		if err != nil {
+			log.Debugf("Could not create a local cache for developer and app info: %s", err.Error())
+		} else {
+			log.Debug("Created a local cache for developer and app information")
+		}
+	} else {
+		log.Debug("Will not cache any info; a DB call will be made for every analytics msg")
+	}
+
return
}
diff --git a/uploadManager.go b/uploadManager.go
index f6f0f87..93aa3bb 100644
--- a/uploadManager.go
+++ b/uploadManager.go
@@ -30,33 +30,67 @@
log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir)
}
+ uploadedDirCnt := 0
for _, file := range files {
- log.Debugf("t: %s , file: %s", t, file.Name())
+ log.Debugf("t: %s , directory: %s", t, file.Name())
if file.IsDir() {
- handleUploadDirStatus(file, uploadDir(file))
+ status := uploadDir(file)
+ handleUploadDirStatus(file, status)
+ if status {
+ uploadedDirCnt++
+ }
}
}
+ if uploadedDirCnt > 0 {
+			// After a successful upload, retry the folders in the failed directory
+			// as they might have failed due to an intermittent S3/GCS issue
+ retryFailedUploads()
+ }
}
}()
}
-func handleUploadDirStatus(file os.FileInfo, status bool) {
- completePath := filepath.Join(localAnalyticsStagingDir, file.Name())
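+// handleUploadDirStatus deletes the staging sub-directory after a successful upload;
+// on failure it bumps the retry count and, once maxRetries is exceeded, moves the folder to the failed directory.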
+func handleUploadDirStatus(dir os.FileInfo, status bool) {
+ completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
if status {
os.RemoveAll(completePath)
+ log.Debugf("deleted directory after successful upload : %s", dir.Name())
// remove key if exists from retry map after a successful upload
- delete(retriesMap, file.Name())
+ delete(retriesMap, dir.Name())
} else {
- retriesMap[file.Name()] = retriesMap[file.Name()] + 1
- if retriesMap[file.Name()] > maxRetries {
+ retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
+ if retriesMap[dir.Name()] > maxRetries {
log.Errorf("Max Retires exceeded for folder: %s", completePath)
- failedDirPath := filepath.Join(localAnalyticsFailedDir, file.Name())
+ failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
err := os.Rename(completePath, failedDirPath)
if err != nil {
- log.Errorf("Cannot move directory :%s to failed folder", file.Name())
+ log.Errorf("Cannot move directory :%s to failed folder", dir.Name())
}
// remove key from retry map once it reaches allowed max failed attempts
- delete(retriesMap, file.Name())
+ delete(retriesMap, dir.Name())
+ }
+ }
+}
+
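+// retryFailedUploads moves folders from the failed directory back to staging,
+// at most retryFailedDirBatchSize per pass, so the next upload cycle retries them.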
+func retryFailedUploads() {
+	failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir)
+	if err != nil {
+		log.Errorf("Cannot read directory %s: %v", localAnalyticsFailedDir, err)
+		return
+	}
+
+	// Retry failed folders in batches so a large backlog does not overload the upload thread
+	cnt := 0
+	for _, dir := range failedDirs {
+		if cnt >= retryFailedDirBatchSize {
+			break
+		}
+		failedPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
+		newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+		err := os.Rename(failedPath, newStagingPath)
+		if err != nil {
+			log.Errorf("Cannot move directory %s back to staging folder", dir.Name())
+		}
+		cnt++
+	}
+}
\ No newline at end of file
diff --git a/uploader.go b/uploader.go
index 308e752..3941b35 100644
--- a/uploader.go
+++ b/uploader.go
@@ -1,30 +1,128 @@
package apidAnalytics
import (
- _ "fmt"
"os"
+ "encoding/json"
"strings"
"path/filepath"
+ "io/ioutil"
+ "net/http"
+ "errors"
+ "compress/gzip"
)
+//var token string
+
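+// uploadDir uploads every file in one staging sub-directory (named org~env~timestamp)
+// to the datastore via a signed URL, deleting each file after a successful upload;
+// it stops at the first failure and returns the status of the last attempted upload.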
func uploadDir(dir os.FileInfo) bool {
- // TODO: handle upload to UAP file by file
- completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
- log.Debug("Complete Path : %s", completePath)
+
tenant, timestamp := splitDirName(dir.Name())
- date := getDateFromDirTimestamp(timestamp)
- log.Debug("tenant: %s | timestamp %s", tenant, date)
- //for _, file := range dir {
- // //log.Debugf("t: %s , file: %s", t, file.Name())
- // if file.IsDir() {
- // handleUploadDirStatus(file, uploadDir(file))
- // }
- //}
- return false
+ dateTimePartition := getDateFromDirTimestamp(timestamp)
+ log.Debugf("tenant: %s | timestamp %s", tenant, timestamp)
+
+ completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+ files, _ := ioutil.ReadDir(completePath)
+
+ var status bool
+ var error error
+ for _, file := range files {
+ completeFilePath := filepath.Join(completePath, file.Name())
+ relativeFilePath := dateTimePartition + "/" + file.Name();
+ status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
+ if error != nil {
+ log.Errorf("Upload File failed due to : %s", error.Error())
+ break
+ } else {
+ os.Remove(completeFilePath)
+ log.Debugf("deleted file after successful upload : %s", file.Name())
+ }
+ }
+ return status
+}
+
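+// uploadFile gets a signed URL for the file from UAP and then uploads the file to the datastore.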
+func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
+	signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath)
+	if err != nil {
+		return false, err
+	}
+	log.Debugf("signed URL: %s", signedUrl)
+	return uploadToDatastore(completeFilePath, signedUrl)
+}
+
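+// getSignedUrl asks the UAP collection endpoint for a pre-signed datastore URL
+// for the given tenant and relative file path.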
+func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
+ client := &http.Client{}
+ uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+ req, err := http.NewRequest("GET", uapCollectionUrl, nil)
+ if err != nil {
+ return "", err
+ }
+
+ q := req.URL.Query()
+ q.Add("tenant", tenant)
+ q.Add("relativeFilePath", relativeFilePath)
+ q.Add("contentType", "application/x-gzip")
+ req.URL.RawQuery = q.Encode()
+
+ // TODO: get bearer token and add as header
+ //addHeaders(req)
+ resp, err := client.Do(req)
+ if err != nil {
+ return "", err
+ }
+	defer resp.Body.Close()
+	respBody, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	if resp.StatusCode == 200 {
+		var body map[string]interface{}
+		if err := json.Unmarshal(respBody, &body); err != nil {
+			return "", err
+		}
+		signedURL, ok := body["url"].(string)
+		if !ok {
+			return "", errors.New("signed URL missing in UAP response")
+		}
+		return signedURL, nil
+	}
+	return "", errors.New("Getting signed URL failed due to " + resp.Status)
+}
+//func addHeaders(req *http.Request) {
+// req.Header.Add("Authorization", "Bearer " + token)
+//}
+
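+// uploadToDatastore PUTs the gzipped file to the signed URL returned by UAP.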
+func uploadToDatastore(completeFilePath, signedUrl string) (bool, error) {
+	// Open the gzip file and stream its raw (still compressed) bytes;
+	// the signed URL was requested with contentType application/x-gzip,
+	// so the body must stay gzipped
+	f, err := os.Open(completeFilePath)
+	if err != nil {
+		return false, err
+	}
+	defer f.Close()
+
+ client := &http.Client{}
+ req, err := http.NewRequest("PUT", signedUrl, reader)
+ if err != nil {
+ return false, err
+ }
+
+ req.Header.Set("Expect", "100-continue")
+ req.Header.Set("Content-Type", "application/x-gzip")
+
+ resp, err := client.Do(req)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+	if resp.StatusCode == 200 {
+		log.Debugf("response: %v", resp)
+		return true, nil
+	}
+	return false, errors.New("Failed to upload file to datastore " + resp.Status)
}
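+// splitDirName splits a staging directory name of the form org~env~timestamp
+// into its tenant (org~env) and timestamp parts.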
func splitDirName(dirName string) (string, string){
- s := strings.Split("dirName", "~")
+ s := strings.Split(dirName, "~")
tenant := s[0]+"~"+s[1]
timestamp := s[2]
return tenant, timestamp