Merge pull request #2 from 30x/pooja_xapid_377

xapid 377
diff --git a/README b/README
deleted file mode 100644
index 09fec22..0000000
--- a/README
+++ /dev/null
@@ -1,3 +0,0 @@
-# apidApigeeAnalytics
-
-This core plugin for [apid](http://github.com/30x/apid) and resposible for collecting analytics data for runtime traffic and puplishing to Apigee.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0b41867
--- /dev/null
+++ b/README.md
@@ -0,0 +1,56 @@
+# ApidAnalytics
+
+This is a core plugin for [apid](http://github.com/30x/apid) and is responsible for collecting analytics data for
+runtime traffic from Micro and Enterprise Gateway and puplishing to Apigee.
+
+### Configuration
+
+| name                                  | description                       |
+|---------------------------------------|-----------------------------------|
+| apidanalytics_base_path               | string. default: /analytics       |
+| apidanalytics_data_path               | string. default: /ax              |
+| apidanalytics_collection_interval     | int. seconds. default: 120        |
+| apidanalytics_upload_interval         | int. seconds. default: 5          |
+| apidanalytics_uap_server_base         | string. url. required.            |
+| apidanalytics_use_caching             | boolean. default: true            |
+| apidanalytics_buffer_channel_size     | int. number of slots. default: 100|
+
+### Startup Procedure
+1. Initialize crash recovery, upload and buffering manager to handle buffering analytics messages to files
+   locally and then periodically upload these files to S3/GCS based on signedURL received from
+   uapCollectionEndpoint exposed via edgex proxy
+2. Create a listener for Apigee-Sync event
+    1. Each time a Snapshot is received, create an in-memory cache for data scope and developer information
+    2. Each time a changeList is received
+        1. if data_scope info changed, then insert/delete info for changed scope from tenantCache
+        2. if any other kms table is changed, then refresh entire developerInfo cache as only specific
+           fields are saved in the cache
+3. Initialize POST /analytics/{scope_uuid} API
+4. Upon receiving POST requests
+    1. Validate and enrich each batch of analytics records
+    2. If valid, then publish records to an internal buffer channel
+5. Buffering Logic
+    1. Buffering manager creates listener on the internal buffer channel and thus consumes messages
+       as soon as they are put on the channel
+    2. Based on the current timestamp either an existing directory is used to save these messages
+       or new a new timestamp directory is created
+    3. If a new directory is created, then an event will be published on the closeBucketEvent Channel
+       at the expected directory closing time
+    4. The messages are stored in a file under tmp/<timestamp_directory>
+    5. Based on collection interval, periodically the files in tmp are closed by the routine listening on the
+       closeBucketEvent channel and the directory is moved to staging directory
+6. Upload Manager
+    1. The upload manager periodically checks the staging directory to look for new folders
+    2. When a new folder arrives here, it means all files under that are closed and ready to uploaded
+    3. Tenant info is extracted from the directory name and the files are sequentially uploaded to S3/GCS
+    4. Based on the upload status
+        1. If upload is successful then directory is deleted from staging and previously failed uploads are retried
+        2. if upload fails, then upload is retried 3 times before moving the directory to failed directory
+7. Crash Recovery is a one time activity performed when the plugin is started to
+   cleanly handle open files from a previous Apid stop or crash event
+
+### Exposed API
+```sh
+POST /analytics/{bundle_scope_uuid}
+```
+Complete spec is listed in  `api.yaml`
diff --git a/api.go b/api.go
new file mode 100644
index 0000000..03f5e31
--- /dev/null
+++ b/api.go
@@ -0,0 +1,67 @@
+package apidAnalytics
+
+import (
+	"github.com/30x/apid"
+	"net/http"
+	"strings"
+)
+
+var analyticsBasePath string
+
+type errResponse struct {
+	ErrorCode string `json:"errorCode"`
+	Reason    string `json:"reason"`
+}
+
+type dbError struct {
+	ErrorCode string `json:"errorCode"`
+	Reason    string `json:"reason"`
+}
+
+func initAPI(services apid.Services) {
+	log.Debug("initialized API's exposed by apidAnalytics plugin")
+	analyticsBasePath = config.GetString(configAnalyticsBasePath)
+	services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}",
+		saveAnalyticsRecord).Methods("POST")
+}
+
+func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
+
+	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+
+	db := getDB() // When database isnt initialized
+	if db == nil {
+		writeError(w, http.StatusInternalServerError,
+			"INTERNAL_SERVER_ERROR",
+			"Service is not initialized completely")
+		return
+	}
+
+	if !strings.EqualFold(r.Header.Get("Content-Type"), "application/json") {
+		writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE",
+			"Only supported content type is application/json")
+		return
+	}
+
+	vars := apid.API().Vars(r)
+	scopeuuid := vars["bundle_scope_uuid"]
+	tenant, dbErr := getTenantForScope(scopeuuid)
+	if dbErr.ErrorCode != "" {
+		switch dbErr.ErrorCode {
+		case "INTERNAL_SEARCH_ERROR":
+			writeError(w, http.StatusInternalServerError,
+				"INTERNAL_SEARCH_ERROR", dbErr.Reason)
+		case "UNKNOWN_SCOPE":
+			writeError(w, http.StatusBadRequest,
+				"UNKNOWN_SCOPE", dbErr.Reason)
+		}
+	} else {
+		err := processPayload(tenant, scopeuuid, r)
+		if err.ErrorCode == "" {
+			w.WriteHeader(http.StatusOK)
+		} else {
+			writeError(w, http.StatusBadRequest,
+				err.ErrorCode, err.Reason)
+		}
+	}
+}
diff --git a/api.yaml b/api.yaml
new file mode 100644
index 0000000..9b9ccfa
--- /dev/null
+++ b/api.yaml
@@ -0,0 +1,147 @@
+swagger: "2.0"
+info:
+  version: "v1"
+  title: APID analytics API
+host: playground.apistudio.io
+basePath: /try/64e409ad-aebb-4bbc-977e-f0e0f22209d4
+schemes:
+  - http
+  - https
+consumes:
+  - application/json
+produces:
+  - application/json
+paths:
+  '/analytics/{bundle_scope_uuid}':
+    x-swagger-router-controller: analytics
+    parameters:
+      - name: bundle_scope_uuid
+        in: path
+        required: true
+        description: bundle UUID that can be mapped to a scope by APID
+        type: string
+      - name: analytics_data
+        in: body
+        description: The analytics data you want to post
+        required: true
+        schema:
+          $ref: "#/definitions/records"
+    post:
+      responses:
+        "200":
+          description: Success
+        "400":
+          description: Bad Request
+          schema:
+            $ref: "#/definitions/errClientError"
+        "500":
+          description: Server error
+          schema:
+            $ref: "#/definitions/errServerError"
+        default:
+          description: Error
+          schema:
+            $ref: "#/definitions/errResponse"
+
+definitions:
+  records:
+    type: object
+    required:
+      - records
+    properties:
+      records:
+        type: array
+        minItems: 1
+        items:
+          $ref: "#/definitions/eachRecord"
+    example: {
+    "records":[{
+        "response_status_code": 400,
+        "client_received_start_timestamp": 1462850097576,
+        "client_id":"0GJKn7EQmNkKYcGL7x3gHaawWLs5gUPr"
+      },{
+        "response_status_code": 200,
+        "client_id":"2ngXgr6Rl2PXWiEmbt8zCkWY3Ptjb8ep",
+        "request_verb" : "GET",
+        "api_product":" test_product",
+        "access_token" : "fewGWG343LDV346345YCDS",
+        "apiproxy" : "OAuthProxy",
+        "apiproxy_revision" : "2",
+        "client_ip": "10.16.9.11",
+        "client_sent_end_timestamp": 1462850097894,
+        "client_received_start_timestamp": 1462850097576,
+      	"client_received_end_timestamp": 1462850097580,
+      	"client_sent_start_timestamp": 1462850097894,
+        "request_path" : "/oauth/oauthv2/auth_code/",
+        "request_uri": "/oauth/oauthv2/auth_code/?response_type=code&redirect_url=http%3A%2F%2Fexample.com&client_id=A1h6yYAVeADnEKji8M37zCSn6olcmQDB",
+        "useragent" : "Chrome",
+        "target" : "target_name",
+        "target_received_end_timestamp": 1462850097800,
+        "target_received_start_timestamp": 1462850097800,
+        "target_response_code" : 200,
+        "target_sent_end_timestamp" : 1462850097802,
+        "target_sent_start_timestamp" :  1462850097802
+      }]
+    }
+
+  eachRecord:
+    description: Each record is basically a map of key value pair. client_received_start_timestamp is a required property but more fields can be added
+    type: object
+    required:
+      - client_received_start_timestamp
+    properties:
+      client_received_start_timestamp:
+        type: integer
+        format: int64
+    example: {
+      "response_status_code":400,
+      "client_received_start_timestamp":1462850097576,
+      "client_id":"0GJKn7EQmNkKYcGL7x3gHaawWLs5gUPr"
+    }
+
+  errClientError:
+    required:
+      - errrorCode
+      - reason
+    properties:
+      errrorCode:
+        type: string
+        enum:
+          - UNKNOWN_SCOPE
+          - BAD_DATA
+          - UNSUPPORTED_CONTENT_TYPE
+          - UNSUPPORTED_CONTENT_ENCODING
+          - MISSING_FIELD
+      reason:
+        type: string
+    example: {
+      "errrorCode":"UNKNOWN_SCOPE",
+      "reason":"No tenant found for this scopeuuid : UUID"
+    }
+
+  errServerError:
+    required:
+      - errrorCode
+      - reason
+    properties:
+      errrorCode:
+        type: string
+        enum:
+          - INTERNAL_SERVER_ERROR
+          - INTERNAL_SEARCH_ERROR
+      reason:
+        type: string
+    example: {
+      "errrorCode":"INTERNAL_SERVER_ERROR",
+      "reason":"Service is not initialized completely"
+    }
+
+  errResponse:
+    required:
+      - errrorCode
+      - reason
+    properties:
+      errrorCode:
+        type: string
+      reason:
+        type: string
\ No newline at end of file
diff --git a/api_helper.go b/api_helper.go
new file mode 100644
index 0000000..24dba22
--- /dev/null
+++ b/api_helper.go
@@ -0,0 +1,181 @@
+package apidAnalytics
+
+import (
+	"compress/gzip"
+	"encoding/json"
+	"io"
+	"net/http"
+	"strings"
+)
+
+/*
+Implements all the helper methods needed to process the POST /analytics payload
+and send it to the internal buffer channel
+*/
+
+type developerInfo struct {
+	ApiProduct     string
+	DeveloperApp   string
+	DeveloperEmail string
+	Developer      string
+}
+
+type axRecords struct {
+	Tenant tenant
+	// Records is an array of multiple analytics records
+	Records []interface{}
+}
+
+type tenant struct {
+	Org      string
+	Env      string
+	TenantId string
+}
+
+func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
+	var gzipEncoded bool
+	if r.Header.Get("Content-Encoding") != "" {
+		if !strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") {
+			return errResponse{
+				ErrorCode: "UNSUPPORTED_CONTENT_ENCODING",
+				Reason:    "Only supported content encoding is gzip"}
+		} else {
+			gzipEncoded = true
+		}
+	}
+
+	var reader io.ReadCloser
+	var err error
+	if gzipEncoded {
+		reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
+		if err != nil {
+			return errResponse{
+				ErrorCode: "BAD_DATA",
+				Reason:    "Gzip Encoded data cannot be read"}
+		}
+	} else {
+		reader = r.Body
+	}
+
+	errMessage := validateEnrichPublish(tenant, scopeuuid, reader)
+	if errMessage.ErrorCode != "" {
+		return errMessage
+	}
+	return errResponse{}
+}
+
+func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse {
+	var raw map[string]interface{}
+	decoder := json.NewDecoder(reader) // Decode payload to JSON data
+	decoder.UseNumber()
+
+	if err := decoder.Decode(&raw); err != nil {
+		return errResponse{ErrorCode: "BAD_DATA",
+			Reason: "Not a valid JSON payload"}
+	}
+
+	if records := raw["records"]; records != nil {
+		// Iterate through each record to validate and enrich it
+		for _, eachRecord := range records.([]interface{}) {
+			recordMap := eachRecord.(map[string]interface{})
+			valid, err := validate(recordMap)
+			if valid {
+				enrich(recordMap, scopeuuid, tenant)
+			} else {
+				// Even if there is one bad record, then reject entire batch
+				return err
+			}
+		}
+		axRecords := axRecords{
+			Tenant:  tenant,
+			Records: records.([]interface{})}
+		// publish batch of records to channel (blocking call)
+		internalBuffer <- axRecords
+	} else {
+		return errResponse{
+			ErrorCode: "NO_RECORDS",
+			Reason:    "No analytics records in the payload"}
+	}
+	return errResponse{}
+}
+
+/*
+Does basic validation on each analytics message
+1. client_received_start_timestamp should exist
+2. if client_received_end_timestamp exists then it should be > client_received_start_timestamp
+*/
+func validate(recordMap map[string]interface{}) (bool, errResponse) {
+	elems := []string{"client_received_start_timestamp"}
+	for _, elem := range elems {
+		if recordMap[elem] == nil {
+			return false, errResponse{
+				ErrorCode: "MISSING_FIELD",
+				Reason:    "Missing Required field: " + elem}
+		}
+	}
+
+	crst, exists1 := recordMap["client_received_start_timestamp"]
+	cret, exists2 := recordMap["client_received_end_timestamp"]
+	if exists1 && exists2 {
+		if crst.(json.Number) > cret.(json.Number) {
+			return false, errResponse{
+				ErrorCode: "BAD_DATA",
+				Reason: "client_received_start_timestamp " +
+					"> client_received_end_timestamp"}
+		}
+	}
+	return true, errResponse{}
+}
+
+/*
+Enrich each record by adding org and env fields
+It also finds add developer related information based on the apiKey
+*/
+func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) {
+	org, orgExists := recordMap["organization"]
+	if !orgExists || org.(string) == "" {
+		recordMap["organization"] = tenant.Org
+	}
+
+	env, envExists := recordMap["environment"]
+	if !envExists || env.(string) == "" {
+		recordMap["environment"] = tenant.Env
+	}
+
+	apiKey, exists := recordMap["client_id"]
+	// apiKey doesnt exist then ignore adding developer fields
+	if exists {
+		apiKey := apiKey.(string)
+		devInfo := getDeveloperInfo(tenant.TenantId, apiKey)
+		_, exists := recordMap["api_product"]
+		if !exists {
+			recordMap["api_product"] = devInfo.ApiProduct
+		}
+		_, exists = recordMap["developer_app"]
+		if !exists {
+			recordMap["developer_app"] = devInfo.DeveloperApp
+		}
+		_, exists = recordMap["developer_email"]
+		if !exists {
+			recordMap["developer_email"] = devInfo.DeveloperEmail
+		}
+		_, exists = recordMap["developer"]
+		if !exists {
+			recordMap["developer"] = devInfo.Developer
+		}
+	}
+}
+
+func writeError(w http.ResponseWriter, status int, code string, reason string) {
+	w.WriteHeader(status)
+	e := errResponse{
+		ErrorCode: code,
+		Reason:    reason,
+	}
+	bytes, err := json.Marshal(e)
+	if err != nil {
+		log.Errorf("unable to marshal errorResponse: %v", err)
+	} else {
+		w.Write(bytes)
+	}
+}
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 0000000..d172a29
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,49 @@
+package apidAnalytics
+
+import (
+	"net/http"
+	"net/url"
+	"strings"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// BeforeSuite setup and AfterSuite cleanup is in apidAnalytics_suite_test.go
+var _ = Describe("testing saveAnalyticsRecord() directly", func() {
+	Context("valid scopeuuid", func() {
+		It("should successfully return", func() {
+			uri, err := url.Parse(testServer.URL)
+			uri.Path = analyticsBasePath
+
+			v := url.Values{}
+			v.Add("bundle_scope_uuid", "testid")
+
+			client := &http.Client{}
+			req, err := http.NewRequest("POST", uri.String(),
+				strings.NewReader(v.Encode()))
+			res, err := client.Do(req)
+			defer res.Body.Close()
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(res.StatusCode, http.StatusOK)
+		})
+	})
+
+	Context("invalid scopeuuid", func() {
+		It("should return bad request", func() {
+			uri, err := url.Parse(testServer.URL)
+			uri.Path = analyticsBasePath
+
+			v := url.Values{}
+			v.Add("bundle_scope_uuid", "wrongId")
+
+			client := &http.Client{}
+			req, err := http.NewRequest("POST", uri.String(),
+				strings.NewReader(v.Encode()))
+			res, err := client.Do(req)
+			defer res.Body.Close()
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(res.StatusCode, http.StatusBadRequest)
+		})
+	})
+})
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
new file mode 100644
index 0000000..1e5c128
--- /dev/null
+++ b/apidAnalytics_suite_test.go
@@ -0,0 +1,232 @@
+package apidAnalytics
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"github.com/30x/apid"
+	"github.com/30x/apid/factory"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+)
+
+var (
+	testTempDir string
+	testServer  *httptest.Server
+)
+
+func TestApidAnalytics(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "ApidAnalytics Suite")
+}
+
+var _ = BeforeSuite(func() {
+	apid.Initialize(factory.DefaultServicesFactory())
+
+	config := apid.Config()
+
+	var err error
+	testTempDir, err = ioutil.TempDir("", "api_test")
+	Expect(err).NotTo(HaveOccurred())
+
+	config.Set("data_path", testTempDir)
+	config.Set(uapServerBase, "http://localhost:9000") // dummy value
+	config.Set("apigeesync_apid_instance_id",
+		"abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
+	config.Set(useCaching, true)
+
+	db, err := apid.Data().DB()
+	Expect(err).NotTo(HaveOccurred())
+	setDB(db)
+	createApidClusterTables(db)
+	createTables(db)
+	insertTestData(db)
+	apid.InitializePlugins()
+
+	// Create cache else its created in listener.go when a snapshot is received
+	createTenantCache()
+	createDeveloperInfoCache()
+
+	testServer = httptest.NewServer(http.HandlerFunc(
+		func(w http.ResponseWriter, req *http.Request) {
+			if req.URL.Path == analyticsBasePathDefault {
+				saveAnalyticsRecord(w, req)
+			}
+		}))
+})
+
+func createTables(db apid.DB) {
+	_, err := db.Exec(`
+		CREATE TABLE IF NOT EXISTS api_product (
+		    id text,
+		    tenant_id text,
+		    name text,
+		    display_name text,
+		    description text,
+		    api_resources text[],
+		    approval_type text,
+		    _change_selector text,
+		    proxies text[],
+		    environments text[],
+		    quota text,
+		    quota_time_unit text,
+		    quota_interval int,
+		    created_at int64,
+		    created_by text,
+		    updated_at int64,
+		    updated_by text,
+		    PRIMARY KEY (tenant_id, id));
+		CREATE TABLE IF NOT EXISTS developer (
+		    id text,
+		    tenant_id text,
+		    username text,
+		    first_name text,
+		    last_name text,
+		    password text,
+		    email text,
+		    status text,
+		    encrypted_password text,
+		    salt text,
+		    _change_selector text,
+		    created_at int64,
+		    created_by text,
+		    updated_at int64,
+		    updated_by text,
+		    PRIMARY KEY (tenant_id, id)
+		);
+		CREATE TABLE IF NOT EXISTS app (
+		    id text,
+		    tenant_id text,
+		    name text,
+		    display_name text,
+		    access_type text,
+		    callback_url text,
+		    status text,
+		    app_family text,
+		    company_id text,
+		    developer_id text,
+		    type int,
+		    created_at int64,
+		    created_by text,
+		    updated_at int64,
+		    updated_by text,
+		    _change_selector text,
+		    PRIMARY KEY (tenant_id, id)
+		);
+		CREATE TABLE IF NOT EXISTS app_credential_apiproduct_mapper (
+		    tenant_id text,
+		    appcred_id text,
+		    app_id text,
+		    apiprdt_id text,
+		    _change_selector text,
+		    status text,
+		    PRIMARY KEY (appcred_id, app_id, apiprdt_id,tenant_id)
+		);
+	`)
+	if err != nil {
+		panic("Unable to initialize DB " + err.Error())
+	}
+}
+
+func createApidClusterTables(db apid.DB) {
+	_, err := db.Exec(`
+		CREATE TABLE apid_cluster (
+		    id text,
+		    instance_id text,
+		    name text,
+		    description text,
+		    umbrella_org_app_name text,
+		    created int64,
+		    created_by text,
+		    updated int64,
+		    updated_by text,
+		    _change_selector text,
+		    snapshotInfo text,
+		    lastSequence text,
+		    PRIMARY KEY (id)
+		);
+		CREATE TABLE data_scope (
+		    id text,
+		    apid_cluster_id text,
+		    scope text,
+		    org text,
+		    env text,
+		    created int64,
+		    created_by text,
+		    updated int64,
+		    updated_by text,
+		    _change_selector text,
+		    PRIMARY KEY (id)
+		);
+	`)
+	if err != nil {
+		panic("Unable to initialize DB " + err.Error())
+	}
+}
+
+func insertTestData(db apid.DB) {
+
+	txn, err := db.Begin()
+	Expect(err).ShouldNot(HaveOccurred())
+	txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id,"+
+		" appcred_id, app_id, apiprdt_id, status, _change_selector) "+
+		"VALUES"+
+		"($1,$2,$3,$4,$5,$6)",
+		"tenantid",
+		"testapikey",
+		"testappid",
+		"testproductid",
+		"APPROVED",
+		"12345",
+	)
+
+	txn.Exec("INSERT INTO APP (id, tenant_id, name, developer_id) "+
+		"VALUES"+
+		"($1,$2,$3,$4)",
+		"testappid",
+		"tenantid",
+		"testapp",
+		"testdeveloperid",
+	)
+
+	txn.Exec("INSERT INTO API_PRODUCT (id, tenant_id, name) "+
+		"VALUES"+
+		"($1,$2,$3)",
+		"testproductid",
+		"tenantid",
+		"testproduct",
+	)
+
+	txn.Exec("INSERT INTO DEVELOPER (id, tenant_id, username, email) "+
+		"VALUES"+
+		"($1,$2,$3,$4)",
+		"testdeveloperid",
+		"tenantid",
+		"testdeveloper",
+		"testdeveloper@test.com",
+	)
+
+	txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, "+
+		"apid_cluster_id, scope, org, env) "+
+		"VALUES"+
+		"($1,$2,$3,$4,$5,$6)",
+		"testid",
+		"some_change_selector",
+		"some_cluster_id",
+		"tenantid",
+		"testorg",
+		"testenv",
+	)
+	txn.Commit()
+}
+
+var _ = AfterSuite(func() {
+	apid.Events().Close()
+	if testServer != nil {
+		testServer.Close()
+	}
+	os.RemoveAll(testTempDir)
+})
diff --git a/buffering_manager.go b/buffering_manager.go
new file mode 100644
index 0000000..7e28d4b
--- /dev/null
+++ b/buffering_manager.go
@@ -0,0 +1,179 @@
+package apidAnalytics
+
+import (
+	"bufio"
+	"compress/gzip"
+	"crypto/rand"
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+)
+
+const fileExtension = ".txt.gz"
+
+// Channel where analytics records are buffered before being dumped to a
+// file as write to file should not performed in the Http Thread
+var internalBuffer chan axRecords
+
+// Channel where close bucket event is published i.e. when a bucket
+// is ready to be closed based on collection interval
+var closeBucketEvent chan bucket
+
+// Map from timestampt to bucket
+var bucketMap map[int64]bucket
+
+type bucket struct {
+	DirName string
+	// We need file handle and writer to close the file
+	FileWriter fileWriter
+}
+
+// This struct will store open file handle and writer to close the file
+type fileWriter struct {
+	file *os.File
+	gw   *gzip.Writer
+	bw   *bufio.Writer
+}
+
+func initBufferingManager() {
+	internalBuffer = make(chan axRecords,
+		config.GetInt(analyticsBufferChannelSize))
+	closeBucketEvent = make(chan bucket)
+	bucketMap = make(map[int64]bucket)
+
+	// Keep polling the internal buffer for new messages
+	go func() {
+		for {
+			records := <-internalBuffer
+			err := save(records)
+			if err != nil {
+				log.Errorf("Could not save %d messages to file"+
+					" due to: %v", len(records.Records), err)
+			}
+		}
+	}()
+
+	// Keep polling the closeEvent channel to see if bucket is ready to be closed
+	go func() {
+		for {
+			bucket := <-closeBucketEvent
+			log.Debugf("Close Event received for bucket: %s",
+				bucket.DirName)
+
+			// close open file
+			closeGzipFile(bucket.FileWriter)
+
+			dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
+			stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+			// close files in tmp folder and move directory to
+			// staging to indicate its ready for upload
+			err := os.Rename(dirToBeClosed, stagingPath)
+			if err != nil {
+				log.Errorf("Cannot move directory '%s' from"+
+					" tmp to staging folder", bucket.DirName)
+			}
+		}
+	}()
+}
+
+// Save records to correct file based on what timestamp data is being collected for
+func save(records axRecords) error {
+	bucket, err := getBucketForTimestamp(time.Now(), records.Tenant)
+	if err != nil {
+		return err
+	}
+	writeGzipFile(bucket.FileWriter, records.Records)
+	return nil
+}
+
+func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
+	// first based on current timestamp and collection interval,
+	// determine the timestamp of the bucket
+	ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
+	_, exists := bucketMap[ts]
+	if exists {
+		return bucketMap[ts], nil
+	} else {
+		timestamp := time.Unix(ts, 0).Format(timestampLayout)
+
+		// endtimestamp of bucket = starttimestamp + collectionInterval
+		endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0)
+		endtimestamp := endTime.Format(timestampLayout)
+
+		dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
+		newPath := filepath.Join(localAnalyticsTempDir, dirName)
+		// create dir
+		err := os.Mkdir(newPath, os.ModePerm)
+		if err != nil {
+			return bucket{}, fmt.Errorf("Cannot create directory "+
+				"'%s' to buffer messages '%v'", dirName, err)
+		}
+
+		// create file for writing
+		// Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
+		fileName := getRandomHex() + "_" + timestamp + "." +
+			endtimestamp + "_" +
+			config.GetString("apigeesync_apid_instance_id") +
+			"_writer_0" + fileExtension
+		completeFilePath := filepath.Join(newPath, fileName)
+		fw, err := createGzipFile(completeFilePath)
+		if err != nil {
+			return bucket{}, err
+		}
+
+		newBucket := bucket{DirName: dirName, FileWriter: fw}
+		bucketMap[ts] = newBucket
+
+		//Send event to close directory after endTime + 5
+		// seconds to make sure all buffers are flushed to file
+		timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
+		go func() {
+			<-timer
+			closeBucketEvent <- newBucket
+		}()
+		return newBucket, nil
+	}
+}
+
+// 4 digit Hex is prefixed to each filename to improve
+// how s3 partitions the files being uploaded
+func getRandomHex() string {
+	buff := make([]byte, 2)
+	rand.Read(buff)
+	return fmt.Sprintf("%x", buff)
+}
+
+func createGzipFile(s string) (fileWriter, error) {
+	file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
+	if err != nil {
+		return fileWriter{},
+			fmt.Errorf("Cannot create file '%s' "+
+				"to buffer messages '%v'", s, err)
+	}
+	gw := gzip.NewWriter(file)
+	bw := bufio.NewWriter(gw)
+	return fileWriter{file, gw, bw}, nil
+}
+
+func writeGzipFile(fw fileWriter, records []interface{}) {
+	// write each record as a new line to the bufferedWriter
+	for _, eachRecord := range records {
+		s, _ := json.Marshal(eachRecord)
+		_, err := (fw.bw).WriteString(string(s))
+		if err != nil {
+			log.Errorf("Write to file failed '%v'", err)
+		}
+		(fw.bw).WriteString("\n")
+	}
+	// Flush entire batch of records to file vs each message
+	fw.bw.Flush()
+	fw.gw.Flush()
+}
+
+func closeGzipFile(fw fileWriter) {
+	fw.bw.Flush()
+	fw.gw.Close()
+	fw.file.Close()
+}
diff --git a/common_helper.go b/common_helper.go
new file mode 100644
index 0000000..9acc033
--- /dev/null
+++ b/common_helper.go
@@ -0,0 +1,215 @@
+package apidAnalytics
+
+import (
+	"database/sql"
+	"fmt"
+	"sync"
+)
+
+// Cache for scope uuid to org, env and tenantId information
+var tenantCache map[string]tenant
+
+// RW lock for tenant map cache since the cache can be
+// read while its being written to and vice versa
+var tenantCachelock = sync.RWMutex{}
+
+// Cache for apiKey~tenantId to developer related information
+var developerInfoCache map[string]developerInfo
+
+// RW lock for developerInfo map cache since the cache can be
+// read while its being written to and vice versa
+var developerInfoCacheLock = sync.RWMutex{}
+
+// Load data scope information into an in-memory cache so that
+// for each record a DB lookup is not required
+func createTenantCache() error {
+	// Lock before writing to the map as it has multiple readers
+	tenantCachelock.Lock()
+	defer tenantCachelock.Unlock()
+	tenantCache = make(map[string]tenant)
+	var org, env, tenantId, id string
+
+	db := getDB()
+	rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE")
+
+	if error != nil {
+		return fmt.Errorf("Count not get datascope from "+
+			"DB due to: %v", error)
+	} else {
+		defer rows.Close()
+		for rows.Next() {
+			rows.Scan(&env, &org, &tenantId, &id)
+			tenantCache[id] = tenant{Org: org,
+				Env:      env,
+				TenantId: tenantId}
+		}
+	}
+
+	log.Debugf("Count of data scopes in the cache: %d", len(tenantCache))
+	return nil
+}
+
+// Load data scope information into an in-memory cache so that
+// for each record a DB lookup is not required
+func createDeveloperInfoCache() error {
+	// Lock before writing to the map as it has multiple readers
+	developerInfoCacheLock.Lock()
+	defer developerInfoCacheLock.Unlock()
+	developerInfoCache = make(map[string]developerInfo)
+	var apiProduct, developerApp, developerEmail, developer sql.NullString
+	var tenantId, apiKey string
+
+	db := getDB()
+	sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name," +
+		" a.name, d.username, d.email " +
+		"FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
+		"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
+		"INNER JOIN APP AS a ON a.id = mp.app_id " +
+		"INNER JOIN DEVELOPER as d ON d.id = a.developer_id;"
+	rows, error := db.Query(sSql)
+
+	if error != nil {
+		return fmt.Errorf("Count not get developerInfo "+
+			"from DB due to: %v", error)
+	} else {
+		defer rows.Close()
+		for rows.Next() {
+			rows.Scan(&tenantId, &apiKey, &apiProduct,
+				&developerApp, &developer, &developerEmail)
+
+			keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
+			apiPrd := getValuesIgnoringNull(apiProduct)
+			devApp := getValuesIgnoringNull(developerApp)
+			dev := getValuesIgnoringNull(developer)
+			devEmail := getValuesIgnoringNull(developerEmail)
+
+			developerInfoCache[keyForMap] = developerInfo{
+				ApiProduct:     apiPrd,
+				DeveloperApp:   devApp,
+				DeveloperEmail: devEmail,
+				Developer:      dev}
+		}
+	}
+
+	log.Debugf("Count of apiKey~tenantId combinations "+
+		"in the cache: %d", len(developerInfoCache))
+	return nil
+}
+
+// Returns Tenant Info given a scope uuid from the cache or by querying
+// the DB directly based on useCachig config
+func getTenantForScope(scopeuuid string) (tenant, dbError) {
+	if config.GetBool(useCaching) {
+		// acquire a read lock as this cache has 1 writer as well
+		tenantCachelock.RLock()
+		defer tenantCachelock.RUnlock()
+		ten, exists := tenantCache[scopeuuid]
+		if !exists {
+			reason := "No tenant found for this scopeuuid: " + scopeuuid
+			errorCode := "UNKNOWN_SCOPE"
+			// Incase of unknown scope, try to refresh the
+			// cache ansynchronously incase an update was missed or delayed
+			go createTenantCache()
+			return tenant{}, dbError{
+				ErrorCode: errorCode,
+				Reason:    reason}
+		} else {
+			return ten, dbError{}
+		}
+	} else {
+		var org, env, tenantId string
+
+		db := getDB()
+		error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE"+
+			" where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
+
+		switch {
+		case error == sql.ErrNoRows:
+			reason := "No tenant found for this scopeuuid: " + scopeuuid
+			errorCode := "UNKNOWN_SCOPE"
+			return tenant{}, dbError{
+				ErrorCode: errorCode,
+				Reason:    reason}
+		case error != nil:
+			reason := error.Error()
+			errorCode := "INTERNAL_SEARCH_ERROR"
+			return tenant{}, dbError{
+				ErrorCode: errorCode,
+				Reason:    reason}
+		}
+		return tenant{
+			Org:      org,
+			Env:      env,
+			TenantId: tenantId}, dbError{}
+	}
+}
+
+// Returns Dveloper related info given an apiKey and tenantId
+// from the cache or by querying the DB directly based on useCachig config
+func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
+	if config.GetBool(useCaching) {
+		keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
+		// acquire a read lock as this cache has 1 writer as well
+		tenantCachelock.RLock()
+		defer tenantCachelock.RUnlock()
+		devInfo, exists := developerInfoCache[keyForMap]
+		if !exists {
+			log.Warnf("No data found for for tenantId = %s"+
+				" and apiKey = %s", tenantId, apiKey)
+			// Incase of unknown apiKey~tenantId,
+			// try to refresh the cache ansynchronously incase an update was missed or delayed
+			go createTenantCache()
+			return developerInfo{}
+		} else {
+			return devInfo
+		}
+	} else {
+		var apiProduct, developerApp, developerEmail sql.NullString
+		var developer sql.NullString
+
+		db := getDB()
+		sSql := "SELECT ap.name, a.name, d.username, d.email " +
+			"FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
+			"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
+			"INNER JOIN APP AS a ON a.id = mp.app_id " +
+			"INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
+			"where mp.tenant_id = ? and mp.appcred_id = ?;"
+		error := db.QueryRow(sSql, tenantId, apiKey).
+			Scan(&apiProduct, &developerApp,
+				&developer, &developerEmail)
+
+		switch {
+		case error == sql.ErrNoRows:
+			log.Debugf("No data found for for tenantId = %s "+
+				"and apiKey = %s", tenantId, apiKey)
+			return developerInfo{}
+		case error != nil:
+			log.Debugf("No data found for for tenantId = %s and "+
+				"apiKey = %s due to: %v", tenantId, apiKey, error)
+			return developerInfo{}
+		}
+
+		apiPrd := getValuesIgnoringNull(apiProduct)
+		devApp := getValuesIgnoringNull(developerApp)
+		dev := getValuesIgnoringNull(developer)
+		devEmail := getValuesIgnoringNull(developerEmail)
+		return developerInfo{ApiProduct: apiPrd,
+			DeveloperApp:   devApp,
+			DeveloperEmail: devEmail,
+			Developer:      dev}
+	}
+}
+
+// Helper method to handle scanning null values in DB to empty string
+func getValuesIgnoringNull(sqlValue sql.NullString) string {
+	if sqlValue.Valid {
+		return sqlValue.String
+	} else {
+		return ""
+	}
+}
+
+// Build Key as a combination of tenantId and apiKey for the developerInfo Cache
+func getKeyForDeveloperInfoCache(tenantId string, apiKey string) string {
+	return tenantId + "~" + apiKey
+}
diff --git a/common_helper_test.go b/common_helper_test.go
new file mode 100644
index 0000000..c691d52
--- /dev/null
+++ b/common_helper_test.go
@@ -0,0 +1,48 @@
+package apidAnalytics
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("test getTenantForScope()", func() {
+	Context("get tenant for valid scopeuuid", func() {
+		It("should return testorg and testenv", func() {
+			tenant, dbError := getTenantForScope("testid")
+			Expect(dbError.Reason).To(Equal(""))
+			Expect(tenant.Org).To(Equal("testorg"))
+			Expect(tenant.Env).To(Equal("testenv"))
+			Expect(tenant.TenantId).To(Equal("tenantid"))
+		})
+	})
+
+	Context("get tenant for invalid scopeuuid", func() {
+		It("should return empty tenant and a db error", func() {
+			tenant, dbError := getTenantForScope("wrongid")
+			Expect(tenant.Org).To(Equal(""))
+			Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE"))
+		})
+	})
+})
+
+var _ = Describe("test getDeveloperInfo()", func() {
+	Context("get developerInfo for valid tenantId and apikey", func() {
+		It("should return all right data", func() {
+			developerInfo := getDeveloperInfo("tenantid", "testapikey")
+			Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
+			Expect(developerInfo.Developer).To(Equal("testdeveloper"))
+			Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com"))
+			Expect(developerInfo.DeveloperApp).To(Equal("testapp"))
+		})
+	})
+
+	Context("get developerInfo for invalid tenantId and apikey", func() {
+		It("should return all right data", func() {
+			developerInfo := getDeveloperInfo("wrongid", "wrongapikey")
+			Expect(developerInfo.ApiProduct).To(Equal(""))
+			Expect(developerInfo.Developer).To(Equal(""))
+			Expect(developerInfo.DeveloperEmail).To(Equal(""))
+			Expect(developerInfo.DeveloperApp).To(Equal(""))
+		})
+	})
+})
diff --git a/crash_recovery.go b/crash_recovery.go
new file mode 100644
index 0000000..ccd3b36
--- /dev/null
+++ b/crash_recovery.go
@@ -0,0 +1,189 @@
+package apidAnalytics
+
+import (
+	"bufio"
+	"compress/gzip"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+)
+
+const (
+	crashRecoveryDelay = 30 // seconds
+	// same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
+	recoveryTSLayout = "20060102150405.000"
+	// Constant to identify recovered files
+	recoveredTS = "~recoveredTS~"
+)
+
+func initCrashRecovery() {
+	if crashRecoveryNeeded() {
+		timer := time.After(time.Second * crashRecoveryDelay)
+		// Actual recovery of files is attempted asynchronously
+		// after a delay to not block the apid plugin from starting up
+		go func() {
+			<-timer
+			performRecovery()
+		}()
+	}
+}
+
+// Crash recovery is needed if there are any folders in tmp or recovered directory
+func crashRecoveryNeeded() bool {
+	recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
+	tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
+	needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
+	if needed {
+		log.Infof("Crash Recovery is needed and will be "+
+			"attempted in %d seconds", crashRecoveryDelay)
+	}
+	return needed
+}
+
+// If Apid is shutdown or crashes while a file is still open in tmp folder,
+// then the file has partial data.
+// This partial data can be recoverd.
+func recoverFoldersInTmpDir() bool {
+	tmpRecoveryNeeded := false
+	dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
+	recoveryTS := getRecoveryTS()
+	for _, dir := range dirs {
+		tmpRecoveryNeeded = true
+		log.Debugf("Moving directory '%s' from tmp to"+
+			" recovered folder", dir.Name())
+		tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
+		// Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+		newDirName := dir.Name() + recoveredTS + recoveryTS
+		recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
+		err := os.Rename(tmpCompletePath, recoveredCompletePath)
+		if err != nil {
+			log.Errorf("Cannot move directory '%s' "+
+				"from tmp to recovered folder", dir.Name())
+		}
+	}
+	return tmpRecoveryNeeded
+}
+
+// Get Timestamp for when the recovery is being attempted on the folder.
+func getRecoveryTS() string {
+	current := time.Now()
+	return current.Format(recoveryTSLayout)
+}
+
+// If APID is restarted twice immediately such that files have been moved to
+// recovered folder but actual recovery has'nt started or is partially done
+// Then the files will just stay in the recovered dir
+// and need to be recovered again.
+func recoverFolderInRecoveredDir() bool {
+	dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+	if len(dirs) > 0 {
+		return true
+	}
+	return false
+}
+
+func performRecovery() {
+	log.Info("Crash recovery is starting...")
+	recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+	for _, dir := range recoveryDirs {
+		recoverDirectory(dir.Name())
+	}
+	log.Info("Crash recovery complete...")
+}
+
+func recoverDirectory(dirName string) {
+	log.Infof("performing crash recovery for directory: %s", dirName)
+	var bucketRecoveryTS string
+
+	// Parse bucket name to extract recoveryTS and pass it each file to be recovered
+	// Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+	// 			-> bucketRecoveryTS = _20160101222612.123
+	index := strings.Index(dirName, recoveredTS)
+	if index != -1 {
+		bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
+	}
+
+	dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
+	files, _ := ioutil.ReadDir(dirBeingRecovered)
+	for _, file := range files {
+		// recovering each file sequentially for now
+		recoverFile(bucketRecoveryTS, dirName, file.Name())
+	}
+
+	stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+	err := os.Rename(dirBeingRecovered, stagingPath)
+	if err != nil {
+		log.Errorf("Cannot move directory '%s' from"+
+			" recovered to staging folder", dirName)
+	}
+}
+
+func recoverFile(bucketRecoveryTS, dirName, fileName string) {
+	log.Debugf("performing crash recovery for file: %s ", fileName)
+	// add recovery timestamp to the file name
+	completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
+
+	recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
+	recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
+	// eg. 5be1_20170130155400.20170130155600_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0_recovered_20170130155452.616.txt
+	recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
+
+	// Copy complete records to new file and delete original partial file
+	copyPartialFile(completeOrigFilePath, recoveredFilePath)
+	deletePartialFile(completeOrigFilePath)
+}
+
+// The file is read line by line and all complete records are extracted
+// and copied to a new file which is closed as a correct gzip file.
+func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
+	partialFile, err := os.Open(completeOrigFilePath)
+	if err != nil {
+		log.Errorf("Cannot open file: %s", completeOrigFilePath)
+		return
+	}
+	defer partialFile.Close()
+
+	bufReader := bufio.NewReader(partialFile)
+	gzReader, err := gzip.NewReader(bufReader)
+	if err != nil {
+		log.Errorf("Cannot create reader on gzip file: %s"+
+			" due to %v", completeOrigFilePath, err)
+		return
+	}
+	defer gzReader.Close()
+
+	scanner := bufio.NewScanner(gzReader)
+
+	// Create new file to copy complete records from partial file and upload only a complete file
+	recoveredFile, err := os.OpenFile(recoveredFilePath,
+		os.O_WRONLY|os.O_CREATE, os.ModePerm)
+	if err != nil {
+		log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
+		return
+	}
+	defer recoveredFile.Close()
+
+	gzWriter := gzip.NewWriter(recoveredFile)
+	defer gzWriter.Close()
+
+	bufWriter := bufio.NewWriter(gzWriter)
+	defer bufWriter.Flush()
+
+	for scanner.Scan() {
+		bufWriter.Write(scanner.Bytes())
+		bufWriter.WriteString("\n")
+	}
+	if err := scanner.Err(); err != nil {
+		log.Warnf("Error while scanning partial file: %v", err)
+		return
+	}
+}
+
+func deletePartialFile(completeOrigFilePath string) {
+	err := os.Remove(completeOrigFilePath)
+	if err != nil {
+		log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
+	}
+}
diff --git a/glide.yaml b/glide.yaml
new file mode 100644
index 0000000..054e3eb
--- /dev/null
+++ b/glide.yaml
@@ -0,0 +1,8 @@
+package: github.com/30x/apidAnalytics
+
+import:
+- package: github.com/30x/apid
+  version: master
+testImport:
+- package: github.com/onsi/ginkgo/ginkgo
+- package: github.com/onsi/gomega
\ No newline at end of file
diff --git a/init.go b/init.go
new file mode 100644
index 0000000..a3501a4
--- /dev/null
+++ b/init.go
@@ -0,0 +1,178 @@
+package apidAnalytics
+
+import (
+	"fmt"
+	"github.com/30x/apid"
+	"os"
+	"path/filepath"
+	"sync"
+)
+
+const (
+	// Base path of analytics API that will be exposed
+	configAnalyticsBasePath  = "apidanalytics_base_path"
+	analyticsBasePathDefault = "/analytics"
+
+	// Root directory for analytics local data buffering
+	configAnalyticsDataPath  = "apidanalytics_data_path"
+	analyticsDataPathDefault = "/ax"
+
+	// Data collection and buffering interval in seconds
+	analyticsCollectionInterval        = "apidanalytics_collection_interval"
+	analyticsCollectionIntervalDefault = "120"
+
+	// Interval in seconds based on which staging directory
+	// will be checked for folders ready to be uploaded
+	analyticsUploadInterval        = "apidanalytics_upload_interval"
+	analyticsUploadIntervalDefault = "5"
+
+	// Number of slots for internal channel buffering of
+	// analytics records before they are dumped to a file
+	analyticsBufferChannelSize        = "apidanalytics_buffer_channel_size"
+	analyticsBufferChannelSizeDefault = 100
+
+	// EdgeX endpoint base path to access Uap Collection Endpoint
+	uapServerBase = "apidanalytics_uap_server_base"
+
+	// If caching is used then data scope and developer
+	// info will be maintained in-memory
+	// cache to avoid DB calls for each analytics message
+	useCaching        = "apidanalytics_use_caching"
+	useCachingDefault = true
+)
+
+// keep track of the services that this plugin will use
+var (
+	log      apid.LogService
+	config   apid.ConfigService
+	data     apid.DataService
+	events   apid.EventsService
+	unsafeDB apid.DB
+	dbMux    sync.RWMutex
+
+	localAnalyticsBaseDir      string
+	localAnalyticsTempDir      string
+	localAnalyticsStagingDir   string
+	localAnalyticsFailedDir    string
+	localAnalyticsRecoveredDir string
+)
+
+// apid.RegisterPlugin() is required to be called in init()
+func init() {
+	apid.RegisterPlugin(initPlugin)
+}
+
+func getDB() apid.DB {
+	dbMux.RLock()
+	db := unsafeDB
+	dbMux.RUnlock()
+	return db
+}
+
+func setDB(db apid.DB) {
+	dbMux.Lock()
+	unsafeDB = db
+	dbMux.Unlock()
+}
+
+// initPlugin will be called by apid to initialize
+func initPlugin(services apid.Services) (apid.PluginData, error) {
+	// set a logger that is annotated for this plugin
+	log = services.Log().ForModule("apidAnalytics")
+	log.Debug("start init for apidAnalytics plugin")
+
+	data = services.Data()
+	events = services.Events()
+	events.Listen("ApigeeSync", &handler{})
+
+	// set configuration
+	err := setConfig(services)
+	if err != nil {
+		return pluginData, err
+	}
+
+	for _, key := range []string{uapServerBase} {
+		if !config.IsSet(key) {
+			return pluginData,
+				fmt.Errorf("Missing required config value: %s", key)
+		}
+	}
+
+	// Create directories for managing buffering and upload to UAP stages
+	directories := []string{localAnalyticsBaseDir,
+		localAnalyticsTempDir,
+		localAnalyticsStagingDir,
+		localAnalyticsFailedDir,
+		localAnalyticsRecoveredDir}
+	err = createDirectories(directories)
+
+	if err != nil {
+		return pluginData, fmt.Errorf("Cannot create "+
+			"required local directories: %v ", err)
+	}
+
+	// Initialize one time crash recovery to be performed by the plugin on start up
+	initCrashRecovery()
+
+	// Initialize upload manager to watch the staging directory and
+	// upload files to UAP as they are ready
+	initUploadManager()
+
+	// Initialize buffer manager to watch the internalBuffer channel
+	// for new messages and dump them to files
+	initBufferingManager()
+
+	// Initialize API's and expose them
+	initAPI(services)
+	log.Debug("end init for apidAnalytics plugin")
+	return pluginData, nil
+}
+
+func setConfig(services apid.Services) error {
+	config = services.Config()
+
+	// set plugin config defaults
+	config.SetDefault(configAnalyticsBasePath, analyticsBasePathDefault)
+	config.SetDefault(configAnalyticsDataPath, analyticsDataPathDefault)
+
+	if !config.IsSet("local_storage_path") {
+		return fmt.Errorf("Missing required config value: local_storage_path")
+	}
+
+	// set local directory paths that will be used to buffer analytics data on disk
+	localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"),
+		config.GetString(configAnalyticsDataPath))
+	localAnalyticsTempDir = filepath.Join(localAnalyticsBaseDir, "tmp")
+	localAnalyticsStagingDir = filepath.Join(localAnalyticsBaseDir, "staging")
+	localAnalyticsFailedDir = filepath.Join(localAnalyticsBaseDir, "failed")
+	localAnalyticsRecoveredDir = filepath.Join(localAnalyticsBaseDir, "recovered")
+
+	// set default config for collection interval
+	config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault)
+
+	// set default config for useCaching
+	config.SetDefault(useCaching, useCachingDefault)
+
+	// set default config for upload interval
+	config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
+
+	// set default config for internal buffer size
+	config.SetDefault(analyticsBufferChannelSize, analyticsBufferChannelSizeDefault)
+
+	return nil
+}
+
+// create all missing directories if required
+func createDirectories(directories []string) error {
+	for _, path := range directories {
+		if _, err := os.Stat(path); os.IsNotExist(err) {
+			error := os.Mkdir(path, os.ModePerm)
+			if error != nil {
+				return error
+			}
+			log.Infof("created directory for analytics "+
+				"data collection: %s", path)
+		}
+	}
+	return nil
+}
diff --git a/listener.go b/listener.go
new file mode 100644
index 0000000..4c57da8
--- /dev/null
+++ b/listener.go
@@ -0,0 +1,120 @@
+package apidAnalytics
+
+import (
+	"github.com/30x/apid"
+	"github.com/apigee-labs/transicator/common"
+)
+
+type handler struct{}
+
+func (h *handler) String() string {
+	return "apigeeAnalytics"
+}
+
+func (h *handler) Handle(e apid.Event) {
+	snapData, ok := e.(*common.Snapshot)
+	if ok {
+		processSnapshot(snapData)
+	} else {
+		changeSet, ok := e.(*common.ChangeList)
+		if ok {
+			processChange(changeSet)
+		} else {
+			log.Errorf("Received Invalid event. Ignoring. %v", e)
+		}
+	}
+	return
+}
+
+func processSnapshot(snapshot *common.Snapshot) {
+	log.Debugf("Snapshot received. Switching to"+
+		" DB version: %s", snapshot.SnapshotInfo)
+
+	db, err := data.DBVersion(snapshot.SnapshotInfo)
+	if err != nil {
+		log.Panicf("Unable to access database: %v", err)
+	}
+	setDB(db)
+
+	if config.GetBool(useCaching) {
+		err = createTenantCache()
+		if err != nil {
+			log.Error(err)
+		} else {
+			log.Debug("Created a local cache" +
+				" for datasope information")
+		}
+		err = createDeveloperInfoCache()
+		if err != nil {
+			log.Error(err)
+		} else {
+			log.Debug("Created a local cache for developer information")
+		}
+	} else {
+		log.Info("Will not be caching any developer info " +
+			"and make a DB call for every analytics msg")
+	}
+	return
+}
+
+func processChange(changes *common.ChangeList) {
+	if config.GetBool(useCaching) {
+		log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
+		var rows []common.Row
+
+		refreshDevInfoNeeded := false
+		for _, payload := range changes.Changes {
+			rows = nil
+			switch payload.Table {
+			case "edgex.data_scope":
+				switch payload.Operation {
+				case common.Insert, common.Update:
+					rows = append(rows, payload.NewRow)
+					// Lock before writing to the
+					// map as it has multiple readers
+					tenantCachelock.Lock()
+					defer tenantCachelock.Unlock()
+					for _, ele := range rows {
+						var scopeuuid, tenantid string
+						var org, env string
+						ele.Get("id", &scopeuuid)
+						ele.Get("scope", &tenantid)
+						ele.Get("org", &org)
+						ele.Get("env", &env)
+						tenantCache[scopeuuid] = tenant{
+							Org:      org,
+							Env:      env,
+							TenantId: tenantid}
+						log.Debugf("Refreshed local "+
+							"tenantCache. Added "+
+							"scope: "+"%s", scopeuuid)
+					}
+				case common.Delete:
+					rows = append(rows, payload.OldRow)
+					// Lock before writing to the map
+					// as it has multiple readers
+					tenantCachelock.Lock()
+					defer tenantCachelock.Unlock()
+					for _, ele := range rows {
+						var scopeuuid string
+						ele.Get("id", &scopeuuid)
+						delete(tenantCache, scopeuuid)
+						log.Debugf("Refreshed local"+
+							" tenantCache. Deleted"+
+							" scope: %s", scopeuuid)
+					}
+				}
+			case "kms.developer", "kms.app", "kms.api_product",
+				"kms.app_credential_apiproduct_mapper":
+				// any change in any of the above tables
+				// should result in cache refresh
+				refreshDevInfoNeeded = true
+			}
+		}
+		// Refresh cache once for all set of changes
+		if refreshDevInfoNeeded {
+			createDeveloperInfoCache()
+			log.Debug("Refresh local developerInfoCache")
+		}
+	}
+}
diff --git a/pluginData.go b/pluginData.go
new file mode 100644
index 0000000..af0faa6
--- /dev/null
+++ b/pluginData.go
@@ -0,0 +1,11 @@
+package apidAnalytics
+
+import "github.com/30x/apid"
+
+var pluginData = apid.PluginData{
+	Name:    "apidAnalytics",
+	Version: "0.0.1",
+	ExtraData: map[string]interface{}{
+		"schemaVersion": "0.0.1",
+	},
+}
diff --git a/upload_manager.go b/upload_manager.go
new file mode 100644
index 0000000..a95ce87
--- /dev/null
+++ b/upload_manager.go
@@ -0,0 +1,111 @@
+package apidAnalytics
+
+import (
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"time"
+)
+
+const (
+	maxRetries              = 3
+	retryFailedDirBatchSize = 10
+)
+
+// Each file upload is retried maxRetries times before
+// moving it to failed directory
+var retriesMap map[string]int
+
+//TODO:  make sure that this instance gets initialized only once
+// since we dont want multiple upload manager tickers running
+func initUploadManager() {
+
+	retriesMap = make(map[string]int)
+
+	go func() {
+		// Periodically check the staging directory to check
+		// if any folders are ready to be uploaded to S3
+		ticker := time.NewTicker(time.Second *
+			config.GetDuration(analyticsUploadInterval))
+		log.Debugf("Intialized upload manager to check for staging directory")
+		// Ticker will keep running till go routine is running
+		// i.e. till application is running
+		defer ticker.Stop()
+
+		for range ticker.C {
+			files, err := ioutil.ReadDir(localAnalyticsStagingDir)
+
+			if err != nil {
+				log.Errorf("Cannot read directory: "+
+					"%s", localAnalyticsStagingDir)
+			}
+
+			uploadedDirCnt := 0
+			for _, file := range files {
+				if file.IsDir() {
+					status := uploadDir(file)
+					handleUploadDirStatus(file, status)
+					if status {
+						uploadedDirCnt++
+					}
+				}
+			}
+			if uploadedDirCnt > 0 {
+				// After a successful upload, retry the
+				// folders in failed directory as they might have
+				// failed due to intermitent S3/GCS issue
+				retryFailedUploads()
+			}
+		}
+	}()
+}
+
+func handleUploadDirStatus(dir os.FileInfo, status bool) {
+	completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+	// If upload is successful then delete files
+	// and remove bucket from retry map
+	if status {
+		os.RemoveAll(completePath)
+		log.Debugf("deleted directory after "+
+			"successful upload: %s", dir.Name())
+		// remove key if exists from retry map after a successful upload
+		delete(retriesMap, dir.Name())
+	} else {
+		retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
+		if retriesMap[dir.Name()] >= maxRetries {
+			log.Errorf("Max Retires exceeded for folder: %s", completePath)
+			failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
+			err := os.Rename(completePath, failedDirPath)
+			if err != nil {
+				log.Errorf("Cannot move directory '%s'"+
+					" from staging to failed folder", dir.Name())
+			}
+			// remove key from retry map once it reaches allowed max failed attempts
+			delete(retriesMap, dir.Name())
+		}
+	}
+}
+
+func retryFailedUploads() {
+	failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir)
+
+	if err != nil {
+		log.Errorf("Cannot read directory: %s", localAnalyticsFailedDir)
+	}
+
+	cnt := 0
+	for _, dir := range failedDirs {
+		// We rety failed folder in batches to not overload the upload thread
+		if cnt < retryFailedDirBatchSize {
+			failedPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
+			newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+			err := os.Rename(failedPath, newStagingPath)
+			if err != nil {
+				log.Errorf("Cannot move directory '%s'"+
+					" from failed to staging folder", dir.Name())
+			}
+		} else {
+			break
+		}
+	}
+}
diff --git a/uploader.go b/uploader.go
new file mode 100644
index 0000000..fb8a267
--- /dev/null
+++ b/uploader.go
@@ -0,0 +1,155 @@
+package apidAnalytics
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"time"
+)
+
+const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
+
+var token string
+
+var client *http.Client = &http.Client{
+	//set default timeout of 60 seconds while connecting to s3/GCS
+	Timeout: time.Duration(60 * time.Second),
+}
+
+func addHeaders(req *http.Request) {
+	token = config.GetString("apigeesync_bearer_token")
+	req.Header.Add("Authorization", "Bearer "+token)
+}
+
+func uploadDir(dir os.FileInfo) bool {
+	// Eg. org~env~20160101224500
+	tenant, timestamp := splitDirName(dir.Name())
+	//date=2016-01-01/time=22-45
+	dateTimePartition := getDateFromDirTimestamp(timestamp)
+
+	completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+	files, _ := ioutil.ReadDir(completePath)
+
+	status := true
+	var error error
+	for _, file := range files {
+		completeFilePath := filepath.Join(completePath, file.Name())
+		relativeFilePath := dateTimePartition + "/" + file.Name()
+		status, error = uploadFile(tenant, relativeFilePath, completeFilePath)
+		if error != nil {
+			log.Errorf("Upload failed due to: %v", error)
+			break
+		} else {
+			os.Remove(completeFilePath)
+			log.Debugf("Deleted file '%s' after "+
+				"successful upload", file.Name())
+		}
+	}
+	return status
+}
+
+func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
+	signedUrl, err := getSignedUrl(tenant, relativeFilePath)
+	if err != nil {
+		return false, err
+	} else {
+		return uploadFileToDatastore(completeFilePath, signedUrl)
+	}
+}
+
+func getSignedUrl(tenant, relativeFilePath string) (string, error) {
+	uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+	req, err := http.NewRequest("GET", uapCollectionUrl, nil)
+	if err != nil {
+		return "", err
+	}
+
+	q := req.URL.Query()
+
+	// eg. edgexfeb1~test
+	q.Add("tenant", tenant)
+	// eg. date=2017-01-30/time=16-32/1069_20170130163200.20170130163400_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0.txt.gz
+	q.Add("relative_file_path", relativeFilePath)
+	q.Add("file_content_type", "application/x-gzip")
+	req.URL.RawQuery = q.Encode()
+
+	// Add Bearer Token to each request
+	addHeaders(req)
+	resp, err := client.Do(req)
+	if err != nil {
+		return "", err
+	}
+	defer resp.Body.Close()
+
+	respBody, _ := ioutil.ReadAll(resp.Body)
+	if resp.StatusCode == 200 {
+		var body map[string]interface{}
+		json.Unmarshal(respBody, &body)
+		signedURL := body["url"]
+		return signedURL.(string), nil
+	} else {
+		return "", fmt.Errorf("Error while getting "+
+			"signed URL '%v'", resp.Status)
+	}
+}
+
+func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
+	// open gzip file that needs to be uploaded
+	file, err := os.Open(completeFilePath)
+	if err != nil {
+		return false, err
+	}
+	defer file.Close()
+
+	req, err := http.NewRequest("PUT", signedUrl, file)
+	if err != nil {
+		return false, fmt.Errorf("Parsing URL failed '%v'", err)
+	}
+
+	req.Header.Set("Expect", "100-continue")
+	req.Header.Set("Content-Type", "application/x-gzip")
+
+	fileStats, err := file.Stat()
+	if err != nil {
+		return false, fmt.Errorf("Could not get content length for "+
+			"file '%v'", err)
+	}
+	req.ContentLength = fileStats.Size()
+
+	resp, err := client.Do(req)
+	if err != nil {
+		return false, err
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode == 200 {
+		return true, nil
+	} else {
+		return false, fmt.Errorf("Final Datastore (S3/GCS)returned "+
+			"Error '%v'", resp.Status)
+	}
+}
+
+// Extract tenant and timestamp from directory Name
+func splitDirName(dirName string) (string, string) {
+	s := strings.Split(dirName, "~")
+	tenant := s[0] + "~" + s[1]
+	timestamp := s[2]
+	return tenant, timestamp
+}
+
+// files are uploaded to S3 under specific date time partition and that
+// key needs to be generated from the plugin
+// eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
+func getDateFromDirTimestamp(timestamp string) string {
+	dateTime, _ := time.Parse(timestampLayout, timestamp)
+	date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
+	time := dateTime.Format("15-04")      // same as HH-mm
+	dateHourTS := "date=" + date + "/time=" + time
+	return dateHourTS
+}