[XAPID-854] Listen to Apid shutdown, register callback and do a clean shutdown by closing open files
diff --git a/buffering_manager.go b/buffering_manager.go
index 7e28d4b..6fb156d 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -9,6 +9,7 @@
 	"os"
 	"path/filepath"
 	"time"
+	"sync"
 )
 
 const fileExtension = ".txt.gz"
@@ -24,7 +25,12 @@
 // Map from timestampt to bucket
 var bucketMap map[int64]bucket
 
+// RW lock for bucketMap  since the cache can be
+// read while its being written to and vice versa
+var bucketMaplock = sync.RWMutex{}
+
 type bucket struct {
+	keyTS int64
 	DirName string
 	// We need file handle and writer to close the file
 	FileWriter fileWriter
@@ -41,7 +47,11 @@
 	internalBuffer = make(chan axRecords,
 		config.GetInt(analyticsBufferChannelSize))
 	closeBucketEvent = make(chan bucket)
+
+	bucketMaplock.Lock()
 	bucketMap = make(map[int64]bucket)
+	bucketMaplock.Unlock()
+
 
 	// Keep polling the internal buffer for new messages
 	go func() {
@@ -71,9 +81,14 @@
 			// staging to indicate its ready for upload
 			err := os.Rename(dirToBeClosed, stagingPath)
 			if err != nil {
-				log.Errorf("Cannot move directory '%s' from"+
-					" tmp to staging folder", bucket.DirName)
+				log.Errorf("Cannot move directory '%s' from" +
+					" tmp to staging folder due to '%s", bucket.DirName, err)
 			}
+
+			// Remove bucket from bucket map once its closed successfully
+			bucketMaplock.Lock()
+			delete(bucketMap, bucket.keyTS)
+			bucketMaplock.Unlock()
 		}
 	}()
 }
@@ -92,9 +107,13 @@
 	// first based on current timestamp and collection interval,
 	// determine the timestamp of the bucket
 	ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
-	_, exists := bucketMap[ts]
+
+	bucketMaplock.RLock()
+	b, exists := bucketMap[ts]
+	bucketMaplock.RUnlock()
+
 	if exists {
-		return bucketMap[ts], nil
+		return b, nil
 	} else {
 		timestamp := time.Unix(ts, 0).Format(timestampLayout)
 
@@ -123,8 +142,11 @@
 			return bucket{}, err
 		}
 
-		newBucket := bucket{DirName: dirName, FileWriter: fw}
+		newBucket := bucket{keyTS: ts, DirName: dirName, FileWriter: fw}
+
+		bucketMaplock.Lock()
 		bucketMap[ts] = newBucket
+		bucketMaplock.Unlock()
 
 		//Send event to close directory after endTime + 5
 		// seconds to make sure all buffers are flushed to file
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index fa4ad85..c3c747e 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -118,7 +118,7 @@
 			fw, e := createGzipFile(completeFilePath)
 			Expect(e).ShouldNot(HaveOccurred())
 
-			bucket := bucket{DirName: dirName, FileWriter: fw}
+			bucket := bucket{keyTS: 112312, DirName: dirName, FileWriter: fw}
 			closeBucketEvent <- bucket
 
 			// wait for it to close dir and move to staging
diff --git a/init.go b/init.go
index 0706e57..386aa1e 100644
--- a/init.go
+++ b/init.go
@@ -143,6 +143,15 @@
 		}()
 	}
 
+	// Create a listener for shutdown event and register callback
+	h := func(event apid.Event) {
+		log.Infof("Received ApidShutdown event. %v", event)
+		shutdownPlugin();
+		return
+	}
+	log.Infof("registered listener for shutdown event")
+	events.ListenOnceFunc(apid.ShutdownEventSelector, h)
+
 	// Initialize API's and expose them
 	initAPI(services)
 	log.Debug("end init for apidAnalytics plugin")
@@ -200,3 +209,29 @@
 	}
 	return nil
 }
+
+func shutdownPlugin() {
+	log.Info("Shutting down apidAnalytics plugin")
+
+	// Close all open files and move directories in tmp to staging.
+	bucketMaplock.RLock()
+	defer bucketMaplock.RUnlock()
+	for _, bucket := range bucketMap {
+		log.Infof("closing bucket '%s' as a part of shutdown", bucket.DirName)
+		closeGzipFile(bucket.FileWriter)
+
+		dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
+		stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+		// close files in tmp folder and move directory to
+		// staging to indicate its ready for upload
+		err := os.Rename(dirToBeClosed, stagingPath)
+		if err != nil {
+			log.Errorf("Cannot move directory '%s' from"+
+				" tmp to staging folder due to '%s", bucket.DirName, err)
+		}
+
+		bucketMaplock.Lock()
+		delete(bucketMap, bucket.keyTS)
+		bucketMaplock.Unlock()
+	}
+}
\ No newline at end of file
diff --git a/upload_manager.go b/upload_manager.go
index 4deba03..7373daf 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -47,6 +47,8 @@
 					handleUploadDirStatus(file, status)
 					if status {
 						uploadedDirCnt++
+						log.Infof("Successfully uploaded: %s",
+							file.Name())
 					}
 				}
 			}