[UAP-854] Fixed datarace issue and deadlock issue addressed in comments
diff --git a/buffering_manager.go b/buffering_manager.go index b52c89c..151d6ec 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -17,10 +17,14 @@ // Channel where analytics records are buffered before being dumped to a // file as write to file should not performed in the Http Thread var internalBuffer chan axRecords +// channel to indicate that internalBuffer channel is closed +var doneInternalBufferChan chan bool // Channel where close bucket event is published i.e. when a bucket // is ready to be closed based on collection interval var closeBucketEvent chan bucket +// channel to indicate that closeBucketEvent channel is closed +var doneClosebucketChan chan bool // Map from timestampt to bucket var bucketMap map[int64]bucket @@ -47,6 +51,8 @@ internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize)) closeBucketEvent = make(chan bucket) + doneInternalBufferChan = make(chan bool) + doneClosebucketChan = make(chan bool) bucketMaplock.Lock() bucketMap = make(map[int64]bucket) @@ -55,38 +61,53 @@ // Keep polling the internal buffer for new messages go func() { for { - records := <-internalBuffer - err := save(records) - if err != nil { - log.Errorf("Could not save %d messages to file"+ - " due to: %v", len(records.Records), err) + records, more := <-internalBuffer + if more { + err := save(records) + if err != nil { + log.Errorf("Could not save %d messages to file"+ + " due to: %v", len(records.Records), err) + } + } else { + // indicates a close signal was sent on the channel + log.Debugf("Closing channel internal buffer") + doneInternalBufferChan <- true + return } + } }() // Keep polling the closeEvent channel to see if bucket is ready to be closed go func() { for { - bucket := <-closeBucketEvent - log.Debugf("Close Event received for bucket: %s", - bucket.DirName) + bucket, more := <-closeBucketEvent + if more { + log.Debugf("Close Event received for bucket: %s", + bucket.DirName) - // close open file - closeGzipFile(bucket.FileWriter) + // close open file + closeGzipFile(bucket.FileWriter) - dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) - stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) - // close files in tmp folder and move directory to - // staging to indicate its ready for upload - err := os.Rename(dirToBeClosed, stagingPath) - if err != nil { - log.Errorf("Cannot move directory '%s' from"+ - " tmp to staging folder due to '%s", bucket.DirName, err) + dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) + stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) + // close files in tmp folder and move directory to + // staging to indicate its ready for upload + err := os.Rename(dirToBeClosed, stagingPath) + if err != nil { + log.Errorf("Cannot move directory '%s' from"+ + " tmp to staging folder due to '%s", bucket.DirName, err) + } else { + // Remove bucket from bucket map once its closed successfully + bucketMaplock.Lock() + delete(bucketMap, bucket.keyTS) + bucketMaplock.Unlock() + } } else { - // Remove bucket from bucket map once its closed successfully - bucketMaplock.Lock() - delete(bucketMap, bucket.keyTS) - bucketMaplock.Unlock() + // indicates a close signal was sent on the channel + log.Debugf("Closing channel close bucketevent") + doneClosebucketChan <- true + return } } }()
diff --git a/init.go b/init.go index c8ac1f0..f865bc5 100644 --- a/init.go +++ b/init.go
@@ -213,9 +213,24 @@ func shutdownPlugin() { log.Info("Shutting down apidAnalytics plugin") + // close channel so new records cannot be inserted + close(internalBuffer) + log.Debugf("sent signal to close internal buffer channel") + + // close channel so new events for closing bucket cannot be posted + close(closeBucketEvent) + log.Debugf("sent signal to close closebucketevent channel") + + // block on channel to ensure channel is closed + <- doneInternalBufferChan + log.Debugf("closed internal buffer channel successfully") + + // block on channel to ensure channel is closed + <- doneClosebucketChan + log.Debugf("closed closebucketevent channel successfully") + // Close all open files and move directories in tmp to staging. bucketMaplock.RLock() - defer bucketMaplock.RUnlock() for _, bucket := range bucketMap { log.Infof("closing bucket '%s' as a part of shutdown", bucket.DirName) closeGzipFile(bucket.FileWriter) @@ -228,10 +243,12 @@ if err != nil { log.Errorf("Cannot move directory '%s' from"+ " tmp to staging folder due to '%s", bucket.DirName, err) - } else { - bucketMaplock.Lock() - delete(bucketMap, bucket.keyTS) - bucketMaplock.Unlock() } } + bucketMaplock.RUnlock() + + // Reset the map after all files are closed + bucketMaplock.Lock() + bucketMap = nil + bucketMaplock.Unlock() }
diff --git a/upload_manager.go b/upload_manager.go index 7373daf..57e2f8e 100644 --- a/upload_manager.go +++ b/upload_manager.go
@@ -47,7 +47,7 @@ handleUploadDirStatus(file, status) if status { uploadedDirCnt++ - log.Infof("Successfully uploaded: %s", + log.Debugf("Successfully uploaded: %s", file.Name()) } }