[UAP-854] Cleaned up syntax for reading on a channel until close is received
diff --git a/buffering_manager.go b/buffering_manager.go index 151d6ec..184b1d6 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -60,56 +60,45 @@ // Keep polling the internal buffer for new messages go func() { - for { - records, more := <-internalBuffer - if more { - err := save(records) - if err != nil { - log.Errorf("Could not save %d messages to file"+ - " due to: %v", len(records.Records), err) - } - } else { - // indicates a close signal was sent on the channel - log.Debugf("Closing channel internal buffer") - doneInternalBufferChan <- true - return + for records := range internalBuffer { + err := save(records) + if err != nil { + log.Errorf("Could not save %d messages to file"+ + " due to: %v", len(records.Records), err) } - } + // indicates a close signal was sent on the channel + log.Debugf("Closing channel internal buffer") + doneInternalBufferChan <- true }() // Keep polling the closeEvent channel to see if bucket is ready to be closed go func() { - for { - bucket, more := <-closeBucketEvent - if more { - log.Debugf("Close Event received for bucket: %s", - bucket.DirName) + for bucket := range closeBucketEvent { + log.Debugf("Close Event received for bucket: %s", + bucket.DirName) - // close open file - closeGzipFile(bucket.FileWriter) + // close open file + closeGzipFile(bucket.FileWriter) - dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) - stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) - // close files in tmp folder and move directory to - // staging to indicate its ready for upload - err := os.Rename(dirToBeClosed, stagingPath) - if err != nil { - log.Errorf("Cannot move directory '%s' from"+ - " tmp to staging folder due to '%s", bucket.DirName, err) - } else { - // Remove bucket from bucket map once its closed successfully - bucketMaplock.Lock() - delete(bucketMap, bucket.keyTS) - bucketMaplock.Unlock() - } + dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) + stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) + // close files in tmp folder and move directory to + // staging to indicate its ready for upload + err := os.Rename(dirToBeClosed, stagingPath) + if err != nil { + log.Errorf("Cannot move directory '%s' from" + + " tmp to staging folder due to '%s", bucket.DirName, err) } else { - // indicates a close signal was sent on the channel - log.Debugf("Closing channel close bucketevent") - doneClosebucketChan <- true - return + // Remove bucket from bucket map once its closed successfully + bucketMaplock.Lock() + delete(bucketMap, bucket.keyTS) + bucketMaplock.Unlock() } } + // indicates a close signal was sent on the channel + log.Debugf("Closing channel close bucketevent") + doneClosebucketChan <- true }() }