[XAPID-377] Added crash recovery logic to handle unclosed files in case of a plugin crash
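On startup the plugin now checks the tmp and recovered directories for buckets left behind by a crash. Buckets found in tmp are tagged with a recovery timestamp and moved to recovered; after a 30 second delay each recovered bucket is processed: the complete records of every partial gzip file are copied into a new "_recovered" file, the partial file is deleted, and the bucket is moved to staging so the existing upload manager picks it up. Also moves the retry constants from init.go to uploader.go, implements the date/time partition key for uploads, and cleans up assorted log messages.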
diff --git a/api_helper.go b/api_helper.go
index 7ae0b8f..ccda6d5 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -55,6 +55,7 @@
     valid, err := validate(recordMap)
     if valid {
         enrich(recordMap, scopeuuid, tenant)
+        // TODO: Remove log
        log.Debugf("Raw records : %v ", eachRecord)
     } else {
        return err // Even if there is one bad record, then reject entire batch
@@ -99,6 +100,7 @@
     if exists {
        apiKey := apiKey.(string)
        devInfo := getDeveloperInfo(tenant.tenantId, apiKey)
+       // TODO: Remove log
        log.Debugf("developerInfo = %v", devInfo)
        if recordMap["api_product"] == "" {
            recordMap["api_product"] = devInfo.apiProduct
diff --git a/common_helper.go b/common_helper.go
index 0eea2c1..dd2c629 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -27,7 +27,7 @@
            tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId}
        }
    }

-   log.Debugf("Couch of datadscopes in the cache: %d", len(tenantCache))
+   log.Debugf("Count of datascopes in the cache: %d", len(tenantCache))
    return nil
 }
diff --git a/crashRecovery.go b/crashRecovery.go
new file mode 100644
index 0000000..33a0711
--- /dev/null
+++ b/crashRecovery.go
@@ -0,0 +1,165 @@
+package apidAnalytics
+
+import (
+    "bufio"
+    "compress/gzip"
+    "io/ioutil"
+    "os"
+    "path/filepath"
+    "strings"
+    "time"
+)
+
+const crashRecoveryDelay = 30 // seconds
+const recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format
+const fileExtension = ".txt.gz"
+
+// Marker that separates the original bucket name from the recovery timestamp
+const recoveredTS = "~recoveredTS~"
+
+func initCrashRecovery() {
+    if crashRecoveryNeeded() {
+        timer := time.NewTimer(time.Second * crashRecoveryDelay)
+        go func() {
+            <-timer.C
+            performRecovery()
+        }()
+    }
+}
+
+// Recovery is needed if either the tmp or the recovered directory is non-empty,
+// i.e. the previous run crashed before closing and staging all open files
+func crashRecoveryNeeded() bool {
+    recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
+    tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
+    needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
+    if needed {
+        log.Infof("Crash recovery is needed and will be attempted in %d seconds", crashRecoveryDelay)
+    }
+    return needed
+}
+
+func recoverFoldersInTmpDir() bool {
+    tmpRecoveryNeeded := false
+    dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
+    recoveryTS := getRecoveryTS()
+    for _, dir := range dirs {
+        tmpRecoveryNeeded = true
+        log.Debugf("Moving directory %s from tmp to recovered", dir.Name())
+        tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
+
+        // Tag the directory with the recovery timestamp so recovered data
+        // can be told apart from data written after the restart
+        newDirName := dir.Name() + recoveredTS + recoveryTS
+        recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
+        err := os.Rename(tmpCompletePath, recoveredCompletePath)
+        if err != nil {
+            log.Errorf("Cannot move directory %s to recovered folder", dir.Name())
+        }
+    }
+    return tmpRecoveryNeeded
+}
+
+func getRecoveryTS() string {
+    current := time.Now()
+    return current.Format(recoveryTSLayout)
+}
+
+func recoverFolderInRecoveredDir() bool {
+    dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+    return len(dirs) > 0
+}
+
+func performRecovery() {
+    log.Info("Crash recovery is starting...")
+    recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+    for _, dir := range recoveryDirs {
+        recoverDirectory(dir.Name())
+    }
+    log.Info("Crash recovery complete...")
+}
+
+func recoverDirectory(dirName string) {
+    log.Infof("Performing crash recovery for directory: %s", dirName)
+    var bucketRecoveryTS string
+
+    // Parse the bucket name to extract the recovery timestamp and pass it
+    // to each file being recovered
+    index := strings.Index(dirName, recoveredTS)
+    if index != -1 {
+        bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
+    }
+
+    dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
+    files, _ := ioutil.ReadDir(dirBeingRecovered)
+    for _, file := range files {
+        // recovering each file sequentially for now
+        recoverFile(bucketRecoveryTS, dirName, file.Name())
+    }
+
+    stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+    err := os.Rename(dirBeingRecovered, stagingPath)
+    if err != nil {
+        log.Errorf("Cannot move directory %s to staging folder", dirName)
+    }
+}
+
+func recoverFile(bucketRecoveryTS, dirName, fileName string) {
+    log.Debugf("Performing crash recovery for file: %s", fileName)
+    // Add the recovery timestamp to the file name
+    completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
+    recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
+    recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
+    recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
+    copyPartialFile(completeOrigFilePath, recoveredFilePath)
+    deletePartialFile(completeOrigFilePath)
+}
+
+func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
+    // Read the partial file line by line using a buffered gzip reader
+    partialFile, err := os.Open(completeOrigFilePath)
+    if err != nil {
+        log.Errorf("Cannot open file: %s", completeOrigFilePath)
+        return
+    }
+    defer partialFile.Close()
+
+    bufReader := bufio.NewReader(partialFile)
+    gzReader, err := gzip.NewReader(bufReader)
+    if err != nil {
+        log.Errorf("Cannot create reader on gzip file: %s due to %v", completeOrigFilePath, err)
+        return
+    }
+    defer gzReader.Close()
+
+    scanner := bufio.NewScanner(gzReader)
+
+    // Create a new file and copy only the complete records from the partial
+    // file so that only a complete file is uploaded
+    recoveredFile, err := os.Create(recoveredFilePath)
+    if err != nil {
+        log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
+        return
+    }
+    defer recoveredFile.Close()
+
+    bufWriter := bufio.NewWriter(recoveredFile)
+    defer bufWriter.Flush()
+
+    gzWriter := gzip.NewWriter(bufWriter)
+    defer gzWriter.Close()
+
+    for scanner.Scan() {
+        // Scanner strips the trailing newline; write it back so records
+        // stay one per line in the recovered file
+        gzWriter.Write(scanner.Bytes())
+        gzWriter.Write([]byte("\n"))
+    }
+
+    if err := scanner.Err(); err != nil {
+        // A record truncated by the crash surfaces as a scan error;
+        // the complete records copied so far are kept
+        log.Errorf("Error while scanning partial file: %v", err)
+        return
+    }
+}
+
+func deletePartialFile(completeOrigFilePath string) {
+    err := os.Remove(completeOrigFilePath)
+    if err != nil {
+        log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
+    }
+}
\ No newline at end of file
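Note: a standalone sketch (not part of the patch) of how the recovery naming scheme composes and parses; the bucket and file names below are hypothetical, only the marker and suffix logic mirror the constants above.

package main

import (
	"fmt"
	"strings"
)

const recoveredTS = "~recoveredTS~"

func main() {
	// Hypothetical tmp bucket left open by a crash: "<tenant>_<timestamp>"
	dirName := "tenant1_20160102154500"

	// recoverFoldersInTmpDir tags it with the time recovery started
	tagged := dirName + recoveredTS + "20160102161130.000"
	fmt.Println(tagged)
	// tenant1_20160102154500~recoveredTS~20160102161130.000

	// recoverDirectory extracts the tag; recoverFile suffixes each file with it
	if i := strings.Index(tagged, recoveredTS); i != -1 {
		bucketRecoveryTS := "_" + tagged[i+len(recoveredTS):]
		recovered := strings.TrimSuffix("events.txt.gz", ".txt.gz") +
			"_recovered" + bucketRecoveryTS + ".txt.gz"
		fmt.Println(recovered)
		// events_recovered_20160102161130.000.txt.gz
	}
}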
diff --git a/init.go b/init.go
index 08880fa..b8eaf81 100644
--- a/init.go
+++ b/init.go
@@ -29,9 +29,6 @@
    useCaching = "apidanalytics_use_caching"
    useCachingDefault = true
-
-   maxRetries = 3
-   retryFailedDirBatchSize = 10
 )

 // keep track of the services that this plugin will use
@@ -73,7 +70,7 @@
 func initPlugin(services apid.Services) (apid.PluginData, error) {

    // set a logger that is annotated for this plugin
-   log = services.Log().ForModule("analytics")
+   log = services.Log().ForModule("apigeeAnalytics")
    log.Debug("start init for apidAnalytics plugin")

    // set configuration
@@ -82,12 +79,14 @@
        return pluginData, fmt.Errorf("Missing required config value: %s: ", err)
    }

-   // TOOO: remove later
-   //config.SetDefault(uapServerBase,"https://edgex-internal-test.e2e.apigee.net/edgex")
+   // localTesting
+   config.SetDefault(uapServerBase, "http://localhost:9010")
+
    for _, key := range []string{uapServerBase} {
        if !config.IsSet(key) {
            return pluginData, fmt.Errorf("Missing required config value: %s", key)
        }
+   }

    directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
@@ -101,11 +100,11 @@
    events = services.Events()
    events.Listen("ApigeeSync", &handler{})

-   // TODO: perform crash recovery
+   initCrashRecovery()
+
    initUploadManager()

    initAPI(services)
-
    log.Debug("end init for apidAnalytics plugin")
    return pluginData, nil
 }
@@ -149,7 +148,7 @@
        if error != nil {
            return error
        }
-       log.Infof("created directory %s: ", path)
+       log.Infof("Created directory for analytics data collection: %s", path)
        }
    }
    return nil
diff --git a/listener.go b/listener.go
index 46c67c6..ea9278a 100644
--- a/listener.go
+++ b/listener.go
@@ -36,22 +36,19 @@
    }
    setDB(db)

-   // After first snapshot is received, create local cache
    if (config.GetBool(useCaching)) {
        err = createTenantCache()
        if err != nil {
-           log.Debugf("Could not create a local cache for datascope info: %s", err.Error())
+           log.Error(err)
        } else {
            log.Debug("Created a local cache for datasope information")
        }
        err = createDeveloperInfoCache()
        if err != nil {
-           log.Debugf("Could not create a local cache for developer and app info: %s", err.Error())
-
+           log.Error(err)
        } else {
            log.Debug("Created a local cache for developer and app information")
        }
-       log.Debug("Created a local cache for developer and app information")
    } else {
        log.Debug("Will not be caching any info and make a DB call for every analytics msg")
    }
diff --git a/uploadManager.go b/uploadManager.go
index 93aa3bb..a8ca514 100644
--- a/uploadManager.go
+++ b/uploadManager.go
@@ -8,9 +8,7 @@
    "time"
 )

-var (
-   retriesMap map[string]int
-)
+var retriesMap map[string]int

 //TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running
 func initUploadManager() {
@@ -19,11 +17,11 @@
    // TODO: add a way to make sure that this go routine is always running
    go func() {
-       ticker := time.NewTicker(time.Millisecond * config.GetDuration(analyticsUploadInterval) * 1000)
+       ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval))
        log.Debugf("Intialized upload manager to check for staging directory")
        defer ticker.Stop()

        // Ticker will keep running till go routine is running i.e. till application is running
-       for t := range ticker.C {
+       for range ticker.C {

            files, err := ioutil.ReadDir(localAnalyticsStagingDir)
            if err != nil {
@@ -32,7 +30,6 @@
            uploadedDirCnt := 0
            for _, file := range files {
-               log.Debugf("t: %s , directory: %s", t, file.Name())
                if file.IsDir() {
                    status := uploadDir(file)
                    handleUploadDirStatus(file, status)
@@ -59,7 +56,7 @@
                delete(retriesMap, dir.Name())
            } else {
                retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
-               if retriesMap[dir.Name()] > maxRetries {
+               if retriesMap[dir.Name()] >= maxRetries {
                    log.Errorf("Max Retires exceeded for folder: %s", completePath)
                    failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
                    err := os.Rename(completePath, failedDirPath)
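Note: the change from > to >= means a directory is parked in the failed folder after maxRetries (3) failed attempts rather than 4. A minimal sketch of the bookkeeping (the directory name is hypothetical):

package main

import "fmt"

const maxRetries = 3

func main() {
	retriesMap := map[string]int{}
	dir := "tenant1_20160102154500" // hypothetical staging directory
	for attempt := 1; ; attempt++ {
		// each failed uploadDir increments the counter, as in handleUploadDirStatus
		retriesMap[dir] = retriesMap[dir] + 1
		if retriesMap[dir] >= maxRetries {
			fmt.Printf("attempt %d: moving %s to failed directory\n", attempt, dir)
			break
		}
		fmt.Printf("attempt %d: leaving %s in staging for retry\n", attempt, dir)
	}
}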
diff --git a/uploader.go b/uploader.go
index 3941b35..92039e8 100644
--- a/uploader.go
+++ b/uploader.go
@@ -7,53 +7,66 @@
    "path/filepath"
    "io/ioutil"
    "net/http"
-   "errors"
    "compress/gzip"
+   "fmt"
+   "time"
 )

-//var token string
+const (
+   maxRetries = 3
+   retryFailedDirBatchSize = 10
+   timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
+)
+
+var token string
+
+func addHeaders(req *http.Request) {
+   req.Header.Add("Authorization", "Bearer "+token)
+}

 func uploadDir(dir os.FileInfo) bool {
-
    tenant, timestamp := splitDirName(dir.Name())
    dateTimePartition := getDateFromDirTimestamp(timestamp)
+   // TODO: Remove
    log.Debugf("tenant: %s | timestamp %s", tenant, timestamp)

    completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
    files, _ := ioutil.ReadDir(completePath)

-   var status bool
+   status := true
    var error error
    for _, file := range files {
        completeFilePath := filepath.Join(completePath, file.Name())
        relativeFilePath := dateTimePartition + "/" + file.Name();
        status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
        if error != nil {
-           log.Errorf("Upload File failed due to : %s", error.Error())
+           log.Errorf("Upload failed due to: %s", error.Error())
            break
        } else {
            os.Remove(completeFilePath)
-           log.Debugf("deleted file after successful upload : %s", file.Name())
+           log.Debugf("Deleted file after successful upload: %s", file.Name())
        }
    }
    return status
 }

 func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
-
    signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath)
    if (err != nil) {
        return false, err
    } else {
        log.Debugf("signed URL : %s", signedUrl)
        return true, nil
-       //return uploadToDatastore(completeFilePath, signedUrl)
+       //return uploadFileToDatastore(completeFilePath, signedUrl)
    }
 }

 func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
    client := &http.Client{}

-   uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+   //uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+   // localTesting
+   uapCollectionUrl := config.GetString(uapServerBase) + "/v1/upload/location"

    req, err := http.NewRequest("GET", uapCollectionUrl, nil)
    if err != nil {
@@ -61,6 +74,11 @@
    }

    q := req.URL.Query()
+
+   // localTesting
+   q.Add("repo", "edge")
+   q.Add("dataset", "api")
+
    q.Add("tenant", tenant)
    q.Add("relativeFilePath", relativeFilePath)
    q.Add("contentType", "application/x-gzip")
@@ -73,6 +91,7 @@
        return "", err
    }
    defer resp.Body.Close()
+   respBody, _ := ioutil.ReadAll(resp.Body)
    if(resp.StatusCode == 200) {
        var body map[string]interface{}
@@ -80,14 +99,11 @@
        signedURL := body["url"]
        return signedURL.(string), nil
    } else {
-       return "", errors.New("Getting signed URL failed due to " + resp.Status)
+       return "", fmt.Errorf("Error while getting signed URL: %s", resp.Status)
    }
 }

-//func addHeaders(req *http.Request) {
-//  req.Header.Add("Authorization", "Bearer " + token)
-//}
-func uploadToDatastore(completeFilePath, signedUrl string) (bool, error) {
+func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
    // read gzip file that needs to be uploaded
    f, err := os.Open(completeFilePath)
    if err != nil {
@@ -96,13 +112,13 @@
    defer f.Close()
    reader, err := gzip.NewReader(f)
    if err != nil {
-       return false, err
+       return false, fmt.Errorf("Cannot create reader on gzip file: %v", err)
    }

    client := &http.Client{}
    req, err := http.NewRequest("PUT", signedUrl, reader)
    if err != nil {
-       return false, err
+       return false, fmt.Errorf("Parsing URL failed due to: %s", err.Error())
    }

    req.Header.Set("Expect", "100-continue")
@@ -114,10 +130,11 @@
    }
    defer resp.Body.Close()

    if(resp.StatusCode == 200) {
+       // TODO: Remove
        log.Debugf("response: %v", resp)
        return true, nil
    } else {
-       return false, errors.New("Failed to upload file to datastore " + resp.Status)
+       return false, fmt.Errorf("Final datastore (S3/GCS) returned error: %s", resp.Status)
    }
 }

@@ -128,6 +145,12 @@
    return tenant, timestamp
 }

+// Files are uploaded to S3 under a specific partition, and that key needs to be generated by the plugin,
+// e.g. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
 func getDateFromDirTimestamp(timestamp string) (string) {
-   return ""
+   dateTime, _ := time.Parse(timestampLayout, timestamp)
+   date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
+   hourMinute := dateTime.Format("15-04") // same as HH-mm
+   dateHourTS := "date=" + date + "/time=" + hourMinute
+   return dateHourTS
 }
\ No newline at end of file
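Note: a standalone check (not part of the patch) of the partition-key derivation added in getDateFromDirTimestamp; the input timestamp is hypothetical but uses the same layout as the staging directory names.

package main

import (
	"fmt"
	"time"
)

const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss

func main() {
	// A staging directory timestamp such as 20160102154500 (2016-01-02 15:45:00)
	dateTime, err := time.Parse(timestampLayout, "20160102154500")
	if err != nil {
		panic(err)
	}
	key := "date=" + dateTime.Format("2006-01-02") + "/time=" + dateTime.Format("15-04")
	fmt.Println(key) // date=2016-01-02/time=15-45
}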