blob: 971270d1b0f0477d1e34340d9cd9a3bd6d61614a [file] [log] [blame]
package apidAnalytics
import (
"time"
"io/ioutil"
"path/filepath"
"bufio"
"os"
"strings"
"compress/gzip"
)
const crashRecoveryDelay = 30 // seconds
const recovertTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format
const fileExtension = ".txt.gz";
const recoveredTS = "~recoveredTS~"
func initCrashRecovery() {
if crashRecoveryNeeded() {
timer := time.After(time.Second * crashRecoveryDelay)
go func() {
<- timer
performRecovery()
}()
}
}
func crashRecoveryNeeded() (bool) {
recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
if needed {
log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay)
}
return needed
}
func recoverFoldersInTmpDir() bool {
tmpRecoveryNeeded := false
dirs,_ := ioutil.ReadDir(localAnalyticsTempDir)
recoveryTS := getRecoveryTS()
for _, dir := range dirs {
tmpRecoveryNeeded = true
log.Debugf("Moving directory %s from tmp to recovered ", dir.Name())
tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
newDirName := dir.Name() + recoveredTS + recoveryTS;
recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir,newDirName)
err := os.Rename(tmpCompletePath, recoveredCompletePath)
if err != nil {
log.Errorf("Cannot move directory :%s to recovered folder", dir.Name())
}
}
return tmpRecoveryNeeded
}
func getRecoveryTS() string {
current := time.Now()
return current.Format(recovertTSLayout)
}
func recoverFolderInRecoveredDir() bool {
dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
if len(dirs) > 0 {
return true
}
return false
}
func performRecovery() {
log.Info("Crash recovery is starting...");
recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
for _, dir := range recoveryDirs {
recoverDirectory(dir.Name());
}
log.Info("Crash recovery complete...");
}
func recoverDirectory(dirName string) {
log.Infof("performing crash recovery for directory: %s", dirName);
var bucketRecoveryTS string
// Parse bucket name to extract recoveryTS and pass it each file to be recovered
index := strings.Index(dirName, recoveredTS)
if index != -1 {
bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
}
dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
files, _ := ioutil.ReadDir(dirBeingRecovered)
for _, file := range files {
// recovering each file sequentially for now
recoverFile(bucketRecoveryTS, dirName, file.Name());
}
stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
err := os.Rename(dirBeingRecovered, stagingPath)
if err != nil {
log.Errorf("Cannot move directory :%s to staging folder", dirName)
}
}
func recoverFile(bucketRecoveryTS, dirName, fileName string) {
log.Debugf("performing crash recovery for file: %s ", fileName)
// add recovery timestamp to the file name
completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
copyPartialFile(completeOrigFilePath, recoveredFilePath);
deletePartialFile(completeOrigFilePath);
}
func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
// read partial file line by line using buffered gzip reader
partialFile, err := os.Open(completeOrigFilePath)
if err != nil {
log.Errorf("Cannot open file: %s", completeOrigFilePath)
return
}
defer partialFile.Close()
bufReader := bufio.NewReader(partialFile)
gzReader, err := gzip.NewReader(bufReader)
if err != nil {
log.Errorf("Cannot create reader on gzip file: %s due to %v", completeOrigFilePath, err)
return
}
defer gzReader.Close()
scanner := bufio.NewScanner(gzReader)
// Create new file to copy complete records from partial file and upload only a complete file
recoveredFile, err := os.Create(recoveredFilePath)
if err != nil {
log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
return
}
defer recoveredFile.Close()
bufWriter := bufio.NewWriter(recoveredFile)
defer bufWriter.Flush()
gzWriter := gzip.NewWriter(bufWriter)
defer gzWriter.Close()
for scanner.Scan() {
gzWriter.Write(scanner.Bytes())
}
if err := scanner.Err(); err != nil {
log.Errorf("Error while scanning partial file: %v", err)
return
}
}
func deletePartialFile(completeOrigFilePath string) {
err := os.Remove(completeOrigFilePath)
if err != nil {
log.Errorf("Cannot delete partial file :%s", completeOrigFilePath)
}
}