Merge pull request #11 from 30x/pooja_xapid_854

[XAPID-854] Listen to Apid shutdown, register callback and do a clean shutdown by closing open files
diff --git a/buffering_manager.go b/buffering_manager.go
index 7e28d4b..184b1d6 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -8,6 +8,7 @@
 	"fmt"
 	"os"
 	"path/filepath"
+	"sync"
 	"time"
 )
 
@@ -16,15 +17,24 @@
 // Channel where analytics records are buffered before being dumped to a
 // file as write to file should not performed in the Http Thread
 var internalBuffer chan axRecords
+// channel signalling that the internalBuffer consumer has drained it and exited
+var doneInternalBufferChan chan bool
 
 // Channel where close bucket event is published i.e. when a bucket
 // is ready to be closed based on collection interval
 var closeBucketEvent chan bucket
+// channel signalling that the closeBucketEvent consumer has drained it and exited
+var doneClosebucketChan chan bool
 
 // Map from timestampt to bucket
 var bucketMap map[int64]bucket
 
+// RW lock for bucketMap, since the cache can be
+// read while it is being written to and vice versa
+var bucketMaplock = sync.RWMutex{}
+
 type bucket struct {
+	keyTS   int64
 	DirName string
 	// We need file handle and writer to close the file
 	FileWriter fileWriter
@@ -41,24 +51,30 @@
 	internalBuffer = make(chan axRecords,
 		config.GetInt(analyticsBufferChannelSize))
 	closeBucketEvent = make(chan bucket)
+	doneInternalBufferChan = make(chan bool)
+	doneClosebucketChan = make(chan bool)
+
+	bucketMaplock.Lock()
 	bucketMap = make(map[int64]bucket)
+	bucketMaplock.Unlock()
 
 	// Keep polling the internal buffer for new messages
 	go func() {
-		for {
-			records := <-internalBuffer
+		for records := range internalBuffer {
 			err := save(records)
 			if err != nil {
 				log.Errorf("Could not save %d messages to file"+
 					" due to: %v", len(records.Records), err)
 			}
 		}
+		// indicates a close signal was sent on the channel
+		log.Debugf("Closing channel internal buffer")
+		doneInternalBufferChan <- true
 	}()
 
 	// Keep polling the closeEvent channel to see if bucket is ready to be closed
 	go func() {
-		for {
-			bucket := <-closeBucketEvent
+		for bucket := range closeBucketEvent {
 			log.Debugf("Close Event received for bucket: %s",
 				bucket.DirName)
 
@@ -71,10 +87,18 @@
 			// staging to indicate its ready for upload
 			err := os.Rename(dirToBeClosed, stagingPath)
 			if err != nil {
-				log.Errorf("Cannot move directory '%s' from"+
-					" tmp to staging folder", bucket.DirName)
+				log.Errorf("Cannot move directory '%s' from" +
+					" tmp to staging folder due to '%s", bucket.DirName, err)
+			} else {
+				// Remove bucket from bucket map once it is closed successfully
+				bucketMaplock.Lock()
+				delete(bucketMap, bucket.keyTS)
+				bucketMaplock.Unlock()
 			}
 		}
+		// indicates a close signal was sent on the channel
+		log.Debugf("Closing channel close bucketevent")
+		doneClosebucketChan <- true
 	}()
 }
 
@@ -92,9 +116,13 @@
 	// first based on current timestamp and collection interval,
 	// determine the timestamp of the bucket
 	ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
-	_, exists := bucketMap[ts]
+
+	bucketMaplock.RLock()
+	b, exists := bucketMap[ts]
+	bucketMaplock.RUnlock()
+
 	if exists {
-		return bucketMap[ts], nil
+		return b, nil
 	} else {
 		timestamp := time.Unix(ts, 0).Format(timestampLayout)
 
@@ -123,8 +151,11 @@
 			return bucket{}, err
 		}
 
-		newBucket := bucket{DirName: dirName, FileWriter: fw}
+		newBucket := bucket{keyTS: ts, DirName: dirName, FileWriter: fw}
+
+		bucketMaplock.Lock()
 		bucketMap[ts] = newBucket
+		bucketMaplock.Unlock()
 
 		//Send event to close directory after endTime + 5
 		// seconds to make sure all buffers are flushed to file
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index fa4ad85..c3c747e 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -118,7 +118,7 @@
 			fw, e := createGzipFile(completeFilePath)
 			Expect(e).ShouldNot(HaveOccurred())
 
-			bucket := bucket{DirName: dirName, FileWriter: fw}
+			bucket := bucket{keyTS: 112312, DirName: dirName, FileWriter: fw}
 			closeBucketEvent <- bucket
 
 			// wait for it to close dir and move to staging
diff --git a/init.go b/init.go
index 0706e57..f865bc5 100644
--- a/init.go
+++ b/init.go
@@ -143,6 +143,15 @@
 		}()
 	}
 
+	// Create a listener for shutdown event and register callback
+	h := func(event apid.Event) {
+		log.Infof("Received ApidShutdown event. %v", event)
+		shutdownPlugin()
+		return
+	}
+	log.Infof("registered listener for shutdown event")
+	events.ListenOnceFunc(apid.ShutdownEventSelector, h)
+
 	// Initialize API's and expose them
 	initAPI(services)
 	log.Debug("end init for apidAnalytics plugin")
@@ -200,3 +209,46 @@
 	}
 	return nil
 }
+
+// shutdownPlugin stops both buffering goroutines, then closes every open
+// bucket file and moves its directory from tmp to staging for upload.
+// NOTE(review): a send on either channel after close() panics - confirm
+// all producers (API handlers, delayed close events) are stopped first.
+func shutdownPlugin() {
+	log.Info("Shutting down apidAnalytics plugin")
+
+	// close channel so new records cannot be inserted
+	close(internalBuffer)
+	log.Debugf("sent signal to close internal buffer channel")
+
+	// close channel so new events for closing bucket cannot be posted
+	close(closeBucketEvent)
+	log.Debugf("sent signal to close closebucketevent channel")
+
+	// wait until each consumer goroutine has drained its channel and exited
+	<-doneInternalBufferChan
+	log.Debugf("closed internal buffer channel successfully")
+	<-doneClosebucketChan
+	log.Debugf("closed closebucketevent channel successfully")
+
+	// Hold the write lock across close-and-reset so the map cannot be
+	// observed half-drained; defer releases it even if a close panics.
+	bucketMaplock.Lock()
+	defer bucketMaplock.Unlock()
+
+	for _, b := range bucketMap {
+		log.Infof("closing bucket '%s' as a part of shutdown", b.DirName)
+		closeGzipFile(b.FileWriter)
+
+		// move the directory to staging to indicate it is ready for upload
+		tmpPath := filepath.Join(localAnalyticsTempDir, b.DirName)
+		stagingPath := filepath.Join(localAnalyticsStagingDir, b.DirName)
+		if err := os.Rename(tmpPath, stagingPath); err != nil {
+			log.Errorf("Cannot move directory '%s' from"+
+				" tmp to staging folder due to '%v'", b.DirName, err)
+		}
+	}
+
+	// Reset the map after all files are closed
+	bucketMap = nil
+}
diff --git a/upload_manager.go b/upload_manager.go
index 4deba03..57e2f8e 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -47,6 +47,8 @@
 					handleUploadDirStatus(file, status)
 					if status {
 						uploadedDirCnt++
+						log.Debugf("Successfully uploaded: %s",
+							file.Name())
 					}
 				}
 			}