[XAPID-377] Added GetSigned URL from UAP and upload to datastore API calls
diff --git a/common_helper.go b/common_helper.go index 6d320dc..0eea2c1 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -27,7 +27,7 @@ tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId} } } - log.Debugf("Found scopes : %d", len(tenantCache)) + log.Debugf("Couch of datadscopes in the cache: %d", len(tenantCache)) return nil } @@ -62,6 +62,7 @@ developerInfoCache[keyForMap] = developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev} } } + log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache)) return nil }
diff --git a/init.go b/init.go index 5b40590..08880fa 100644 --- a/init.go +++ b/init.go
@@ -31,6 +31,7 @@ useCachingDefault = true maxRetries = 3 + retryFailedDirBatchSize = 10 ) // keep track of the services that this plugin will use @@ -48,7 +49,6 @@ localAnalyticsStagingDir string localAnalyticsFailedDir string localAnalyticsRecoveredDir string - uapEndpoint string ) // apid.RegisterPlugin() is required to be called in init() @@ -82,12 +82,13 @@ return pluginData, fmt.Errorf("Missing required config value: %s: ", err) } + // TOOO: remove later + //config.SetDefault(uapServerBase,"https://edgex-internal-test.e2e.apigee.net/edgex") for _, key := range []string{uapServerBase} { if !config.IsSet(key) { return pluginData, fmt.Errorf("Missing required config value: %s", key) } } - uapEndpoint = uapServerBase + "/analytics" directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir} err = createDirectories(directories) @@ -103,22 +104,6 @@ // TODO: perform crash recovery initUploadManager() - if (config.GetBool(useCaching)) { - err = createTenantCache() - if err != nil { - return pluginData, fmt.Errorf("Could not create tenant cache %s: ", err) - } - log.Debug("Created a local cache for datasope information") - - err = createDeveloperInfoCache() - if err != nil { - return pluginData, fmt.Errorf("Could not creata developer info cache %s: ", err) - } - log.Debug("Created a local cache for developer and app information") - } else { - log.Debug("Will not be caching any info and make a DB call for every analytics msg") - } - initAPI(services) log.Debug("end init for apidAnalytics plugin")
diff --git a/listener.go b/listener.go index 251fe5e..46c67c6 100644 --- a/listener.go +++ b/listener.go
@@ -35,6 +35,27 @@ log.Panicf("Unable to access database: %v", err) } setDB(db) + + // After first snapshot is received, create local cache + if (config.GetBool(useCaching)) { + err = createTenantCache() + if err != nil { + log.Debugf("Could not create a local cache for datascope info: %s", err.Error()) + } else { + log.Debug("Created a local cache for datasope information") + } + err = createDeveloperInfoCache() + if err != nil { + log.Debugf("Could not create a local cache for developer and app info: %s", err.Error()) + + } else { + log.Debug("Created a local cache for developer and app information") + } + log.Debug("Created a local cache for developer and app information") + } else { + log.Debug("Will not be caching any info and make a DB call for every analytics msg") + } + return }
diff --git a/uploadManager.go b/uploadManager.go index f6f0f87..93aa3bb 100644 --- a/uploadManager.go +++ b/uploadManager.go
@@ -30,33 +30,67 @@ log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir) } + uploadedDirCnt := 0 for _, file := range files { - log.Debugf("t: %s , file: %s", t, file.Name()) + log.Debugf("t: %s , directory: %s", t, file.Name()) if file.IsDir() { - handleUploadDirStatus(file, uploadDir(file)) + status := uploadDir(file) + handleUploadDirStatus(file, status) + if status { + uploadedDirCnt++ + } } } + if uploadedDirCnt > 0 { + // After a successful upload, retry the folders in failed directory as they might have + // failed due to intermitent S3/GCS issue + retryFailedUploads() + } } }() } -func handleUploadDirStatus(file os.FileInfo, status bool) { - completePath := filepath.Join(localAnalyticsStagingDir, file.Name()) +func handleUploadDirStatus(dir os.FileInfo, status bool) { + completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) if status { os.RemoveAll(completePath) + log.Debugf("deleted directory after successful upload : %s", dir.Name()) // remove key if exists from retry map after a successful upload - delete(retriesMap, file.Name()) + delete(retriesMap, dir.Name()) } else { - retriesMap[file.Name()] = retriesMap[file.Name()] + 1 - if retriesMap[file.Name()] > maxRetries { + retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1 + if retriesMap[dir.Name()] > maxRetries { log.Errorf("Max Retires exceeded for folder: %s", completePath) - failedDirPath := filepath.Join(localAnalyticsFailedDir, file.Name()) + failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name()) err := os.Rename(completePath, failedDirPath) if err != nil { - log.Errorf("Cannot move directory :%s to failed folder", file.Name()) + log.Errorf("Cannot move directory :%s to failed folder", dir.Name()) } // remove key from retry map once it reaches allowed max failed attempts - delete(retriesMap, file.Name()) + delete(retriesMap, dir.Name()) + } + } +} + +func retryFailedUploads() { + failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir) + + if err != nil { + log.Errorf("Cannot read directory %s: ", localAnalyticsFailedDir) + } + + cnt := 0 + for _, dir := range failedDirs { + // We rety failed folder in batches to not overload the upload thread + if cnt < retryFailedDirBatchSize { + failedPath := filepath.Join(localAnalyticsFailedDir, dir.Name()) + newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name()) + err := os.Rename(failedPath, newStagingPath) + if err != nil { + log.Errorf("Cannot move directory :%s to staging folder", dir.Name()) + } + } else { + break } } } \ No newline at end of file
diff --git a/uploader.go b/uploader.go index 308e752..3941b35 100644 --- a/uploader.go +++ b/uploader.go
@@ -1,30 +1,128 @@ package apidAnalytics import ( - _ "fmt" "os" + "encoding/json" "strings" "path/filepath" + "io/ioutil" + "net/http" + "errors" + "compress/gzip" ) +//var token string + func uploadDir(dir os.FileInfo) bool { - // TODO: handle upload to UAP file by file - completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) - log.Debug("Complete Path : %s", completePath) + tenant, timestamp := splitDirName(dir.Name()) - date := getDateFromDirTimestamp(timestamp) - log.Debug("tenant: %s | timestamp %s", tenant, date) - //for _, file := range dir { - // //log.Debugf("t: %s , file: %s", t, file.Name()) - // if file.IsDir() { - // handleUploadDirStatus(file, uploadDir(file)) - // } - //} - return false + dateTimePartition := getDateFromDirTimestamp(timestamp) + log.Debugf("tenant: %s | timestamp %s", tenant, timestamp) + + completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) + files, _ := ioutil.ReadDir(completePath) + + var status bool + var error error + for _, file := range files { + completeFilePath := filepath.Join(completePath, file.Name()) + relativeFilePath := dateTimePartition + "/" + file.Name(); + status, error = uploadFile(tenant,relativeFilePath, completeFilePath) + if error != nil { + log.Errorf("Upload File failed due to : %s", error.Error()) + break + } else { + os.Remove(completeFilePath) + log.Debugf("deleted file after successful upload : %s", file.Name()) + } + } + return status +} + +func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) { + + signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath) + if (err != nil) { + return false, err + } else { + log.Debugf("signed URL : %s", signedUrl) + return true, nil + //return uploadToDatastore(completeFilePath, signedUrl) + } +} + +func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) { + client := &http.Client{} + uapCollectionUrl := config.GetString(uapServerBase) + "/analytics" + + req, err := http.NewRequest("GET", uapCollectionUrl, nil) + if err != nil { + return "", err + } + + q := req.URL.Query() + q.Add("tenant", tenant) + q.Add("relativeFilePath", relativeFilePath) + q.Add("contentType", "application/x-gzip") + req.URL.RawQuery = q.Encode() + + // TODO: get bearer token and add as header + //addHeaders(req) + resp, err := client.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + respBody, _ := ioutil.ReadAll(resp.Body) + if(resp.StatusCode == 200) { + var body map[string]interface{} + json.Unmarshal(respBody, &body) + signedURL := body["url"] + return signedURL.(string), nil + } else { + return "", errors.New("Getting signed URL failed due to " + resp.Status) + } +} +//func addHeaders(req *http.Request) { +// req.Header.Add("Authorization", "Bearer " + token) +//} + +func uploadToDatastore(completeFilePath, signedUrl string) (bool, error) { + // read gzip file that needs to be uploaded + f, err := os.Open(completeFilePath) + if err != nil { + return false, err + } + defer f.Close() + reader, err := gzip.NewReader(f) + if err != nil { + return false, err + } + + client := &http.Client{} + req, err := http.NewRequest("PUT", signedUrl, reader) + if err != nil { + return false, err + } + + req.Header.Set("Expect", "100-continue") + req.Header.Set("Content-Type", "application/x-gzip") + + resp, err := client.Do(req) + if err != nil { + return false, err + } + defer resp.Body.Close() + if(resp.StatusCode == 200) { + log.Debugf("response: %v", resp) + return true, nil + } else { + return false, errors.New("Failed to upload file to datastore " + resp.Status) + } } func splitDirName(dirName string) (string, string){ - s := strings.Split("dirName", "~") + s := strings.Split(dirName, "~") tenant := s[0]+"~"+s[1] timestamp := s[2] return tenant, timestamp