[XAPID-854] Listen to Apid shutdown, register callback and do a clean shutdown by closing open files
diff --git a/buffering_manager.go b/buffering_manager.go index 7e28d4b..6fb156d 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -9,6 +9,7 @@ "os" "path/filepath" "time" + "sync" ) const fileExtension = ".txt.gz" @@ -24,7 +25,12 @@ // Map from timestampt to bucket var bucketMap map[int64]bucket +// RW lock for bucketMap since the cache can be +// read while its being written to and vice versa +var bucketMaplock = sync.RWMutex{} + type bucket struct { + keyTS int64 DirName string // We need file handle and writer to close the file FileWriter fileWriter @@ -41,7 +47,11 @@ internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize)) closeBucketEvent = make(chan bucket) + + bucketMaplock.Lock() bucketMap = make(map[int64]bucket) + bucketMaplock.Unlock() + // Keep polling the internal buffer for new messages go func() { @@ -71,9 +81,14 @@ // staging to indicate its ready for upload err := os.Rename(dirToBeClosed, stagingPath) if err != nil { - log.Errorf("Cannot move directory '%s' from"+ - " tmp to staging folder", bucket.DirName) + log.Errorf("Cannot move directory '%s' from" + + " tmp to staging folder due to '%s", bucket.DirName, err) } + + // Remove bucket from bucket map once its closed successfully + bucketMaplock.Lock() + delete(bucketMap, bucket.keyTS) + bucketMaplock.Unlock() } }() } @@ -92,9 +107,13 @@ // first based on current timestamp and collection interval, // determine the timestamp of the bucket ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval)) - _, exists := bucketMap[ts] + + bucketMaplock.RLock() + b, exists := bucketMap[ts] + bucketMaplock.RUnlock() + if exists { - return bucketMap[ts], nil + return b, nil } else { timestamp := time.Unix(ts, 0).Format(timestampLayout) @@ -123,8 +142,11 @@ return bucket{}, err } - newBucket := bucket{DirName: dirName, FileWriter: fw} + newBucket := bucket{keyTS: ts, DirName: dirName, FileWriter: fw} + + bucketMaplock.Lock() bucketMap[ts] = newBucket + bucketMaplock.Unlock() //Send event to close directory after endTime + 5 // seconds to make sure all buffers are flushed to file
diff --git a/buffering_manager_test.go b/buffering_manager_test.go index fa4ad85..c3c747e 100644 --- a/buffering_manager_test.go +++ b/buffering_manager_test.go
@@ -118,7 +118,7 @@ fw, e := createGzipFile(completeFilePath) Expect(e).ShouldNot(HaveOccurred()) - bucket := bucket{DirName: dirName, FileWriter: fw} + bucket := bucket{keyTS: 112312, DirName: dirName, FileWriter: fw} closeBucketEvent <- bucket // wait for it to close dir and move to staging
diff --git a/init.go b/init.go index 0706e57..386aa1e 100644 --- a/init.go +++ b/init.go
@@ -143,6 +143,15 @@ }() } + // Create a listener for shutdown event and register callback + h := func(event apid.Event) { + log.Infof("Received ApidShutdown event. %v", event) + shutdownPlugin(); + return + } + log.Infof("registered listener for shutdown event") + events.ListenOnceFunc(apid.ShutdownEventSelector, h) + // Initialize API's and expose them initAPI(services) log.Debug("end init for apidAnalytics plugin") @@ -200,3 +209,29 @@ } return nil } + +func shutdownPlugin() { + log.Info("Shutting down apidAnalytics plugin") + + // Close all open files and move directories in tmp to staging. + bucketMaplock.RLock() + defer bucketMaplock.RUnlock() + for _, bucket := range bucketMap { + log.Infof("closing bucket '%s' as a part of shutdown", bucket.DirName) + closeGzipFile(bucket.FileWriter) + + dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) + stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) + // close files in tmp folder and move directory to + // staging to indicate its ready for upload + err := os.Rename(dirToBeClosed, stagingPath) + if err != nil { + log.Errorf("Cannot move directory '%s' from"+ + " tmp to staging folder due to '%s", bucket.DirName, err) + } + + bucketMaplock.Lock() + delete(bucketMap, bucket.keyTS) + bucketMaplock.Unlock() + } +} \ No newline at end of file
diff --git a/upload_manager.go b/upload_manager.go index 4deba03..7373daf 100644 --- a/upload_manager.go +++ b/upload_manager.go
@@ -47,6 +47,8 @@ handleUploadDirStatus(file, status) if status { uploadedDirCnt++ + log.Infof("Successfully uploaded: %s", + file.Name()) } } }