[XAPID-377] Added crash recovery logic to handle unclosed files in case of a plugin crash
diff --git a/api_helper.go b/api_helper.go
index 7ae0b8f..ccda6d5 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -55,6 +55,7 @@
 			valid, err := validate(recordMap)
 			if valid {
 				enrich(recordMap, scopeuuid, tenant)
+				// TODO: Remove log
 				log.Debugf("Raw records : %v ", eachRecord)
 			} else {
 				return err				// Even if there is one bad record, then reject entire batch
@@ -99,6 +100,7 @@
 	if exists {
 		apiKey := apiKey.(string)
 		devInfo := getDeveloperInfo(tenant.tenantId, apiKey)
+		// TODO: Remove log
 		log.Debugf("developerInfo = %v",  devInfo)
 		if recordMap["api_product"] == "" {
 			recordMap["api_product"] = devInfo.apiProduct
diff --git a/common_helper.go b/common_helper.go
index 0eea2c1..dd2c629 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -27,7 +27,7 @@
 			tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId}
 		}
 	}
-	log.Debugf("Couch of datadscopes in the cache: %d", len(tenantCache))
+	log.Debugf("Count of datascopes in the cache: %d", len(tenantCache))
 	return nil
 }
 
diff --git a/crashRecovery.go b/crashRecovery.go
new file mode 100644
index 0000000..33a0711
--- /dev/null
+++ b/crashRecovery.go
@@ -0,0 +1,165 @@
+package apidAnalytics
+
+import (
+	"bufio"
+	"compress/gzip"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+)
+
+const crashRecoveryDelay = 30 // seconds
+const recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format
+const fileExtension = ".txt.gz"
+
+const recoveredTS = "~recoveredTS~"
+
+
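+// initCrashRecovery checks whether a previous run left unclosed files behind and,
+// if so, schedules recovery after a short delay so plugin startup is not blocked.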
+func initCrashRecovery() {
+	if crashRecoveryNeeded() {
+		timer := time.NewTimer(time.Second * crashRecoveryDelay)
+		go func() {
+			<-timer.C
+			performRecovery()
+		}()
+	}
+}
+
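+// crashRecoveryNeeded reports whether leftover buckets exist in either the tmp
+// or the recovered directory from a previous crash.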
+func crashRecoveryNeeded() bool {
+	recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
+	tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
+	needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
+	if needed {
+		log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay)
+	}
+	return needed
+}
+
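+// recoverFoldersInTmpDir moves any buckets left in the tmp directory to the recovered
+// directory, tagging each with the current recovery timestamp.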
+func recoverFoldersInTmpDir() bool {
+	tmpRecoveryNeeded := false
+	dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
+	recoveryTS := getRecoveryTS()
+	for _, dir := range dirs {
+		tmpRecoveryNeeded = true
+		log.Debugf("Moving directory %s from tmp to recovered", dir.Name())
+		tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
+
+		newDirName := dir.Name() + recoveredTS + recoveryTS
+		recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
+		err := os.Rename(tmpCompletePath, recoveredCompletePath)
+		if err != nil {
+			log.Errorf("Cannot move directory %s to recovered folder: %v", dir.Name(), err)
+		}
+	}
+	return tmpRecoveryNeeded
+}
+
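+// getRecoveryTS returns the current time formatted as the recovery timestamp.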
+func getRecoveryTS() string {
+	current := time.Now()
+	return current.Format(recoveryTSLayout)
+}
+
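+// recoverFolderInRecoveredDir reports whether the recovered directory already contains
+// buckets from a previous crash that still need to be processed.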
+func recoverFolderInRecoveredDir() bool {
+	dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+	return len(dirs) > 0
+}
+
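+// performRecovery processes every bucket found in the recovered directory.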
+func performRecovery() {
+	log.Info("Crash recovery is starting...")
+	recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+	for _, dir := range recoveryDirs {
+		recoverDirectory(dir.Name())
+	}
+	log.Info("Crash recovery complete")
+}
+
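+// recoverDirectory recovers each file in the given bucket and then moves the bucket
+// to the staging directory so the upload manager picks it up.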
+func recoverDirectory(dirName string) {
+	log.Infof("Performing crash recovery for directory: %s", dirName)
+	var bucketRecoveryTS string
+
+	// Parse the bucket name to extract the recovery timestamp and pass it to each file being recovered
+	index := strings.Index(dirName, recoveredTS)
+	if index != -1 {
+		bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
+	}
+
+	dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
+	files, _ := ioutil.ReadDir(dirBeingRecovered)
+	for _, file := range files {
+		// recovering each file sequentially for now
+		recoverFile(bucketRecoveryTS, dirName, file.Name())
+	}
+
+	stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+	err := os.Rename(dirBeingRecovered, stagingPath)
+	if err != nil {
+		log.Errorf("Cannot move directory %s to staging folder: %v", dirName, err)
+	}
+}
+
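+// recoverFile copies the complete records of a partially written file into a new file
+// tagged with the recovery timestamp and deletes the partial original.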
+func recoverFile(bucketRecoveryTS, dirName, fileName string) {
+	log.Debugf("Performing crash recovery for file: %s", fileName)
+	// add recovery timestamp to the file name
+	completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
+	recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
+	recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
+	recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
+	copyPartialFile(completeOrigFilePath, recoveredFilePath)
+	deletePartialFile(completeOrigFilePath)
+}
+
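+// copyPartialFile reads the partial gzip file line by line and writes only the complete
+// records into a new, properly closed gzip file.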
+func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
+
+	// read partial file line by line using buffered gzip reader
+	partialFile, err := os.Open(completeOrigFilePath)
+	if err != nil {
+		log.Errorf("Cannot open file: %s", completeOrigFilePath)
+		return
+	}
+	defer partialFile.Close()
+
+	bufReader := bufio.NewReader(partialFile)
+	gzReader, err := gzip.NewReader(bufReader)
+	if err != nil {
+		log.Errorf("Cannot create reader on gzip file: %s due to %v", completeOrigFilePath, err)
+		return
+	}
+	defer gzReader.Close()
+
+	scanner := bufio.NewScanner(gzReader)
+
+	// Create new file to copy complete records from partial file and upload only a complete file
+	recoveredFile, err := os.Create(recoveredFilePath)
+	if err != nil {
+		log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
+		return
+	}
+	defer recoveredFile.Close()
+
+	bufWriter := bufio.NewWriter(recoveredFile)
+	defer bufWriter.Flush()
+
+	gzWriter := gzip.NewWriter(bufWriter)
+	defer gzWriter.Close()
+
+	for scanner.Scan() {
+		// Scanner strips the trailing newline, so add it back to keep one record per line
+		gzWriter.Write(scanner.Bytes())
+		gzWriter.Write([]byte("\n"))
+	}
+
+	if err := scanner.Err(); err != nil {
+		log.Errorf("Error while scanning partial file: %v", err)
+		return
+	}
+}
+
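+// deletePartialFile removes the original partial file once its records have been copied.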
+func deletePartialFile(completeOrigFilePath string) {
+	err := os.Remove(completeOrigFilePath)
+	if err != nil {
+		log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
+	}
+}
\ No newline at end of file
diff --git a/init.go b/init.go
index 08880fa..b8eaf81 100644
--- a/init.go
+++ b/init.go
@@ -29,9 +29,6 @@
 
 	useCaching = "apidanalytics_use_caching"
 	useCachingDefault = true
-
-	maxRetries = 3
-	retryFailedDirBatchSize = 10
 )
 
 // keep track of the services that this plugin will use
@@ -73,7 +70,7 @@
 func initPlugin(services apid.Services) (apid.PluginData, error) {
 
 	// set a logger that is annotated for this plugin
-	log = services.Log().ForModule("analytics")
+	log = services.Log().ForModule("apigeeAnalytics")
 	log.Debug("start init for apidAnalytics plugin")
 
 	// set configuration
@@ -82,12 +79,14 @@
 		return pluginData, fmt.Errorf("Missing required config value:  %s: ", err)
 	}
 
-	// TOOO: remove later
-	//config.SetDefault(uapServerBase,"https://edgex-internal-test.e2e.apigee.net/edgex")
+	// localTesting
+	config.SetDefault(uapServerBase,"http://localhost:9010")
+
 	for _, key := range []string{uapServerBase} {
 		if !config.IsSet(key) {
 			return pluginData, fmt.Errorf("Missing required config value: %s", key)
 		}
+
 	}
 
 	directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
@@ -101,11 +100,11 @@
 	events = services.Events()
 	events.Listen("ApigeeSync", &handler{})
 
-	// TODO: perform crash recovery
+	initCrashRecovery()
+
 	initUploadManager()
 
 	initAPI(services)
-
 	log.Debug("end init for apidAnalytics plugin")
 	return pluginData, nil
 }
@@ -149,7 +148,7 @@
 			if error != nil {
 				return error
 			}
-			log.Infof("created directory %s: ", path)
+			log.Infof("Created directory for analytics data collection: %s", path)
 		}
 	}
 	return nil
diff --git a/listener.go b/listener.go
index 46c67c6..ea9278a 100644
--- a/listener.go
+++ b/listener.go
@@ -36,22 +36,19 @@
 	}
 	setDB(db)
 
-	// After first snapshot is received, create local cache
 	if (config.GetBool(useCaching)) {
 		err = createTenantCache()
 		if err != nil {
-			log.Debugf("Could not create a local cache for datascope info: %s", err.Error())
+			log.Error(err)
 		} else {
 			log.Debug("Created a local cache for datasope information")
 		}
 		err = createDeveloperInfoCache()
 		if err != nil {
-			log.Debugf("Could not create a local cache for developer and app info: %s", err.Error())
-
+			log.Error(err)
 		} else {
 			log.Debug("Created a local cache for developer and app information")
 		}
-		log.Debug("Created a local cache for developer and app information")
 	} else {
 		log.Debug("Will not be caching any info and make a DB call for every analytics msg")
 	}
diff --git a/uploadManager.go b/uploadManager.go
index 93aa3bb..a8ca514 100644
--- a/uploadManager.go
+++ b/uploadManager.go
@@ -8,9 +8,7 @@
 	"time"
 )
 
-var (
-	retriesMap map[string]int
-)
+var retriesMap map[string]int
 
 //TODO:  make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running
 func initUploadManager() {
@@ -19,11 +17,11 @@
 
 	// TODO: add a way to make sure that this go routine is always running
 	go func() {
-		ticker := time.NewTicker(time.Millisecond * config.GetDuration(analyticsUploadInterval) * 1000)
+		ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval))
 		log.Debugf("Intialized upload manager to check for staging directory")
 		defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running
 
-		for t := range ticker.C {
+		for range ticker.C {
 			files, err := ioutil.ReadDir(localAnalyticsStagingDir)
 
 			if err != nil {
@@ -32,7 +30,6 @@
 
 			uploadedDirCnt := 0
 			for _, file := range files {
-				log.Debugf("t: %s , directory: %s", t, file.Name())
 				if file.IsDir() {
 					status := uploadDir(file)
 					handleUploadDirStatus(file, status)
@@ -59,7 +56,7 @@
 		delete(retriesMap, dir.Name())
 	} else {
 		retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
-		if retriesMap[dir.Name()] > maxRetries {
+		if retriesMap[dir.Name()] >= maxRetries {
 			log.Errorf("Max Retires exceeded for folder: %s", completePath)
 			failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
 			err := os.Rename(completePath, failedDirPath)
diff --git a/uploader.go b/uploader.go
index 3941b35..92039e8 100644
--- a/uploader.go
+++ b/uploader.go
@@ -7,53 +7,66 @@
 	"path/filepath"
 	"io/ioutil"
 	"net/http"
-	"errors"
 	"compress/gzip"
+	"fmt"
+	"time"
 )
 
-//var token string
+const (
+	maxRetries = 3
+	retryFailedDirBatchSize = 10
+	timestampLayout = "20060102150405"				// same as yyyyMMddHHmmss
+)
+
+var token string
+
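+// addHeaders attaches the bearer token used to authorize requests to the UAP collection service.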
+func addHeaders(req *http.Request) {
+	req.Header.Add("Authorization", "Bearer " + token)
+}
 
 func uploadDir(dir os.FileInfo) bool {
-
 	tenant, timestamp := splitDirName(dir.Name())
 	dateTimePartition := getDateFromDirTimestamp(timestamp)
+	// TODO: Remove
 	log.Debugf("tenant: %s | timestamp %s", tenant, timestamp)
 
 	completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
 	files, _ := ioutil.ReadDir(completePath)
 
-	var status bool
+	status := true
 	var error error
 	for _, file := range files {
 		completeFilePath := filepath.Join(completePath, file.Name())
 		relativeFilePath := dateTimePartition + "/" + file.Name();
 		status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
 		if error != nil {
-			log.Errorf("Upload File failed due to : %s", error.Error())
+			log.Errorf("Upload failed due to: %s", error.Error())
 			break
 		} else {
 			os.Remove(completeFilePath)
-			log.Debugf("deleted file after successful upload : %s", file.Name())
+			log.Debugf("Deleted file after successful upload: %s", file.Name())
 		}
 	}
 	return status
 }
 
 func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
-
 	signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath)
 	if (err != nil) {
 		return false, err
 	} else {
 		log.Debugf("signed URL : %s", signedUrl)
 		return true, nil
-		//return uploadToDatastore(completeFilePath, signedUrl)
+		//return uploadFileToDatastore(completeFilePath, signedUrl)
 	}
 }
 
 func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
 	client := &http.Client{}
-	uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+	//uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+	// localTesting
+	uapCollectionUrl := config.GetString(uapServerBase) + "/v1/upload/location"
 
 	req, err := http.NewRequest("GET", uapCollectionUrl, nil)
 	if err != nil {
@@ -61,6 +74,11 @@
 	}
 
 	q := req.URL.Query()
+
+	// localTesting
+	q.Add("repo", "edge")
+	q.Add("dataset", "api")
+
 	q.Add("tenant", tenant)
 	q.Add("relativeFilePath", relativeFilePath)
 	q.Add("contentType", "application/x-gzip")
@@ -73,6 +91,7 @@
 		return "", err
 	}
 	defer resp.Body.Close()
+
 	respBody, _ := ioutil.ReadAll(resp.Body)
 	if(resp.StatusCode == 200) {
 		var body map[string]interface{}
@@ -80,14 +99,11 @@
 		signedURL :=  body["url"]
 		return signedURL.(string), nil
 	} else {
-		return "", errors.New("Getting signed URL failed due to " + resp.Status)
+		return "", fmt.Errorf("Error while getting signed URL: %s", resp.Status)
 	}
 }
-//func addHeaders(req *http.Request) {
-//	req.Header.Add("Authorization", "Bearer " + token)
-//}
 
-func uploadToDatastore(completeFilePath, signedUrl string) (bool, error) {
+func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
 	// read gzip file that needs to be uploaded
 	f, err := os.Open(completeFilePath)
 	if err != nil {
@@ -96,13 +112,13 @@
 	defer f.Close()
 	reader, err := gzip.NewReader(f)
 	if err != nil {
-		return false, err
+		return false, fmt.Errorf("Cannot create reader on gzip file: %v", err)
 	}
 
 	client := &http.Client{}
 	req, err := http.NewRequest("PUT", signedUrl, reader)
 	if err != nil {
-		return false, err
+		return false, fmt.Errorf("Parsing URL failed due to %s", err.Error())
 	}
 
 	req.Header.Set("Expect", "100-continue")
@@ -114,10 +130,11 @@
 	}
 	defer resp.Body.Close()
 	if(resp.StatusCode == 200) {
+		// TODO: Remove
 		log.Debugf("response: %v", resp)
 		return true, nil
 	} else {
-		return false, errors.New("Failed to upload file to datastore " + resp.Status)
+		return false, fmt.Errorf("Final datastore (S3/GCS) returned error: %s", resp.Status)
 	}
 }
 
@@ -128,6 +145,12 @@
 	return  tenant, timestamp
 }
 
+// Files are uploaded to S3 under a specific partition, and that key needs to be generated by the plugin,
+// e.g. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
 func getDateFromDirTimestamp(timestamp string) (string){
-	return  ""
+	dateTime, _ := time.Parse(timestampLayout, timestamp)
+	date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
+	hourMinute := dateTime.Format("15-04") // same as HH-mm, avoids shadowing the time package
+	dateHourTS := "date=" + date + "/time=" + hourMinute
+	return dateHourTS
 }
\ No newline at end of file