[XAPID-377] Added crash recovery logic to handle unclosed files in case of a plugin crash
diff --git a/api_helper.go b/api_helper.go
index 7ae0b8f..ccda6d5 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -55,6 +55,7 @@
valid, err := validate(recordMap)
if valid {
enrich(recordMap, scopeuuid, tenant)
+ // TODO: Remove log
log.Debugf("Raw records : %v ", eachRecord)
} else {
return err // Even if there is one bad record, then reject entire batch
@@ -99,6 +100,7 @@
if exists {
apiKey := apiKey.(string)
devInfo := getDeveloperInfo(tenant.tenantId, apiKey)
+ // TODO: Remove log
log.Debugf("developerInfo = %v", devInfo)
if recordMap["api_product"] == "" {
recordMap["api_product"] = devInfo.apiProduct
diff --git a/common_helper.go b/common_helper.go
index 0eea2c1..dd2c629 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -27,7 +27,7 @@
tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId}
}
}
- log.Debugf("Couch of datadscopes in the cache: %d", len(tenantCache))
+ log.Debugf("Count of datadscopes in the cache: %d", len(tenantCache))
return nil
}
diff --git a/crashRecovery.go b/crashRecovery.go
new file mode 100644
index 0000000..33a0711
--- /dev/null
+++ b/crashRecovery.go
@@ -0,0 +1,165 @@
+package apidAnalytics
+
+import (
+ "bufio"
+ "compress/gzip"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+const crashRecoveryDelay = 30 // seconds
+const recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format
+const fileExtension = ".txt.gz"
+
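+// recoveredTS separates the original directory name from the recovery timestamp
+// when a folder is moved into the recovered directory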
+const recoveredTS = "~recoveredTS~"
+
+
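+// initCrashRecovery checks whether a previous run left data behind and, if so,
+// schedules performRecovery to run after crashRecoveryDelay seconds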
+func initCrashRecovery() {
+ if crashRecoveryNeeded() {
+ timer := time.NewTimer(time.Second * crashRecoveryDelay)
+ go func() {
+ <-timer.C
+ performRecovery()
+ }()
+ }
+}
+
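+// crashRecoveryNeeded reports whether either the tmp or the recovered directory
+// still holds data from a previous run; folders found in tmp are moved to the
+// recovered directory as part of the check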
+func crashRecoveryNeeded() bool {
+ recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
+ tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
+ needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
+ if needed {
+ log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay)
+ }
+ return needed
+}
+
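+// recoverFoldersInTmpDir moves every folder left under the tmp directory into the
+// recovered directory, tagging its name with the current recovery timestamp, and
+// returns true if any folder had to be moved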
+func recoverFoldersInTmpDir() bool {
+ tmpRecoveryNeeded := false
+ dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
+ recoveryTS := getRecoveryTS()
+ for _, dir := range dirs {
+ tmpRecoveryNeeded = true
+ log.Debugf("Moving directory %s from tmp to recovered ", dir.Name())
+ tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
+
+ newDirName := dir.Name() + recoveredTS + recoveryTS
+ recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
+ err := os.Rename(tmpCompletePath, recoveredCompletePath)
+ if err != nil {
+ log.Errorf("Cannot move directory %s to recovered folder", dir.Name())
+ }
+ }
+ return tmpRecoveryNeeded
+}
+
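+// getRecoveryTS returns the current time formatted with recoveryTSLayout; it is
+// appended to recovered directory names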
+func getRecoveryTS() string {
+ current := time.Now()
+ return current.Format(recoveryTSLayout)
+}
+
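+// recoverFolderInRecoveredDir reports whether the recovered directory already
+// contains folders that still need to be processed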
+func recoverFolderInRecoveredDir() bool {
+ dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+ return len(dirs) > 0
+}
+
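+// performRecovery runs recoverDirectory on every folder found under the recovered directory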
+func performRecovery() {
+ log.Info("Crash recovery is starting...")
+ recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+ for _, dir := range recoveryDirs {
+ recoverDirectory(dir.Name())
+ }
+ log.Info("Crash recovery complete...")
+}
+
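+// recoverDirectory extracts the recovery timestamp from the directory name, recovers
+// each file in the directory, and then moves the directory to staging for upload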
+func recoverDirectory(dirName string) {
+ log.Infof("performing crash recovery for directory: %s", dirName);
+ var bucketRecoveryTS string
+
+ // Parse bucket name to extract recoveryTS and pass it to each file to be recovered
+ index := strings.Index(dirName, recoveredTS)
+ if index != -1 {
+ bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
+ }
+
+ dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
+ files, _ := ioutil.ReadDir(dirBeingRecovered)
+ for _, file := range files {
+ // recovering each file sequentially for now
+ recoverFile(bucketRecoveryTS, dirName, file.Name())
+ }
+
+ stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+ err := os.Rename(dirBeingRecovered, stagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory :%s to staging folder", dirName)
+ }
+}
+
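+// recoverFile copies the complete records of a partially written file into a new
+// "_recovered" file and deletes the original partial file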
+func recoverFile(bucketRecoveryTS, dirName, fileName string) {
+ log.Debugf("performing crash recovery for file: %s ", fileName)
+ // add recovery timestamp to the file name
+ completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
+ recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
+ recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
+ recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
+ copyPartialFile(completeOrigFilePath, recoveredFilePath)
+ deletePartialFile(completeOrigFilePath)
+}
+
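+// copyPartialFile reads the partial gzip file record by record and writes the complete
+// records into a fresh gzip file at recoveredFilePath, so only whole records are uploaded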
+func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
+
+ // read partial file line by line using buffered gzip reader
+ partialFile, err := os.Open(completeOrigFilePath)
+ if err != nil {
+ log.Errorf("Cannot open file: %s", completeOrigFilePath)
+ return
+ }
+ defer partialFile.Close()
+
+ bufReader := bufio.NewReader(partialFile)
+ gzReader, err := gzip.NewReader(bufReader)
+ if err != nil {
+ log.Errorf("Cannot create reader on gzip file: %s due to %v", completeOrigFilePath, err)
+ return
+ }
+ defer gzReader.Close()
+
+ scanner := bufio.NewScanner(gzReader)
+
+ // Create new file to copy complete records from partial file and upload only a complete file
+ recoveredFile, err := os.Create(recoveredFilePath)
+ if err != nil {
+ log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
+ return
+ }
+ defer recoveredFile.Close()
+
+ bufWriter := bufio.NewWriter(recoveredFile)
+ defer bufWriter.Flush()
+
+ gzWriter := gzip.NewWriter(bufWriter)
+ defer gzWriter.Close()
+
+ for scanner.Scan() {
+ // the scanner strips the trailing newline, so write it back to keep one record per line
+ gzWriter.Write(scanner.Bytes())
+ gzWriter.Write([]byte("\n"))
+ }
+
+ if err := scanner.Err(); err != nil {
+ log.Errorf("Error while scanning partial file: %v", err)
+ return
+ }
+}
+
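+// deletePartialFile removes the original partial file once its records have been
+// copied to the recovered file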
+func deletePartialFile(completeOrigFilePath string) {
+ err := os.Remove(completeOrigFilePath)
+ if err != nil {
+ log.Errorf("Cannot delete partial file :%s", completeOrigFilePath)
+ }
+}
\ No newline at end of file
diff --git a/init.go b/init.go
index 08880fa..b8eaf81 100644
--- a/init.go
+++ b/init.go
@@ -29,9 +29,6 @@
useCaching = "apidanalytics_use_caching"
useCachingDefault = true
-
- maxRetries = 3
- retryFailedDirBatchSize = 10
)
// keep track of the services that this plugin will use
@@ -73,7 +70,7 @@
func initPlugin(services apid.Services) (apid.PluginData, error) {
// set a logger that is annotated for this plugin
- log = services.Log().ForModule("analytics")
+ log = services.Log().ForModule("apigeeAnalytics")
log.Debug("start init for apidAnalytics plugin")
// set configuration
@@ -82,12 +79,14 @@
return pluginData, fmt.Errorf("Missing required config value: %s: ", err)
}
- // TOOO: remove later
- //config.SetDefault(uapServerBase,"https://edgex-internal-test.e2e.apigee.net/edgex")
+ // localTesting
+ config.SetDefault(uapServerBase,"http://localhost:9010")
+
for _, key := range []string{uapServerBase} {
if !config.IsSet(key) {
return pluginData, fmt.Errorf("Missing required config value: %s", key)
}
+
}
directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
@@ -101,11 +100,11 @@
events = services.Events()
events.Listen("ApigeeSync", &handler{})
- // TODO: perform crash recovery
+ initCrashRecovery()
+
initUploadManager()
initAPI(services)
-
log.Debug("end init for apidAnalytics plugin")
return pluginData, nil
}
@@ -149,7 +148,7 @@
if error != nil {
return error
}
- log.Infof("created directory %s: ", path)
+ log.Infof("created directory for analytics data collection %s: ", path)
}
}
return nil
diff --git a/listener.go b/listener.go
index 46c67c6..ea9278a 100644
--- a/listener.go
+++ b/listener.go
@@ -36,22 +36,19 @@
}
setDB(db)
- // After first snapshot is received, create local cache
if (config.GetBool(useCaching)) {
err = createTenantCache()
if err != nil {
- log.Debugf("Could not create a local cache for datascope info: %s", err.Error())
+ log.Error(err)
} else {
log.Debug("Created a local cache for datasope information")
}
err = createDeveloperInfoCache()
if err != nil {
- log.Debugf("Could not create a local cache for developer and app info: %s", err.Error())
-
+ log.Error(err)
} else {
log.Debug("Created a local cache for developer and app information")
}
- log.Debug("Created a local cache for developer and app information")
} else {
log.Debug("Will not be caching any info and make a DB call for every analytics msg")
}
diff --git a/uploadManager.go b/uploadManager.go
index 93aa3bb..a8ca514 100644
--- a/uploadManager.go
+++ b/uploadManager.go
@@ -8,9 +8,7 @@
"time"
)
-var (
- retriesMap map[string]int
-)
+var retriesMap map[string]int
//TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running
func initUploadManager() {
@@ -19,11 +17,11 @@
// TODO: add a way to make sure that this go routine is always running
go func() {
- ticker := time.NewTicker(time.Millisecond * config.GetDuration(analyticsUploadInterval) * 1000)
+ ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval))
log.Debugf("Intialized upload manager to check for staging directory")
defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running
- for t := range ticker.C {
+ for range ticker.C {
files, err := ioutil.ReadDir(localAnalyticsStagingDir)
if err != nil {
@@ -32,7 +30,6 @@
uploadedDirCnt := 0
for _, file := range files {
- log.Debugf("t: %s , directory: %s", t, file.Name())
if file.IsDir() {
status := uploadDir(file)
handleUploadDirStatus(file, status)
@@ -59,7 +56,7 @@
delete(retriesMap, dir.Name())
} else {
retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
- if retriesMap[dir.Name()] > maxRetries {
+ if retriesMap[dir.Name()] >= maxRetries {
log.Errorf("Max Retires exceeded for folder: %s", completePath)
failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
err := os.Rename(completePath, failedDirPath)
diff --git a/uploader.go b/uploader.go
index 3941b35..92039e8 100644
--- a/uploader.go
+++ b/uploader.go
@@ -7,53 +7,66 @@
"path/filepath"
"io/ioutil"
"net/http"
- "errors"
"compress/gzip"
+ "fmt"
+ "time"
)
-//var token string
+const (
+ maxRetries = 3
+ retryFailedDirBatchSize = 10
+ timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
+)
+
+var token string
+
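+// addHeaders sets the Authorization header with the bearer token on the outgoing request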
+func addHeaders(req *http.Request) {
+ req.Header.Add("Authorization", "Bearer " + token)
+}
func uploadDir(dir os.FileInfo) bool {
-
tenant, timestamp := splitDirName(dir.Name())
dateTimePartition := getDateFromDirTimestamp(timestamp)
+ // TODO: Remove
log.Debugf("tenant: %s | timestamp %s", tenant, timestamp)
completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
files, _ := ioutil.ReadDir(completePath)
- var status bool
+ status := true
var error error
for _, file := range files {
completeFilePath := filepath.Join(completePath, file.Name())
relativeFilePath := dateTimePartition + "/" + file.Name();
status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
if error != nil {
- log.Errorf("Upload File failed due to : %s", error.Error())
+ log.Errorf("Upload failed due to : %s", error.Error())
break
} else {
os.Remove(completeFilePath)
- log.Debugf("deleted file after successful upload : %s", file.Name())
+ log.Debugf("Deleted file after successful upload : %s", file.Name())
}
}
return status
}
func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
-
signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath)
if (err != nil) {
return false, err
} else {
log.Debugf("signed URL : %s", signedUrl)
return true, nil
- //return uploadToDatastore(completeFilePath, signedUrl)
+ //return uploadFileToDatastore(completeFilePath, signedUrl)
}
}
func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
client := &http.Client{}
- uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+ //uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+ // localTesting
+ uapCollectionUrl := config.GetString(uapServerBase) + "/v1/upload/location"
req, err := http.NewRequest("GET", uapCollectionUrl, nil)
if err != nil {
@@ -61,6 +74,11 @@
}
q := req.URL.Query()
+
+ // localTesting
+ q.Add("repo", "edge")
+ q.Add("dataset", "api")
+
q.Add("tenant", tenant)
q.Add("relativeFilePath", relativeFilePath)
q.Add("contentType", "application/x-gzip")
@@ -73,6 +91,7 @@
return "", err
}
defer resp.Body.Close()
+
respBody, _ := ioutil.ReadAll(resp.Body)
if(resp.StatusCode == 200) {
var body map[string]interface{}
@@ -80,14 +99,11 @@
signedURL := body["url"]
return signedURL.(string), nil
} else {
- return "", errors.New("Getting signed URL failed due to " + resp.Status)
+ return "", fmt.Errorf("Error while getting signed URL: %s",resp.Status)
}
}
-//func addHeaders(req *http.Request) {
-// req.Header.Add("Authorization", "Bearer " + token)
-//}
-func uploadToDatastore(completeFilePath, signedUrl string) (bool, error) {
+func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
// read gzip file that needs to be uploaded
f, err := os.Open(completeFilePath)
if err != nil {
@@ -96,13 +112,13 @@
defer f.Close()
reader, err := gzip.NewReader(f)
if err != nil {
- return false, err
+ return false, fmt.Errorf("Cannot create reader on gzip file %v", err)
}
client := &http.Client{}
req, err := http.NewRequest("PUT", signedUrl, reader)
if err != nil {
- return false, err
+ return false, fmt.Errorf("Parsing URL failed due to %s",err.Error())
}
req.Header.Set("Expect", "100-continue")
@@ -114,10 +130,11 @@
}
defer resp.Body.Close()
if(resp.StatusCode == 200) {
+ // TODO: Remove
log.Debugf("response: %v", resp)
return true, nil
} else {
- return false, errors.New("Failed to upload file to datastore " + resp.Status)
+ return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %s ", resp.Status)
}
}
@@ -128,6 +145,12 @@
return tenant, timestamp
}
+// Files are uploaded to S3 under a specific partition, and that partition key needs to be generated by the plugin
+// eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
func getDateFromDirTimestamp(timestamp string) (string){
- return ""
+ dateTime, _ := time.Parse(timestampLayout, timestamp)
+ date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
+ hourMinute := dateTime.Format("15-04") // same as HH-mm
+ dateHourTS := "date=" + date + "/time=" + hourMinute
+ return dateHourTS
}
\ No newline at end of file