blob: db5116f5b401a1ff504a9f8540a10276cfa54c70 [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidAnalytics
import (
"bufio"
"compress/gzip"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
)
const (
	// crashRecoveryDelay is the number of seconds to wait before crash
	// recovery is attempted.
	crashRecoveryDelay = 30

	// recoveryTSLayout is the time layout used for recovery timestamps; it
	// corresponds to the "yyyyMMddHHmmss.SSS" format. The formatted value is
	// appended to the names of recovered folders and files.
	recoveryTSLayout = "20060102150405.000"

	// recoveredTS is the marker placed between a folder's original name and
	// its recovery timestamp; it identifies recovered folders.
	recoveredTS = "~recoveredTS~"
)
// initCrashRecovery checks whether crash recovery is needed and, if so,
// schedules performRecovery to run after crashRecoveryDelay seconds.
// The recovery itself runs asynchronously so that it does not block the
// apid plugin from starting up.
func initCrashRecovery() {
	if !crashRecoveryNeeded() {
		return
	}
	// AfterFunc invokes performRecovery in its own goroutine once the
	// delay has elapsed.
	time.AfterFunc(time.Second*crashRecoveryDelay, performRecovery)
}
// crashRecoveryNeeded reports whether there is anything to recover, i.e.
// whether the tmp or the recovered directory contains any entries.
// Both checks are always executed (no short-circuit): the tmp check also
// moves the entries it finds into the recovered directory.
func crashRecoveryNeeded() bool {
	inRecoveredDir := recoverFolderInRecoveredDir()
	inTmpDir := recoverFoldersInTmpDir()
	if !inTmpDir && !inRecoveredDir {
		return false
	}
	log.Infof("Crash Recovery is needed and will be "+
		"attempted in %d seconds", crashRecoveryDelay)
	return true
}
// recoverFoldersInTmpDir moves every entry found in the tmp directory into
// the recovered directory, appending the recovery marker and the current
// recovery timestamp to its name.
//
// If apid is shut down or crashes while a file is still open in the tmp
// folder, that file holds partial data which can be recovered later.
//
// It returns true if the tmp directory contained at least one entry, even
// when moving an entry failed (the failure is logged). If the tmp directory
// cannot be read, the error is logged and false is returned.
func recoverFoldersInTmpDir() bool {
	dirs, err := ioutil.ReadDir(localAnalyticsTempDir)
	if err != nil {
		log.Errorf("Cannot read tmp directory '%s' due to %v",
			localAnalyticsTempDir, err)
		return false
	}

	// One timestamp is shared by all folders moved in this pass.
	recoveryTS := getRecoveryTS()
	for _, dir := range dirs {
		log.Debugf("Moving directory '%s' from tmp to"+
			" recovered folder", dir.Name())
		tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())

		// Eg. org~env~20160101222400~recoveredTS~20160101222612.123
		newDirName := dir.Name() + recoveredTS + recoveryTS
		recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)

		if err := os.Rename(tmpCompletePath, recoveredCompletePath); err != nil {
			log.Errorf("Cannot move directory '%s' "+
				"from tmp to recovered folder due to %v", dir.Name(), err)
		}
	}
	return len(dirs) > 0
}
// getRecoveryTS returns the current time formatted with recoveryTSLayout;
// it is the timestamp recorded for a recovery attempt on a folder.
func getRecoveryTS() string {
	return time.Now().Format(recoveryTSLayout)
}
// recoverFolderInRecoveredDir reports whether the recovered directory
// contains any entries.
//
// If apid is restarted twice in quick succession, files may have been moved
// to the recovered folder while the actual recovery hasn't started or is
// only partially done. Those files stay in the recovered directory and need
// to be recovered again.
//
// If the recovered directory cannot be read, the error is logged and false
// is returned.
func recoverFolderInRecoveredDir() bool {
	dirs, err := ioutil.ReadDir(localAnalyticsRecoveredDir)
	if err != nil {
		log.Errorf("Cannot read recovered directory '%s' due to %v",
			localAnalyticsRecoveredDir, err)
		return false
	}
	return len(dirs) > 0
}
// performRecovery runs recoverDirectory, one after another, on every entry
// of the recovered directory. If the recovered directory cannot be read, the
// error is logged and recovery is abandoned without reporting completion.
func performRecovery() {
	log.Info("Crash recovery is starting...")
	recoveryDirs, err := ioutil.ReadDir(localAnalyticsRecoveredDir)
	if err != nil {
		log.Errorf("Cannot read recovered directory '%s' due to %v",
			localAnalyticsRecoveredDir, err)
		return
	}
	for _, dir := range recoveryDirs {
		recoverDirectory(dir.Name())
	}
	log.Info("Crash recovery complete...")
}
// recoverDirectory recovers every file of the directory dirName inside the
// recovered directory and then moves the directory to the staging directory.
//
// dirName is the directory's base name, which may carry the recovery marker
// followed by the recovery timestamp.
//
// If the directory cannot be listed, the error is logged and the directory
// is left in place; moving it to staging would hand over files that were
// never recovered.
func recoverDirectory(dirName string) {
	log.Infof("performing crash recovery for directory: %s", dirName)

	// Extract the recovery timestamp from the bucket name so it can be
	// passed to each file being recovered.
	// Eg. org~env~20160101222400~recoveredTS~20160101222612.123
	//  -> bucketRecoveryTS = _20160101222612.123
	// A name without the marker yields an empty bucketRecoveryTS.
	var bucketRecoveryTS string
	if index := strings.Index(dirName, recoveredTS); index != -1 {
		bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
	}

	dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
	files, err := ioutil.ReadDir(dirBeingRecovered)
	if err != nil {
		log.Errorf("Cannot read directory '%s' in recovered folder"+
			" due to %v", dirName, err)
		return
	}
	for _, file := range files {
		// recovering each file sequentially for now
		recoverFile(bucketRecoveryTS, dirName, file.Name())
	}

	stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
	if err := os.Rename(dirBeingRecovered, stagingPath); err != nil {
		log.Errorf("Cannot move directory '%s' from"+
			" recovered to staging folder due to %v", dirName, err)
	}
}
// recoverFile recovers a single file of the directory dirName in the
// recovered directory. The file's content is copied to a sibling file whose
// name has "_recovered" plus bucketRecoveryTS inserted before the file
// extension, and the original file is then deleted. The delete is issued
// unconditionally after the copy call.
//
// eg. 5be1_20170130155400.20170130155600_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0_recovered_20170130155452.616.txt
func recoverFile(bucketRecoveryTS, dirName, fileName string) {
	log.Debugf("performing crash recovery for file: %s ", fileName)

	// Build the recovered name: <base>_recovered<bucketRecoveryTS><ext>
	baseName := strings.TrimSuffix(fileName, fileExtension)
	recoveredName := baseName + "_recovered" + bucketRecoveryTS + fileExtension

	partialPath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
	recoveredPath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredName)

	// Copy the records to the new file, then drop the original partial file.
	copyPartialFile(partialPath, recoveredPath)
	deletePartialFile(partialPath)
}
// copyPartialFile reads the gzip file at completeOrigFilePath line by line
// and copies every complete (newline-terminated) record to a new file at
// recoveredFilePath, which is closed as a correct gzip file.
//
// When the input stream ends with an error (e.g. it was cut short), the
// trailing fragment without a terminating newline is dropped. When the
// stream ends cleanly, a final line lacking a newline is still copied.
// Each copied record is written followed by "\n"; a "\r" directly before
// the newline is removed.
//
// Failures are logged; nothing is returned to the caller.
func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
	partialFile, err := os.Open(completeOrigFilePath)
	if err != nil {
		log.Errorf("Cannot open file: %s due to %v", completeOrigFilePath, err)
		return
	}
	defer partialFile.Close()

	gzReader, err := gzip.NewReader(bufio.NewReader(partialFile))
	if err != nil {
		log.Errorf("Cannot create reader on gzip file: %s"+
			" due to %v", completeOrigFilePath, err)
		return
	}
	defer gzReader.Close()

	// Create new file to copy complete records from partial file so that
	// only a complete file is uploaded. O_TRUNC discards the content of a
	// file left behind by an earlier, interrupted recovery attempt;
	// without it stale bytes could remain after the new gzip stream.
	recoveredFile, err := os.OpenFile(recoveredFilePath,
		os.O_WRONLY|os.O_CREATE|os.O_TRUNC, os.ModePerm)
	if err != nil {
		log.Errorf("Cannot create recovered file: %s due to %v",
			recoveredFilePath, err)
		return
	}
	gzWriter := gzip.NewWriter(recoveredFile)
	bufWriter := bufio.NewWriter(gzWriter)

	// Finalize the output on every exit path. Order matters: buffered bytes
	// go into the gzip stream, the gzip trailer is written, then the file
	// is closed.
	defer func() {
		if err := bufWriter.Flush(); err != nil {
			log.Errorf("Cannot write recovered file: %s due to %v",
				recoveredFilePath, err)
		}
		if err := gzWriter.Close(); err != nil {
			log.Errorf("Cannot finish gzip stream of recovered file: %s"+
				" due to %v", recoveredFilePath, err)
		}
		if err := recoveredFile.Close(); err != nil {
			log.Errorf("Cannot close recovered file: %s due to %v",
				recoveredFilePath, err)
		}
	}()

	writeRecord := func(record []byte) {
		if n := len(record); n > 0 && record[n-1] == '\r' {
			record = record[:n-1]
		}
		// Write errors of a bufio.Writer are sticky and are reported by
		// the deferred Flush, so they are not checked here.
		bufWriter.Write(record)
		bufWriter.WriteByte('\n')
	}

	scanner := bufio.NewScanner(gzReader)
	scanner.Split(scanLinesKeepNewline)

	// tail holds the bytes after the last newline, if any. Whether they
	// form a complete record is only known once scanning has finished.
	var tail []byte
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 || line[len(line)-1] != '\n' {
			// Copy: the scanner may reuse the slice's backing array.
			tail = append(tail[:0], line...)
			continue
		}
		writeRecord(line[:len(line)-1])
	}
	if err := scanner.Err(); err != nil {
		// The stream did not end cleanly, so tail is a partial record and
		// is intentionally not copied.
		log.Warnf("Error while scanning partial file: %v", err)
		return
	}
	if len(tail) > 0 {
		writeRecord(tail)
	}
}

// scanLinesKeepNewline is a bufio.SplitFunc that returns each line including
// its terminating '\n'. Data left at the end of the input without a newline
// is returned as a final token lacking the '\n', which lets the caller tell
// terminated lines from an unterminated remainder.
func scanLinesKeepNewline(data []byte, atEOF bool) (advance int, token []byte, err error) {
	for i, b := range data {
		if b == '\n' {
			return i + 1, data[:i+1], nil
		}
	}
	if atEOF && len(data) > 0 {
		return len(data), data, nil
	}
	// No complete line yet: request more data.
	return 0, nil, nil
}
// deletePartialFile removes the file at completeOrigFilePath and logs an
// error if the removal fails.
func deletePartialFile(completeOrigFilePath string) {
	if err := os.Remove(completeOrigFilePath); err != nil {
		log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
	}
}