[UAP-854] Cleaned up syntax for reading on a channel until close is received
diff --git a/buffering_manager.go b/buffering_manager.go
index 151d6ec..184b1d6 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -60,56 +60,45 @@
// Keep polling the internal buffer for new messages
go func() {
- for {
- records, more := <-internalBuffer
- if more {
- err := save(records)
- if err != nil {
- log.Errorf("Could not save %d messages to file"+
- " due to: %v", len(records.Records), err)
- }
- } else {
- // indicates a close signal was sent on the channel
- log.Debugf("Closing channel internal buffer")
- doneInternalBufferChan <- true
- return
+ for records := range internalBuffer {
+ err := save(records)
+ if err != nil {
+ log.Errorf("Could not save %d messages to file"+
+ " due to: %v", len(records.Records), err)
}
-
}
+ // indicates a close signal was sent on the channel
+ log.Debugf("Closing channel internal buffer")
+ doneInternalBufferChan <- true
}()
// Keep polling the closeEvent channel to see if bucket is ready to be closed
go func() {
- for {
- bucket, more := <-closeBucketEvent
- if more {
- log.Debugf("Close Event received for bucket: %s",
- bucket.DirName)
+ for bucket := range closeBucketEvent {
+ log.Debugf("Close Event received for bucket: %s",
+ bucket.DirName)
- // close open file
- closeGzipFile(bucket.FileWriter)
+ // close open file
+ closeGzipFile(bucket.FileWriter)
- dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
- stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
- // close files in tmp folder and move directory to
- // staging to indicate its ready for upload
- err := os.Rename(dirToBeClosed, stagingPath)
- if err != nil {
- log.Errorf("Cannot move directory '%s' from"+
- " tmp to staging folder due to '%s", bucket.DirName, err)
- } else {
- // Remove bucket from bucket map once its closed successfully
- bucketMaplock.Lock()
- delete(bucketMap, bucket.keyTS)
- bucketMaplock.Unlock()
- }
+ dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
+ stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+ // close files in tmp folder and move directory to
+ // staging to indicate its ready for upload
+ err := os.Rename(dirToBeClosed, stagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s' from" +
+ " tmp to staging folder due to '%s", bucket.DirName, err)
} else {
- // indicates a close signal was sent on the channel
- log.Debugf("Closing channel close bucketevent")
- doneClosebucketChan <- true
- return
+ // Remove bucket from bucket map once its closed successfully
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
}
}
+ // indicates a close signal was sent on the channel
+ log.Debugf("Closing channel close bucketevent")
+ doneClosebucketChan <- true
}()
}