[XAPID-854] Listen to Apid shutdown, register callback and do a clean shutdown by closing open files
diff --git a/buffering_manager.go b/buffering_manager.go
index 7e28d4b..6fb156d 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -9,6 +9,7 @@
"os"
"path/filepath"
"time"
+ "sync"
)
const fileExtension = ".txt.gz"
@@ -24,7 +25,12 @@
// Map from timestampt to bucket
var bucketMap map[int64]bucket
+// RW lock for bucketMap since the cache can be
+// read while its being written to and vice versa
+var bucketMaplock = sync.RWMutex{}
+
type bucket struct {
+ keyTS int64
DirName string
// We need file handle and writer to close the file
FileWriter fileWriter
@@ -41,7 +47,11 @@
internalBuffer = make(chan axRecords,
config.GetInt(analyticsBufferChannelSize))
closeBucketEvent = make(chan bucket)
+
+ bucketMaplock.Lock()
bucketMap = make(map[int64]bucket)
+ bucketMaplock.Unlock()
+
// Keep polling the internal buffer for new messages
go func() {
@@ -71,9 +81,14 @@
// staging to indicate its ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from"+
- " tmp to staging folder", bucket.DirName)
+ log.Errorf("Cannot move directory '%s' from" +
+ " tmp to staging folder due to '%s", bucket.DirName, err)
}
+
+ // Remove bucket from bucket map once its closed successfully
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
}
}()
}
@@ -92,9 +107,13 @@
// first based on current timestamp and collection interval,
// determine the timestamp of the bucket
ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
- _, exists := bucketMap[ts]
+
+ bucketMaplock.RLock()
+ b, exists := bucketMap[ts]
+ bucketMaplock.RUnlock()
+
if exists {
- return bucketMap[ts], nil
+ return b, nil
} else {
timestamp := time.Unix(ts, 0).Format(timestampLayout)
@@ -123,8 +142,11 @@
return bucket{}, err
}
- newBucket := bucket{DirName: dirName, FileWriter: fw}
+ newBucket := bucket{keyTS: ts, DirName: dirName, FileWriter: fw}
+
+ bucketMaplock.Lock()
bucketMap[ts] = newBucket
+ bucketMaplock.Unlock()
//Send event to close directory after endTime + 5
// seconds to make sure all buffers are flushed to file
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index fa4ad85..c3c747e 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -118,7 +118,7 @@
fw, e := createGzipFile(completeFilePath)
Expect(e).ShouldNot(HaveOccurred())
- bucket := bucket{DirName: dirName, FileWriter: fw}
+ bucket := bucket{keyTS: 112312, DirName: dirName, FileWriter: fw}
closeBucketEvent <- bucket
// wait for it to close dir and move to staging
diff --git a/init.go b/init.go
index 0706e57..386aa1e 100644
--- a/init.go
+++ b/init.go
@@ -143,6 +143,15 @@
}()
}
+ // Create a listener for shutdown event and register callback
+ h := func(event apid.Event) {
+ log.Infof("Received ApidShutdown event. %v", event)
+ shutdownPlugin();
+ return
+ }
+ log.Infof("registered listener for shutdown event")
+ events.ListenOnceFunc(apid.ShutdownEventSelector, h)
+
// Initialize API's and expose them
initAPI(services)
log.Debug("end init for apidAnalytics plugin")
@@ -200,3 +209,29 @@
}
return nil
}
+
+func shutdownPlugin() {
+ log.Info("Shutting down apidAnalytics plugin")
+
+ // Close all open files and move directories in tmp to staging.
+ bucketMaplock.RLock()
+ defer bucketMaplock.RUnlock()
+ for _, bucket := range bucketMap {
+ log.Infof("closing bucket '%s' as a part of shutdown", bucket.DirName)
+ closeGzipFile(bucket.FileWriter)
+
+ dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
+ stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+ // close files in tmp folder and move directory to
+ // staging to indicate its ready for upload
+ err := os.Rename(dirToBeClosed, stagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s' from"+
+ " tmp to staging folder due to '%s", bucket.DirName, err)
+ }
+
+ bucketMaplock.Lock()
+ delete(bucketMap, bucket.keyTS)
+ bucketMaplock.Unlock()
+ }
+}
\ No newline at end of file
diff --git a/upload_manager.go b/upload_manager.go
index 4deba03..7373daf 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -47,6 +47,8 @@
handleUploadDirStatus(file, status)
if status {
uploadedDirCnt++
+ log.Infof("Successfully uploaded: %s",
+ file.Name())
}
}
}