[XAPID-377] Added logic to buffer messages to local files and then move them to staging
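
Incoming record batches are now handed to an internal channel
(apidanalytics_buffer_channel_size slots, default 10) so the request path
does not block on disk I/O. A dedicated goroutine drains the channel and
appends each record, one JSON document per line, to a gzip file inside a
per-org/env, per-collection-interval bucket directory under the local
analytics temp dir:

    <org>~<env>~<start_ts>/<random_hex>_<start_ts>.<end_ts>_<apid_instance_id>_writer_0.txt.gz

Shortly after a bucket's interval ends (5 second grace period), its file is
closed and the directory is moved to the staging dir for upload. Also guards
the tenant and developer-info caches with RW locks and parameterizes the
developer-info SQL query.
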
diff --git a/api.yaml b/api.yaml
index 333bbc2..782de56 100644
--- a/api.yaml
+++ b/api.yaml
@@ -73,7 +73,7 @@
       	"client_received_end_timestamp": 1462850097580,
       	"client_sent_start_timestamp": 1462850097894,
         "request_path" : "/oauth/oauthv2/auth_code/",
-	      "request_uri": "/oauth/oauthv2/auth_code/?response_type=code&redirect_url=http%3A%2F%2Fexample.com&client_id=A1h6yYAVeADnEKji8M37zCSn6olcmQDB",
+        "request_uri": "/oauth/oauthv2/auth_code/?response_type=code&redirect_url=http%3A%2F%2Fexample.com&client_id=A1h6yYAVeADnEKji8M37zCSn6olcmQDB",
         "useragent" : "Chrome",
         "target" : "target_name",
         "target_received_end_timestamp": 1462850097800,
diff --git a/api_helper.go b/api_helper.go
index e89d1d9..214578d 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -4,7 +4,6 @@
 	"encoding/json"
 	"net/http"
 	"io"
-	"io/ioutil"
 	"strings"
 	"compress/gzip"
 )
@@ -17,6 +16,11 @@
 	Developer	string
 }
 
+type axRecords struct {
+	Tenant  tenant
+	Records []interface{}
+}
+
 func processPayload(tenant tenant, scopeuuid string,  r *http.Request) errResponse {
 	var gzipEncoded bool
 	if r.Header.Get("Content-Encoding") != "" {
@@ -38,33 +42,35 @@
 		reader = r.Body
 	}
 
-	body, _ := ioutil.ReadAll(reader)
-	errMessage := validateEnrichPublish(tenant, scopeuuid,  body)
+	errMessage := validateEnrichPublish(tenant, scopeuuid, reader)
 	if errMessage.ErrorCode != "" {
 		return errMessage
 	}
 	return errResponse{}
 }
 
-func validateEnrichPublish(tenant tenant, scopeuuid string, body []byte) errResponse {
+func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse {
 	var raw map[string]interface{}
-	err := json.Unmarshal(body, &raw)
-	if err != nil {
+	dec := json.NewDecoder(reader)
+	dec.UseNumber()
+
+	if err := dec.Decode(&raw); err != nil {
 		return errResponse{"BAD_DATA", "Not a valid JSON payload"}
 	}
+
 	if records := raw["records"]; records != nil {
 		for _, eachRecord := range records.([]interface{}) {
 			recordMap := eachRecord.(map[string]interface{})
 			valid, err := validate(recordMap)
 			if valid {
 				enrich(recordMap, scopeuuid, tenant)
-				// TODO: Remove log
-				log.Debugf("Raw records : %v ", recordMap)
 			} else {
 				return err // Even if there is one bad record, reject the entire batch
 			}
 		}
-		publishToChannel(records.([]interface{}))
+		// publish batch of records to channel (blocking call)
+		batch := axRecords{Tenant: tenant, Records: records.([]interface{})}
+		internalBuffer <- batch
 	} else {
 		return errResponse{"NO_RECORDS", "No analytics records in the payload"}
 	}
@@ -82,7 +88,9 @@
 	crst, exists1 := recordMap["client_received_start_timestamp"]
 	cret, exists2 := recordMap["client_received_end_timestamp"]
 	if exists1 && exists2 {
-		if crst.(int64) > cret.(int64) {
+		crstVal, _ := crst.(json.Number).Int64()
+		cretVal, _ := cret.(json.Number).Int64()
+		if crstVal > cretVal {
 			return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"}
 		}
 	}
@@ -105,7 +113,6 @@
 	if exists {
 		apiKey := apiKey.(string)
 		devInfo := getDeveloperInfo(tenant.TenantId, apiKey)
-		// TODO: Remove log
 		_, exists := recordMap["api_product"]
 		if !exists {
 			recordMap["api_product"] = devInfo.ApiProduct
@@ -125,12 +132,6 @@
 	}
 }
 
-func publishToChannel(records []interface{})  {
-	// TODO: add the batch of records to a channel for consumption
-	log.Debugf("records on channel : %v", records)
-	return
-}
-
 func writeError(w http.ResponseWriter, status int, code string, reason string) {
 	w.WriteHeader(status)
 	e := errResponse{
diff --git a/buffering_manager.go b/buffering_manager.go
new file mode 100644
index 0000000..3acb4c8
--- /dev/null
+++ b/buffering_manager.go
@@ -0,0 +1,164 @@
+package apidAnalytics
+
+import (
+	"time"
+	"os"
+	"bufio"
+	"compress/gzip"
+	"path/filepath"
+	"fmt"
+	"crypto/rand"
+	"encoding/hex"
+	"encoding/json"
+)
+
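+// internalBuffer carries batches of records from the request handler to the buffering goroutine.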
+var internalBuffer chan axRecords
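+// closeBucketEvent receives buckets whose collection interval has ended so they can be closed and staged.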
+var closeBucketEvent chan bucket
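+// bucketMap tracks the open bucket for each collection-interval start timestamp (unix seconds).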
+var bucketMap map[int64]bucket
+
+type bucket struct {
+	DirName string
+	// We need the file handle and writer pointers to close the file
+	FileWriter fileWriter
+}
+
+// fileWriter stores the open file handle and writers so the file can be flushed and closed
+type fileWriter struct {
+	file *os.File
+	gw   *gzip.Writer
+	bw   *bufio.Writer
+}
+
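+// initBufferingManager creates the internal buffer and close-event channels and starts the goroutines that service them.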
+func initBufferingManager() {
+	internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize))
+	closeBucketEvent = make(chan bucket)
+	bucketMap = make(map[int64]bucket)
+
+	// Keep polling the internal buffer for new messages
+	go func() {
+		for {
+			records := <-internalBuffer
+			err := save(records)
+			if err != nil {
+				log.Errorf("Could not save %d records to file due to: %v", len(records.Records), err)
+			}
+		}
+	}()
+
+	// Keep polling the closeEvent channel to see if bucket is ready to be closed
+	go func() {
+		for {
+			b := <-closeBucketEvent
+			log.Debugf("Closing bucket %s", b.DirName)
+
+			// close open file
+			closeGzipFile(b.FileWriter)
+
+			dirToBeClosed := filepath.Join(localAnalyticsTempDir, b.DirName)
+			stagingPath := filepath.Join(localAnalyticsStagingDir, b.DirName)
+			err := os.Rename(dirToBeClosed, stagingPath)
+			if err != nil {
+				log.Errorf("Cannot move directory %s to staging folder: %v", b.DirName, err)
+			}
+		}
+	}()
+}
+
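+// save appends a batch of records to the bucket file covering the current time.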
+func save(records axRecords) error {
+	b, err := getBucketForTimestamp(time.Now(), records.Tenant)
+	if err != nil {
+		return err
+	}
+	writeGzipFile(b.FileWriter, records.Records)
+	return nil
+}
+
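+// getBucketForTimestamp returns the bucket (directory plus open gzip file) whose
+// collection interval contains the given time, creating it if it does not exist yet.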
+func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
+	// first based on current timestamp, determine the timestamp bucket
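+	// integer division truncates, so ts is rounded down to the start of the enclosing collection interval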
+	interval := int64(config.GetInt(analyticsCollectionInterval))
+	ts := now.Unix() / interval * interval
+	if existing, exists := bucketMap[ts]; exists {
+		return existing, nil
+	} else {
+		timestamp := time.Unix(ts, 0).Format(timestampLayout)
+
+		endTime := time.Unix(ts+interval, 0)
+		endtimestamp := endTime.Format(timestampLayout)
+
+		dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
+		newPath := filepath.Join(localAnalyticsTempDir, dirName)
+		// create dir
+		err := os.Mkdir(newPath, os.ModePerm)
+		if err != nil {
+			return bucket{}, fmt.Errorf("Cannot create directory %s to buffer messages due to: %v", dirName, err)
+		}
+
+		// create file for writing
+		fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0.txt.gz"
+		completeFilePath := filepath.Join(newPath, fileName)
+		fw, err := createGzipFile(completeFilePath)
+		if err != nil {
+			return bucket{}, err
+		}
+
+		newBucket := bucket{DirName: dirName, FileWriter: fw}
+		bucketMap[ts] = newBucket
+
+		// Send an event to close the bucket shortly after endTime (5 second grace period)
+		timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
+		go func() {
+			<-timer
+			closeBucketEvent <- newBucket
+		}()
+		return newBucket, nil
+	}
+}
+
+// getRandomHex returns 4 random hex characters used to keep bucket file names unique
+func getRandomHex() string {
+	buff := make([]byte, 2)
+	rand.Read(buff)
+	return hex.EncodeToString(buff)
+}
+
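+// createGzipFile opens the file for writing and wraps it in gzip and bufio writers.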
+func createGzipFile(s string) (fileWriter, error) {
+	file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
+	if err != nil {
+		return fileWriter{}, fmt.Errorf("Cannot create file %s to buffer messages due to: %v", s, err)
+	}
+	gw := gzip.NewWriter(file)
+	bw := bufio.NewWriter(gw)
+	return fileWriter{file, gw, bw}, nil
+}
+
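+// writeGzipFile appends each record as one JSON document per line to the buffered gzip stream.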
+func writeGzipFile(fw fileWriter, records []interface{}) {
+	for _, eachRecord := range records {
+		s, err := json.Marshal(eachRecord)
+		if err != nil {
+			log.Errorf("Could not marshal record due to: %v", err)
+			continue
+		}
+		if _, err := fw.bw.WriteString(string(s) + "\n"); err != nil {
+			log.Errorf("Write to file failed due to: %v", err)
+		}
+	}
+	fw.bw.Flush()
+}
+
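+// closeGzipFile flushes buffered data, then closes the gzip writer and the underlying file.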
+func closeGzipFile(fw fileWriter) {
+	fw.bw.Flush()
+	// Close the gzip writer first so the gzip footer is written before the file is closed
+	fw.gw.Close()
+	fw.file.Close()
+}
diff --git a/common_helper.go b/common_helper.go
index e64252a..85754ba 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -3,10 +3,14 @@
 import (
 	"database/sql"
 	"fmt"
+	"sync"
 )
 
 var tenantCache map[string]tenant
 var developerInfoCache map[string]developerInfo
+var tenantCacheLock = sync.RWMutex{}
+var developerInfoCacheLock = sync.RWMutex{}
+
 
 func createTenantCache() error {
 	tenantCache = make(map[string]tenant)
@@ -22,12 +26,15 @@
 		return fmt.Errorf("Could not get datascope from DB due to: %s", error.Error())
 	} else  {
 		defer rows.Close()
+		// Lock before writing to the map as it has multiple readers
+		tenantCacheLock.Lock()
+		defer tenantCacheLock.Unlock()
 		for rows.Next() {
 			rows.Scan(&env, &org, &tenantId, &id);
 			tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId}
 		}
 	}
-	log.Debugf("Count of datadscopes in the cache: %d", len(tenantCache))
+	log.Debugf("Count of data scopes in the cache: %d", len(tenantCache))
 	return nil
 }
 
@@ -50,6 +57,9 @@
 		return fmt.Errorf("Could not get developerInfo from DB due to: %s", error.Error())
 	} else {
 		defer rows.Close()
+		// Lock before writing to the map as it has multiple readers
+		developerInfoCacheLock.Lock()
+		defer developerInfoCacheLock.Unlock()
 		for rows.Next() {
 			rows.Scan(&tenantId,&apiKey,&apiProduct, &developerApp, &developer, &developerEmail)
 
@@ -74,6 +84,9 @@
 			errorCode := "UNKNOWN_SCOPE"
 			return tenant{}, dbError{errorCode, reason}
 		} else {
+			// acquire a read lock as this cache has 1 writer as well
+			tenantCacheLock.RLock()
+			defer tenantCacheLock.RUnlock()
 			return tenantCache[scopeuuid], dbError{}
 		}
 	} else {
@@ -100,7 +113,7 @@
 
 		return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{}
 	}
-	// TODO: local testing
+	//// TODO: local testing
 	//return tenant{Org: "testorg", Env:"testenv", TenantId: "tenantid"}, dbError{}
 }
 
@@ -112,6 +125,9 @@
 			log.Debugf("No data found for tenantId = %s and apiKey = %s", tenantId, apiKey)
 			return developerInfo{}
 		} else {
+			// acquire a read lock as this cache has 1 writer as well
+			developerInfoCacheLock.RLock()
+			defer developerInfoCacheLock.RUnlock()
 			return developerInfoCache[keyForMap]
 		}
 	} else {
@@ -123,8 +139,8 @@
 			"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
 			"INNER JOIN APP AS a ON a.id = mp.app_id " +
 			"INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
-			"where mp.tenant_id = \"" + tenantId + "\" and mp.appcred_id = \"" + apiKey + "\";"
-		error := db.QueryRow(sSql).Scan(&apiProduct, &developerApp, &developer, &developerEmail)
+			"where mp.tenant_id = ? and mp.appcred_id = ?;"
+		error := db.QueryRow(sSql, tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail)
 
 		switch {
 		case error == sql.ErrNoRows:
diff --git a/crashRecovery.go b/crash_recovery.go
similarity index 98%
rename from crashRecovery.go
rename to crash_recovery.go
index 33a0711..971270d 100644
--- a/crashRecovery.go
+++ b/crash_recovery.go
@@ -19,9 +19,9 @@
 
 func initCrashRecovery() {
 	if crashRecoveryNeeded() {
-		timer := time.NewTimer(time.Second * crashRecoveryDelay)
+		timer := time.After(time.Second * crashRecoveryDelay)
 		go func() {
-			<- timer.C
+			<-timer
 			performRecovery()
 		}()
 	}
diff --git a/init.go b/init.go
index b8eaf81..f0161f3 100644
--- a/init.go
+++ b/init.go
@@ -25,6 +25,9 @@
 	analyticsUploadInterval        = "apidanalytics_upload_interval" // config in seconds
 	analyticsUploadIntervalDefault = "5"
 
+	analyticsBufferChannelSize        = "apidanalytics_buffer_channel_size"
+	analyticsBufferChannelSizeDefault = 10 // number of slots
+
 	uapServerBase = "apidanalytics_uap_server_base" // config
 
 	useCaching = "apidanalytics_use_caching"
@@ -80,7 +83,8 @@
 	}
 
 	// localTesting
-	config.SetDefault(uapServerBase,"http://localhost:9010")
+	//config.SetDefault(uapServerBase,"http://localhost:9010")
+	//config.SetDefault("apigeesync_apid_instance_id","fesgG-3525-SFAG")
 
 	for _, key := range []string{uapServerBase} {
 		if !config.IsSet(key) {
@@ -104,6 +108,8 @@
 
 	initUploadManager()
 
+	initBufferingManager()
+
 	initAPI(services)
 	log.Debug("end init for apidAnalytics plugin")
 	return pluginData, nil
@@ -137,6 +143,9 @@
 	// set default config for upload interval
 	config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
 
+	// set default config for internal buffer size
+	config.SetDefault(analyticsBufferChannelSize, analyticsBufferChannelSizeDefault)
+
 	return nil
 }
 
diff --git a/listener.go b/listener.go
index d7a8509..b20fb9b 100644
--- a/listener.go
+++ b/listener.go
@@ -68,6 +68,9 @@
 			switch payload.Operation {
 			case common.Insert, common.Update:
 				rows = append(rows, payload.NewRow)
+				// Lock before writing to the map as it has multiple readers
+				tenantCacheLock.Lock()
+				defer tenantCacheLock.Unlock()
 				for _, ele := range rows {
 					var scopeuuid, tenantid, org, env string
 					ele.Get("id", &scopeuuid)
@@ -78,6 +81,8 @@
 				}
 			case common.Delete:
 				rows = append(rows, payload.NewRow)
+				tenantCacheLock.Lock()
+				defer tenantCacheLock.Unlock()
 				for _, ele := range rows {
 					var scopeuuid string
 					ele.Get("id", &scopeuuid)
diff --git a/uploadManager.go b/upload_manager.go
similarity index 100%
rename from uploadManager.go
rename to upload_manager.go