// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apidAnalytics

import (
	"bufio"
	"compress/gzip"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"time"
)

const (
	crashRecoveryDelay = 30 // seconds
	// Go reference-time layout equivalent to "yyyyMMddHHmmss.SSS"
	// (appended to recovered folder and file names)
	recoveryTSLayout = "20060102150405.000"
	// Marker embedded in recovered folder names to identify recovered data
	recoveredTS = "~recoveredTS~"
)

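// initCrashRecovery checks whether any analytics data was left behind by a
// previous crash or shutdown and, if so, schedules its recovery after
// crashRecoveryDelay seconds. A minimal usage sketch, assuming it is called
// once while the plugin starts up (startAnalyticsServices is illustrative
// only, not part of this package):
//
//	func startAnalyticsServices() {
//		// localAnalyticsTempDir and the other analytics directories
//		// are expected to exist by this point
//		initCrashRecovery()
//	}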
func initCrashRecovery() {
	if crashRecoveryNeeded() {
		timer := time.After(time.Second * crashRecoveryDelay)
		// Actual recovery of files is attempted asynchronously after a
		// delay so as not to block the apid plugin from starting up
		go func() {
			<-timer
			performRecovery()
		}()
	}
}

// crashRecoveryNeeded returns true if there are any folders in the tmp or
// recovered directory, i.e. if crash recovery is needed.
func crashRecoveryNeeded() bool {
	recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
	tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
	needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
	if needed {
		log.Infof("Crash Recovery is needed and will be "+
			"attempted in %d seconds", crashRecoveryDelay)
	}
	return needed
}

// recoverFoldersInTmpDir moves any folders found in the tmp directory to the
// recovered directory and reports whether anything was moved. If apid is shut
// down or crashes while a file is still open in the tmp folder, that file
// contains partial data; this partial data can be recovered.
func recoverFoldersInTmpDir() bool {
	tmpRecoveryNeeded := false
	dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
	recoveryTS := getRecoveryTS()
	for _, dir := range dirs {
		tmpRecoveryNeeded = true
		log.Debugf("Moving directory '%s' from tmp to"+
			" recovered folder", dir.Name())
		tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
		// e.g. org~env~20160101222400~recoveredTS~20160101222612.123
		newDirName := dir.Name() + recoveredTS + recoveryTS
		recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
		err := os.Rename(tmpCompletePath, recoveredCompletePath)
		if err != nil {
			log.Errorf("Cannot move directory '%s' "+
				"from tmp to recovered folder", dir.Name())
		}
	}
	return tmpRecoveryNeeded
}

// getRecoveryTS returns the timestamp at which recovery is being attempted on
// the folder, formatted with recoveryTSLayout (e.g. "20170130155452.616").
func getRecoveryTS() string {
	current := time.Now()
	return current.Format(recoveryTSLayout)
}

// recoverFolderInRecoveredDir reports whether any folders are left in the
// recovered directory. If apid is restarted again before the actual recovery
// of previously moved folders has started, or while it is only partially
// done, those folders stay in the recovered directory and need to be
// recovered again.
func recoverFolderInRecoveredDir() bool {
	dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
	return len(dirs) > 0
}

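// performRecovery recovers every directory currently present in the
// recovered folder, one directory at a time.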
func performRecovery() {
	log.Info("Crash recovery is starting...")
	recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
	for _, dir := range recoveryDirs {
		recoverDirectory(dir.Name())
	}
	log.Info("Crash recovery complete...")
}

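// recoverDirectory recovers each file in the given directory under the
// recovered folder and then moves the whole directory to the staging folder.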
func recoverDirectory(dirName string) {
	log.Infof("performing crash recovery for directory: %s", dirName)
	var bucketRecoveryTS string

	// Parse the bucket name to extract the recovery timestamp and pass it
	// to each file being recovered.
	// E.g. org~env~20160101222400~recoveredTS~20160101222612.123
	// -> bucketRecoveryTS = "_20160101222612.123"
	index := strings.Index(dirName, recoveredTS)
	if index != -1 {
		bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
	}

	dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
	files, _ := ioutil.ReadDir(dirBeingRecovered)
	for _, file := range files {
		// recovering each file sequentially for now
		recoverFile(bucketRecoveryTS, dirName, file.Name())
	}

	stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
	err := os.Rename(dirBeingRecovered, stagingPath)
	if err != nil {
		log.Errorf("Cannot move directory '%s' from"+
			" recovered to staging folder", dirName)
	}
}

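// recoverFile copies all complete records from the given partial file into a
// new file tagged with the recovery timestamp and then deletes the original
// partial file.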
func recoverFile(bucketRecoveryTS, dirName, fileName string) {
	log.Debugf("performing crash recovery for file: %s", fileName)
	// add the recovery timestamp to the file name
	completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)

	recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
	recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
	// e.g. 5be1_20170130155400.20170130155600_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0_recovered_20170130155452.616.txt
	recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)

	// Copy complete records to the new file and delete the original partial file
	copyPartialFile(completeOrigFilePath, recoveredFilePath)
	deletePartialFile(completeOrigFilePath)
}

// copyPartialFile reads the partial file line by line, extracts all complete
// records, and copies them to a new file that is closed as a valid gzip file.
func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
	partialFile, err := os.Open(completeOrigFilePath)
	if err != nil {
		log.Errorf("Cannot open file: %s", completeOrigFilePath)
		return
	}
	defer partialFile.Close()

	bufReader := bufio.NewReader(partialFile)
	gzReader, err := gzip.NewReader(bufReader)
	if err != nil {
		log.Errorf("Cannot create reader on gzip file: %s"+
			" due to %v", completeOrigFilePath, err)
		return
	}
	defer gzReader.Close()

	scanner := bufio.NewScanner(gzReader)

	// Create a new file and copy only the complete records from the
	// partial file so that only a complete file is uploaded
	recoveredFile, err := os.OpenFile(recoveredFilePath,
		os.O_WRONLY|os.O_CREATE, os.ModePerm)
	if err != nil {
		log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
		return
	}
	defer recoveredFile.Close()

	gzWriter := gzip.NewWriter(recoveredFile)
	defer gzWriter.Close()

	bufWriter := bufio.NewWriter(gzWriter)
	defer bufWriter.Flush()

	for scanner.Scan() {
		bufWriter.Write(scanner.Bytes())
		bufWriter.WriteString("\n")
	}
	if err := scanner.Err(); err != nil {
		log.Warnf("Error while scanning partial file: %v", err)
		return
	}
}

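// deletePartialFile removes the original partial file so that only the
// recovered file remains.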
func deletePartialFile(completeOrigFilePath string) {
	err := os.Remove(completeOrigFilePath)
	if err != nil {
		log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
	}
}