[UAP-854] Fixed datarace issue and deadlock issue addressed in comments
diff --git a/buffering_manager.go b/buffering_manager.go
index b52c89c..151d6ec 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -17,10 +17,14 @@
// Channel where analytics records are buffered before being dumped to a
// file as write to file should not performed in the Http Thread
var internalBuffer chan axRecords
+// channel to indicate that internalBuffer channel is closed
+var doneInternalBufferChan chan bool
// Channel where close bucket event is published i.e. when a bucket
// is ready to be closed based on collection interval
var closeBucketEvent chan bucket
+// channel to indicate that closeBucketEvent channel is closed
+var doneClosebucketChan chan bool
// Map from timestampt to bucket
var bucketMap map[int64]bucket
@@ -47,6 +51,8 @@
internalBuffer = make(chan axRecords,
config.GetInt(analyticsBufferChannelSize))
closeBucketEvent = make(chan bucket)
+ doneInternalBufferChan = make(chan bool)
+ doneClosebucketChan = make(chan bool)
bucketMaplock.Lock()
bucketMap = make(map[int64]bucket)
@@ -55,38 +61,53 @@
// Keep polling the internal buffer for new messages
go func() {
for {
- records := <-internalBuffer
- err := save(records)
- if err != nil {
- log.Errorf("Could not save %d messages to file"+
- " due to: %v", len(records.Records), err)
+ records, more := <-internalBuffer
+ if more {
+ err := save(records)
+ if err != nil {
+ log.Errorf("Could not save %d messages to file"+
+ " due to: %v", len(records.Records), err)
+ }
+ } else {
+ // indicates a close signal was sent on the channel
+ log.Debugf("Closing channel internal buffer")
+ doneInternalBufferChan <- true
+ return
}
+
}
}()
// Keep polling the closeEvent channel to see if bucket is ready to be closed
go func() {
for {
- bucket := <-closeBucketEvent
- log.Debugf("Close Event received for bucket: %s",
- bucket.DirName)
+ bucket, more := <-closeBucketEvent
+ if more {
+ log.Debugf("Close Event received for bucket: %s",
+ bucket.DirName)
- // close open file
- closeGzipFile(bucket.FileWriter)
+ // close open file
+ closeGzipFile(bucket.FileWriter)
- dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
- stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
- // close files in tmp folder and move directory to
- // staging to indicate its ready for upload
- err := os.Rename(dirToBeClosed, stagingPath)
- if err != nil {
- log.Errorf("Cannot move directory '%s' from"+
- " tmp to staging folder due to '%s", bucket.DirName, err)
+ dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
+ stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+ // close files in tmp folder and move directory to
+ // staging to indicate its ready for upload
+ err := os.Rename(dirToBeClosed, stagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s' from"+
+ " tmp to staging folder due to '%s", bucket.DirName, err)
+ } else {
+ // Remove bucket from bucket map once its closed successfully
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
+ }
} else {
- // Remove bucket from bucket map once its closed successfully
- bucketMaplock.Lock()
- delete(bucketMap, bucket.keyTS)
- bucketMaplock.Unlock()
+ // indicates a close signal was sent on the channel
+ log.Debugf("Closing channel close bucketevent")
+ doneClosebucketChan <- true
+ return
}
}
}()
diff --git a/init.go b/init.go
index c8ac1f0..f865bc5 100644
--- a/init.go
+++ b/init.go
@@ -213,9 +213,24 @@
func shutdownPlugin() {
log.Info("Shutting down apidAnalytics plugin")
+ // close channel so new records cannot be inserted
+ close(internalBuffer)
+ log.Debugf("sent signal to close internal buffer channel")
+
+ // close channel so new events for closing bucket cannot be posted
+ close(closeBucketEvent)
+ log.Debugf("sent signal to close closebucketevent channel")
+
+ // block on channel to ensure channel is closed
+ <- doneInternalBufferChan
+ log.Debugf("closed internal buffer channel successfully")
+
+ // block on channel to ensure channel is closed
+ <- doneClosebucketChan
+ log.Debugf("closed closebucketevent channel successfully")
+
// Close all open files and move directories in tmp to staging.
bucketMaplock.RLock()
- defer bucketMaplock.RUnlock()
for _, bucket := range bucketMap {
log.Infof("closing bucket '%s' as a part of shutdown", bucket.DirName)
closeGzipFile(bucket.FileWriter)
@@ -228,10 +243,12 @@
if err != nil {
log.Errorf("Cannot move directory '%s' from"+
" tmp to staging folder due to '%s", bucket.DirName, err)
- } else {
- bucketMaplock.Lock()
- delete(bucketMap, bucket.keyTS)
- bucketMaplock.Unlock()
}
}
+ bucketMaplock.RUnlock()
+
+ // Reset the map after all files are closed
+ bucketMaplock.Lock()
+ bucketMap = nil
+ bucketMaplock.Unlock()
}
diff --git a/upload_manager.go b/upload_manager.go
index 7373daf..57e2f8e 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -47,7 +47,7 @@
handleUploadDirStatus(file, status)
if status {
uploadedDirCnt++
- log.Infof("Successfully uploaded: %s",
+ log.Debugf("Successfully uploaded: %s",
file.Name())
}
}