Merge pull request #11 from 30x/pooja_xapid_854
[XAPID-854] Listen to Apid shutdown, register callback and do a clean shutdown by closing open files
diff --git a/buffering_manager.go b/buffering_manager.go
index 7e28d4b..184b1d6 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -8,6 +8,7 @@
"fmt"
"os"
"path/filepath"
+ "sync"
"time"
)
@@ -16,15 +17,24 @@
// Channel where analytics records are buffered before being dumped to a
// file as write to file should not performed in the Http Thread
var internalBuffer chan axRecords
+// channel to indicate that internalBuffer channel is closed
+var doneInternalBufferChan chan bool
// Channel where close bucket event is published i.e. when a bucket
// is ready to be closed based on collection interval
var closeBucketEvent chan bucket
+// channel to indicate that closeBucketEvent channel is closed
+var doneClosebucketChan chan bool
// Map from timestampt to bucket
var bucketMap map[int64]bucket
+// RW lock for bucketMap, since the map can be
+// read while it is being written to and vice versa
+var bucketMaplock = sync.RWMutex{}
+
type bucket struct {
+ keyTS int64
DirName string
// We need file handle and writer to close the file
FileWriter fileWriter
@@ -41,24 +51,30 @@
internalBuffer = make(chan axRecords,
config.GetInt(analyticsBufferChannelSize))
closeBucketEvent = make(chan bucket)
+ doneInternalBufferChan = make(chan bool)
+ doneClosebucketChan = make(chan bool)
+
+ bucketMaplock.Lock()
bucketMap = make(map[int64]bucket)
+ bucketMaplock.Unlock()
// Keep polling the internal buffer for new messages
go func() {
- for {
- records := <-internalBuffer
+ for records := range internalBuffer {
err := save(records)
if err != nil {
log.Errorf("Could not save %d messages to file"+
" due to: %v", len(records.Records), err)
}
}
+ // the range loop ends once internalBuffer is closed and drained; signal completion
+ log.Debugf("Closing channel internal buffer")
+ doneInternalBufferChan <- true
}()
// Keep polling the closeEvent channel to see if bucket is ready to be closed
go func() {
- for {
- bucket := <-closeBucketEvent
+ for bucket := range closeBucketEvent {
log.Debugf("Close Event received for bucket: %s",
bucket.DirName)
@@ -71,10 +87,18 @@
// staging to indicate its ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from"+
- " tmp to staging folder", bucket.DirName)
+ log.Errorf("Cannot move directory '%s' from" +
+ " tmp to staging folder due to '%s", bucket.DirName, err)
+ } else {
+ // Remove bucket from bucket map once it's closed successfully
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
}
}
+ // the range loop ends once closeBucketEvent is closed; signal completion
+ log.Debugf("Closing channel close bucketevent")
+ doneClosebucketChan <- true
}()
}
@@ -92,9 +116,13 @@
// first based on current timestamp and collection interval,
// determine the timestamp of the bucket
ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
- _, exists := bucketMap[ts]
+
+ bucketMaplock.RLock()
+ b, exists := bucketMap[ts]
+ bucketMaplock.RUnlock()
+
if exists {
- return bucketMap[ts], nil
+ return b, nil
} else {
timestamp := time.Unix(ts, 0).Format(timestampLayout)
@@ -123,8 +151,11 @@
return bucket{}, err
}
- newBucket := bucket{DirName: dirName, FileWriter: fw}
+ newBucket := bucket{keyTS: ts, DirName: dirName, FileWriter: fw}
+
+ bucketMaplock.Lock()
bucketMap[ts] = newBucket
+ bucketMaplock.Unlock()
//Send event to close directory after endTime + 5
// seconds to make sure all buffers are flushed to file
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index fa4ad85..c3c747e 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -118,7 +118,7 @@
fw, e := createGzipFile(completeFilePath)
Expect(e).ShouldNot(HaveOccurred())
- bucket := bucket{DirName: dirName, FileWriter: fw}
+ bucket := bucket{keyTS: 112312, DirName: dirName, FileWriter: fw}
closeBucketEvent <- bucket
// wait for it to close dir and move to staging
diff --git a/init.go b/init.go
index 0706e57..f865bc5 100644
--- a/init.go
+++ b/init.go
@@ -143,6 +143,15 @@
}()
}
+ // Create a listener for shutdown event and register callback
+ h := func(event apid.Event) {
+ log.Infof("Received ApidShutdown event. %v", event)
+ shutdownPlugin()
+ return
+ }
+ log.Infof("registered listener for shutdown event")
+ events.ListenOnceFunc(apid.ShutdownEventSelector, h)
+
// Initialize API's and expose them
initAPI(services)
log.Debug("end init for apidAnalytics plugin")
@@ -200,3 +209,46 @@
}
return nil
}
+
+// shutdownPlugin closes the buffering channels, waits for their consumers to
+// exit, then closes every open bucket and moves its directory to staging.
+func shutdownPlugin() {
+	log.Info("Shutting down apidAnalytics plugin")
+
+	// close channel so new records cannot be inserted
+	close(internalBuffer)
+	log.Debugf("sent signal to close internal buffer channel")
+
+	// close channel so new events for closing bucket cannot be posted
+	close(closeBucketEvent)
+	log.Debugf("sent signal to close closebucketevent channel")
+
+	// block until the internalBuffer consumer has drained it and exited
+	<-doneInternalBufferChan
+	log.Debugf("closed internal buffer channel successfully")
+
+	// block until the closeBucketEvent consumer has exited
+	<-doneClosebucketChan
+	log.Debugf("closed closebucketevent channel successfully")
+
+	// Take the write lock once so that closing the buckets and resetting
+	// the map cannot be interleaved with another writer.
+	bucketMaplock.Lock()
+	defer bucketMaplock.Unlock()
+
+	for _, b := range bucketMap {
+		log.Infof("closing bucket '%s' as a part of shutdown", b.DirName)
+		closeGzipFile(b.FileWriter)
+
+		// moving the directory to staging marks it as ready for upload
+		dirToBeClosed := filepath.Join(localAnalyticsTempDir, b.DirName)
+		stagingPath := filepath.Join(localAnalyticsStagingDir, b.DirName)
+		if err := os.Rename(dirToBeClosed, stagingPath); err != nil {
+			log.Errorf("Cannot move directory '%s' from"+
+				" tmp to staging folder due to '%v'", b.DirName, err)
+		}
+	}
+
+	// Reset the map after all files are closed
+	bucketMap = nil
+}
diff --git a/upload_manager.go b/upload_manager.go
index 4deba03..57e2f8e 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -47,6 +47,8 @@
handleUploadDirStatus(file, status)
if status {
uploadedDirCnt++
+ log.Debugf("Successfully uploaded: %s",
+ file.Name())
}
}
}