[XAPID-377] Added GetSigned URL from UAP and upload to datastore API calls
diff --git a/common_helper.go b/common_helper.go
index 6d320dc..0eea2c1 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -27,7 +27,7 @@
 			tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId}
 		}
 	}
-	log.Debugf("Found scopes : %d", len(tenantCache))
+	log.Debugf("Couch of datadscopes in the cache: %d", len(tenantCache))
 	return nil
 }
 
@@ -62,6 +62,7 @@
 			developerInfoCache[keyForMap] = developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev}
 		}
 	}
+	log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache))
 	return nil
 }
 
diff --git a/init.go b/init.go
index 5b40590..08880fa 100644
--- a/init.go
+++ b/init.go
@@ -31,6 +31,7 @@
 	useCachingDefault = true
 
 	maxRetries = 3
+	retryFailedDirBatchSize = 10
 )
 
 // keep track of the services that this plugin will use
@@ -48,7 +49,6 @@
 	localAnalyticsStagingDir   string
 	localAnalyticsFailedDir    string
 	localAnalyticsRecoveredDir string
-	uapEndpoint string
 )
 
 // apid.RegisterPlugin() is required to be called in init()
@@ -82,12 +82,13 @@
 		return pluginData, fmt.Errorf("Missing required config value:  %s: ", err)
 	}
 
+	// TOOO: remove later
+	//config.SetDefault(uapServerBase,"https://edgex-internal-test.e2e.apigee.net/edgex")
 	for _, key := range []string{uapServerBase} {
 		if !config.IsSet(key) {
 			return pluginData, fmt.Errorf("Missing required config value: %s", key)
 		}
 	}
-	uapEndpoint = uapServerBase + "/analytics"
 
 	directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
 	err = createDirectories(directories)
@@ -103,22 +104,6 @@
 	// TODO: perform crash recovery
 	initUploadManager()
 
-	if (config.GetBool(useCaching)) {
-		err = createTenantCache()
-		if err != nil {
-			return pluginData, fmt.Errorf("Could not create tenant cache %s: ", err)
-		}
-		log.Debug("Created a local cache for datasope information")
-
-		err = createDeveloperInfoCache()
-		if err != nil {
-			return pluginData, fmt.Errorf("Could not creata developer info cache %s: ", err)
-		}
-		log.Debug("Created a local cache for developer and app information")
-	} else {
-		log.Debug("Will not be caching any info and make a DB call for every analytics msg")
-	}
-
 	initAPI(services)
 
 	log.Debug("end init for apidAnalytics plugin")
diff --git a/listener.go b/listener.go
index 251fe5e..46c67c6 100644
--- a/listener.go
+++ b/listener.go
@@ -35,6 +35,27 @@
 		log.Panicf("Unable to access database: %v", err)
 	}
 	setDB(db)
+
+	// After first snapshot is received, create local cache
+	if (config.GetBool(useCaching)) {
+		err = createTenantCache()
+		if err != nil {
+			log.Debugf("Could not create a local cache for datascope info: %s", err.Error())
+		} else {
+			log.Debug("Created a local cache for datasope information")
+		}
+		err = createDeveloperInfoCache()
+		if err != nil {
+			log.Debugf("Could not create a local cache for developer and app info: %s", err.Error())
+
+		} else {
+			log.Debug("Created a local cache for developer and app information")
+		}
+		log.Debug("Created a local cache for developer and app information")
+	} else {
+		log.Debug("Will not be caching any info and make a DB call for every analytics msg")
+	}
+
 	return
 }
 
diff --git a/uploadManager.go b/uploadManager.go
index f6f0f87..93aa3bb 100644
--- a/uploadManager.go
+++ b/uploadManager.go
@@ -30,33 +30,67 @@
 				log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir)
 			}
 
+			uploadedDirCnt := 0
 			for _, file := range files {
-				log.Debugf("t: %s , file: %s", t, file.Name())
+				log.Debugf("t: %s , directory: %s", t, file.Name())
 				if file.IsDir() {
-					handleUploadDirStatus(file, uploadDir(file))
+					status := uploadDir(file)
+					handleUploadDirStatus(file, status)
+					if status {
+						uploadedDirCnt++
+					}
 				}
 			}
+			if uploadedDirCnt > 0 {
+				// After a successful upload, retry the folders in failed directory as they might have
+				// failed due to intermitent S3/GCS issue
+				retryFailedUploads()
+			}
 		}
 	}()
 }
 
-func handleUploadDirStatus(file os.FileInfo, status bool) {
-	completePath := filepath.Join(localAnalyticsStagingDir, file.Name())
+func handleUploadDirStatus(dir os.FileInfo, status bool) {
+	completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
 	if status {
 		os.RemoveAll(completePath)
+		log.Debugf("deleted directory after successful upload : %s", dir.Name())
 		// remove key if exists from retry map after a successful upload
-		delete(retriesMap, file.Name())
+		delete(retriesMap, dir.Name())
 	} else {
-		retriesMap[file.Name()] = retriesMap[file.Name()] + 1
-		if retriesMap[file.Name()] > maxRetries {
+		retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
+		if retriesMap[dir.Name()] > maxRetries {
 			log.Errorf("Max Retires exceeded for folder: %s", completePath)
-			failedDirPath := filepath.Join(localAnalyticsFailedDir, file.Name())
+			failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
 			err := os.Rename(completePath, failedDirPath)
 			if err != nil {
-				log.Errorf("Cannot move directory :%s to failed folder", file.Name())
+				log.Errorf("Cannot move directory :%s to failed folder", dir.Name())
 			}
 			// remove key from retry map once it reaches allowed max failed attempts
-			delete(retriesMap, file.Name())
+			delete(retriesMap, dir.Name())
+		}
+	}
+}
+
+func retryFailedUploads() {
+	failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir)
+
+	if err != nil {
+		log.Errorf("Cannot read directory %s: ", localAnalyticsFailedDir)
+	}
+
+	cnt := 0
+	for _, dir := range failedDirs {
+		// We rety failed folder in batches to not overload the upload thread
+		if cnt < retryFailedDirBatchSize {
+			failedPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
+			newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+			err := os.Rename(failedPath, newStagingPath)
+			if err != nil {
+				log.Errorf("Cannot move directory :%s to staging folder", dir.Name())
+			}
+		} else {
+			break
 		}
 	}
 }
\ No newline at end of file
diff --git a/uploader.go b/uploader.go
index 308e752..3941b35 100644
--- a/uploader.go
+++ b/uploader.go
@@ -1,30 +1,128 @@
 package apidAnalytics
 
 import (
-	_ "fmt"
 	"os"
+	"encoding/json"
 	"strings"
 	"path/filepath"
+	"io/ioutil"
+	"net/http"
+	"errors"
+	"compress/gzip"
 )
 
+//var token string
+
 func uploadDir(dir os.FileInfo) bool {
-	// TODO: handle upload to UAP file by file
-	completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
-	log.Debug("Complete Path : %s", completePath)
+
 	tenant, timestamp := splitDirName(dir.Name())
-	date := getDateFromDirTimestamp(timestamp)
-	log.Debug("tenant: %s | timestamp %s", tenant, date)
-	//for _, file := range dir {
-	//	//log.Debugf("t: %s , file: %s", t, file.Name())
-	//	if file.IsDir() {
-	//		handleUploadDirStatus(file, uploadDir(file))
-	//	}
-	//}
-	return false
+	dateTimePartition := getDateFromDirTimestamp(timestamp)
+	log.Debugf("tenant: %s | timestamp %s", tenant, timestamp)
+
+	completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+	files, _ := ioutil.ReadDir(completePath)
+
+	var status bool
+	var error error
+	for _, file := range files {
+		completeFilePath := filepath.Join(completePath, file.Name())
+		relativeFilePath := dateTimePartition + "/" + file.Name();
+		status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
+		if error != nil {
+			log.Errorf("Upload File failed due to : %s", error.Error())
+			break
+		} else {
+			os.Remove(completeFilePath)
+			log.Debugf("deleted file after successful upload : %s", file.Name())
+		}
+	}
+	return status
+}
+
+func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
+
+	signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath)
+	if (err != nil) {
+		return false, err
+	} else {
+		log.Debugf("signed URL : %s", signedUrl)
+		return true, nil
+		//return uploadToDatastore(completeFilePath, signedUrl)
+	}
+}
+
+func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
+	client := &http.Client{}
+	uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+	req, err := http.NewRequest("GET", uapCollectionUrl, nil)
+	if err != nil {
+		return "", err
+	}
+
+	q := req.URL.Query()
+	q.Add("tenant", tenant)
+	q.Add("relativeFilePath", relativeFilePath)
+	q.Add("contentType", "application/x-gzip")
+	req.URL.RawQuery = q.Encode()
+
+	// TODO: get bearer token and add as header
+	//addHeaders(req)
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+	respBody, _ := ioutil.ReadAll(resp.Body)
+	if(resp.StatusCode == 200) {
+		var body map[string]interface{}
+		json.Unmarshal(respBody, &body)
+		signedURL :=  body["url"]
+		return signedURL.(string), nil
+	} else {
+		return "", errors.New("Getting signed URL failed due to " + resp.Status)
+	}
+}
+//func addHeaders(req *http.Request) {
+//	req.Header.Add("Authorization", "Bearer " + token)
+//}
+
+func uploadToDatastore(completeFilePath, signedUrl string) (bool, error) {
+	// read gzip file that needs to be uploaded
+	f, err := os.Open(completeFilePath)
+	if err != nil {
+		return false, err
+	}
+	defer f.Close()
+	reader, err := gzip.NewReader(f)
+	if err != nil {
+		return false, err
+	}
+
+	client := &http.Client{}
+	req, err := http.NewRequest("PUT", signedUrl, reader)
+	if err != nil {
+		return false, err
+	}
+
+	req.Header.Set("Expect", "100-continue")
+	req.Header.Set("Content-Type", "application/x-gzip")
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return false, err
+	}
+	defer resp.Body.Close()
+	if(resp.StatusCode == 200) {
+		log.Debugf("response: %v", resp)
+		return true, nil
+	} else {
+		return false, errors.New("Failed to upload file to datastore " + resp.Status)
+	}
 }
 
 func splitDirName(dirName string) (string, string){
-	s := strings.Split("dirName", "~")
+	s := strings.Split(dirName, "~")
 	tenant := s[0]+"~"+s[1]
 	timestamp := s[2]
 	return  tenant, timestamp