Merge pull request #2 from 30x/pooja_xapid_377
xapid 377
diff --git a/README b/README
deleted file mode 100644
index 09fec22..0000000
--- a/README
+++ /dev/null
@@ -1,3 +0,0 @@
-# apidApigeeAnalytics
-
-This core plugin for [apid](http://github.com/30x/apid) and resposible for collecting analytics data for runtime traffic and puplishing to Apigee.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0b41867
--- /dev/null
+++ b/README.md
@@ -0,0 +1,56 @@
+# ApidAnalytics
+
+This is a core plugin for [apid](http://github.com/30x/apid) and is responsible for collecting analytics data for
+runtime traffic from Micro and Enterprise Gateway and publishing to Apigee.
+
+### Configuration
+
+| name | description |
+|---------------------------------------|-----------------------------------|
+| apidanalytics_base_path | string. default: /analytics |
+| apidanalytics_data_path | string. default: /ax |
+| apidanalytics_collection_interval | int. seconds. default: 120 |
+| apidanalytics_upload_interval | int. seconds. default: 5 |
+| apidanalytics_uap_server_base | string. url. required. |
+| apidanalytics_use_caching | boolean. default: true |
+| apidanalytics_buffer_channel_size | int. number of slots. default: 100|
+
+### Startup Procedure
+1. Initialize crash recovery, upload and buffering manager to handle buffering analytics messages to files
+ locally and then periodically upload these files to S3/GCS based on signedURL received from
+ uapCollectionEndpoint exposed via edgex proxy
+2. Create a listener for Apigee-Sync event
+ 1. Each time a Snapshot is received, create an in-memory cache for data scope and developer information
+ 2. Each time a changeList is received
+ 1. if data_scope info changed, then insert/delete info for changed scope from tenantCache
+ 2. if any other kms table is changed, then refresh entire developerInfo cache as only specific
+ fields are saved in the cache
+3. Initialize POST /analytics/{scope_uuid} API
+4. Upon receiving POST requests
+ 1. Validate and enrich each batch of analytics records
+ 2. If valid, then publish records to an internal buffer channel
+5. Buffering Logic
+ 1. Buffering manager creates listener on the internal buffer channel and thus consumes messages
+ as soon as they are put on the channel
+ 2. Based on the current timestamp either an existing directory is used to save these messages
+      or a new timestamp directory is created
+ 3. If a new directory is created, then an event will be published on the closeBucketEvent Channel
+ at the expected directory closing time
+ 4. The messages are stored in a file under tmp/<timestamp_directory>
+ 5. Based on collection interval, periodically the files in tmp are closed by the routine listening on the
+ closeBucketEvent channel and the directory is moved to staging directory
+6. Upload Manager
+ 1. The upload manager periodically checks the staging directory to look for new folders
+   2. When a new folder arrives here, it means all files under that are closed and ready to be uploaded
+ 3. Tenant info is extracted from the directory name and the files are sequentially uploaded to S3/GCS
+ 4. Based on the upload status
+ 1. If upload is successful then directory is deleted from staging and previously failed uploads are retried
+ 2. if upload fails, then upload is retried 3 times before moving the directory to failed directory
+7. Crash Recovery is a one time activity performed when the plugin is started to
+ cleanly handle open files from a previous Apid stop or crash event
+
+### Exposed API
+```sh
+POST /analytics/{bundle_scope_uuid}
+```
+Complete spec is listed in `api.yaml`
diff --git a/api.go b/api.go
new file mode 100644
index 0000000..03f5e31
--- /dev/null
+++ b/api.go
@@ -0,0 +1,67 @@
+package apidAnalytics
+
+import (
+ "github.com/30x/apid"
+ "net/http"
+ "strings"
+)
+
+var analyticsBasePath string
+
+type errResponse struct {
+ ErrorCode string `json:"errorCode"`
+ Reason string `json:"reason"`
+}
+
+type dbError struct {
+ ErrorCode string `json:"errorCode"`
+ Reason string `json:"reason"`
+}
+
+func initAPI(services apid.Services) {
+ log.Debug("initialized API's exposed by apidAnalytics plugin")
+ analyticsBasePath = config.GetString(configAnalyticsBasePath)
+ services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}",
+ saveAnalyticsRecord).Methods("POST")
+}
+
+func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
+
+ w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+
+ db := getDB() // When database isnt initialized
+ if db == nil {
+ writeError(w, http.StatusInternalServerError,
+ "INTERNAL_SERVER_ERROR",
+ "Service is not initialized completely")
+ return
+ }
+
+ if !strings.EqualFold(r.Header.Get("Content-Type"), "application/json") {
+ writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE",
+ "Only supported content type is application/json")
+ return
+ }
+
+ vars := apid.API().Vars(r)
+ scopeuuid := vars["bundle_scope_uuid"]
+ tenant, dbErr := getTenantForScope(scopeuuid)
+ if dbErr.ErrorCode != "" {
+ switch dbErr.ErrorCode {
+ case "INTERNAL_SEARCH_ERROR":
+ writeError(w, http.StatusInternalServerError,
+ "INTERNAL_SEARCH_ERROR", dbErr.Reason)
+ case "UNKNOWN_SCOPE":
+ writeError(w, http.StatusBadRequest,
+ "UNKNOWN_SCOPE", dbErr.Reason)
+ }
+ } else {
+ err := processPayload(tenant, scopeuuid, r)
+ if err.ErrorCode == "" {
+ w.WriteHeader(http.StatusOK)
+ } else {
+ writeError(w, http.StatusBadRequest,
+ err.ErrorCode, err.Reason)
+ }
+ }
+}
diff --git a/api.yaml b/api.yaml
new file mode 100644
index 0000000..9b9ccfa
--- /dev/null
+++ b/api.yaml
@@ -0,0 +1,147 @@
+swagger: "2.0"
+info:
+ version: "v1"
+ title: APID analytics API
+host: playground.apistudio.io
+basePath: /try/64e409ad-aebb-4bbc-977e-f0e0f22209d4
+schemes:
+ - http
+ - https
+consumes:
+ - application/json
+produces:
+ - application/json
+paths:
+ '/analytics/{bundle_scope_uuid}':
+ x-swagger-router-controller: analytics
+ parameters:
+ - name: bundle_scope_uuid
+ in: path
+ required: true
+ description: bundle UUID that can be mapped to a scope by APID
+ type: string
+ - name: analytics_data
+ in: body
+ description: The analytics data you want to post
+ required: true
+ schema:
+ $ref: "#/definitions/records"
+ post:
+ responses:
+ "200":
+ description: Success
+ "400":
+ description: Bad Request
+ schema:
+ $ref: "#/definitions/errClientError"
+ "500":
+ description: Server error
+ schema:
+ $ref: "#/definitions/errServerError"
+ default:
+ description: Error
+ schema:
+ $ref: "#/definitions/errResponse"
+
+definitions:
+ records:
+ type: object
+ required:
+ - records
+ properties:
+ records:
+ type: array
+ minItems: 1
+ items:
+ $ref: "#/definitions/eachRecord"
+ example: {
+ "records":[{
+ "response_status_code": 400,
+ "client_received_start_timestamp": 1462850097576,
+ "client_id":"0GJKn7EQmNkKYcGL7x3gHaawWLs5gUPr"
+ },{
+ "response_status_code": 200,
+ "client_id":"2ngXgr6Rl2PXWiEmbt8zCkWY3Ptjb8ep",
+ "request_verb" : "GET",
+ "api_product":" test_product",
+ "access_token" : "fewGWG343LDV346345YCDS",
+ "apiproxy" : "OAuthProxy",
+ "apiproxy_revision" : "2",
+ "client_ip": "10.16.9.11",
+ "client_sent_end_timestamp": 1462850097894,
+ "client_received_start_timestamp": 1462850097576,
+ "client_received_end_timestamp": 1462850097580,
+ "client_sent_start_timestamp": 1462850097894,
+ "request_path" : "/oauth/oauthv2/auth_code/",
+ "request_uri": "/oauth/oauthv2/auth_code/?response_type=code&redirect_url=http%3A%2F%2Fexample.com&client_id=A1h6yYAVeADnEKji8M37zCSn6olcmQDB",
+ "useragent" : "Chrome",
+ "target" : "target_name",
+ "target_received_end_timestamp": 1462850097800,
+ "target_received_start_timestamp": 1462850097800,
+ "target_response_code" : 200,
+ "target_sent_end_timestamp" : 1462850097802,
+ "target_sent_start_timestamp" : 1462850097802
+ }]
+ }
+
+ eachRecord:
+ description: Each record is basically a map of key value pair. client_received_start_timestamp is a required property but more fields can be added
+ type: object
+ required:
+ - client_received_start_timestamp
+ properties:
+ client_received_start_timestamp:
+ type: integer
+ format: int64
+ example: {
+ "response_status_code":400,
+ "client_received_start_timestamp":1462850097576,
+ "client_id":"0GJKn7EQmNkKYcGL7x3gHaawWLs5gUPr"
+ }
+
+  errClientError:
+    required:
+      - errorCode
+      - reason
+    properties:
+      errorCode:
+        type: string
+        enum:
+          - UNKNOWN_SCOPE
+          - BAD_DATA
+          - UNSUPPORTED_CONTENT_TYPE
+          - UNSUPPORTED_CONTENT_ENCODING
+          - MISSING_FIELD
+      reason:
+        type: string
+    example: {
+      "errorCode":"UNKNOWN_SCOPE",
+      "reason":"No tenant found for this scopeuuid : UUID"
+    }
+
+  errServerError:
+    required:
+      - errorCode
+      - reason
+    properties:
+      errorCode:
+        type: string
+        enum:
+          - INTERNAL_SERVER_ERROR
+          - INTERNAL_SEARCH_ERROR
+      reason:
+        type: string
+    example: {
+      "errorCode":"INTERNAL_SERVER_ERROR",
+      "reason":"Service is not initialized completely"
+    }
+
+  errResponse:
+    required:
+      - errorCode
+      - reason
+    properties:
+      errorCode:
+        type: string
+      reason:
+        type: string
\ No newline at end of file
diff --git a/api_helper.go b/api_helper.go
new file mode 100644
index 0000000..24dba22
--- /dev/null
+++ b/api_helper.go
@@ -0,0 +1,181 @@
+package apidAnalytics
+
+import (
+ "compress/gzip"
+ "encoding/json"
+ "io"
+ "net/http"
+ "strings"
+)
+
+/*
+Implements all the helper methods needed to process the POST /analytics payload
+and send it to the internal buffer channel
+*/
+
+type developerInfo struct {
+ ApiProduct string
+ DeveloperApp string
+ DeveloperEmail string
+ Developer string
+}
+
+type axRecords struct {
+ Tenant tenant
+ // Records is an array of multiple analytics records
+ Records []interface{}
+}
+
+type tenant struct {
+ Org string
+ Env string
+ TenantId string
+}
+
+func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
+ var gzipEncoded bool
+ if r.Header.Get("Content-Encoding") != "" {
+ if !strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") {
+ return errResponse{
+ ErrorCode: "UNSUPPORTED_CONTENT_ENCODING",
+ Reason: "Only supported content encoding is gzip"}
+ } else {
+ gzipEncoded = true
+ }
+ }
+
+ var reader io.ReadCloser
+ var err error
+ if gzipEncoded {
+ reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
+ if err != nil {
+ return errResponse{
+ ErrorCode: "BAD_DATA",
+ Reason: "Gzip Encoded data cannot be read"}
+ }
+ } else {
+ reader = r.Body
+ }
+
+ errMessage := validateEnrichPublish(tenant, scopeuuid, reader)
+ if errMessage.ErrorCode != "" {
+ return errMessage
+ }
+ return errResponse{}
+}
+
+func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse {
+ var raw map[string]interface{}
+ decoder := json.NewDecoder(reader) // Decode payload to JSON data
+ decoder.UseNumber()
+
+ if err := decoder.Decode(&raw); err != nil {
+ return errResponse{ErrorCode: "BAD_DATA",
+ Reason: "Not a valid JSON payload"}
+ }
+
+ if records := raw["records"]; records != nil {
+ // Iterate through each record to validate and enrich it
+ for _, eachRecord := range records.([]interface{}) {
+ recordMap := eachRecord.(map[string]interface{})
+ valid, err := validate(recordMap)
+ if valid {
+ enrich(recordMap, scopeuuid, tenant)
+ } else {
+ // Even if there is one bad record, then reject entire batch
+ return err
+ }
+ }
+ axRecords := axRecords{
+ Tenant: tenant,
+ Records: records.([]interface{})}
+ // publish batch of records to channel (blocking call)
+ internalBuffer <- axRecords
+ } else {
+ return errResponse{
+ ErrorCode: "NO_RECORDS",
+ Reason: "No analytics records in the payload"}
+ }
+ return errResponse{}
+}
+
+/*
+Does basic validation on each analytics message
+1. client_received_start_timestamp should exist
+2. if client_received_end_timestamp exists then it should be > client_received_start_timestamp
+*/
+func validate(recordMap map[string]interface{}) (bool, errResponse) {
+	elems := []string{"client_received_start_timestamp"}
+	for _, elem := range elems {
+		if recordMap[elem] == nil {
+			return false, errResponse{ErrorCode: "MISSING_FIELD",
+				Reason: "Missing Required field: " + elem}
+		}
+	}
+	crst, exists1 := recordMap["client_received_start_timestamp"]
+	cret, exists2 := recordMap["client_received_end_timestamp"]
+	if exists1 && exists2 {
+		// json.Number is a string type; ">" would compare lexically
+		// (e.g. "9" > "10"), so compare the parsed numeric values
+		start, errS := crst.(json.Number).Float64()
+		end, errE := cret.(json.Number).Float64()
+		if errS != nil || errE != nil || start > end {
+			return false, errResponse{ErrorCode: "BAD_DATA",
+				Reason: "client_received_start_timestamp " +
+					"> client_received_end_timestamp"}
+		}
+	}
+	return true, errResponse{}
+}
+
+/*
+Enrich each record by adding org and env fields
+It also finds add developer related information based on the apiKey
+*/
+func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) {
+ org, orgExists := recordMap["organization"]
+ if !orgExists || org.(string) == "" {
+ recordMap["organization"] = tenant.Org
+ }
+
+ env, envExists := recordMap["environment"]
+ if !envExists || env.(string) == "" {
+ recordMap["environment"] = tenant.Env
+ }
+
+ apiKey, exists := recordMap["client_id"]
+ // apiKey doesnt exist then ignore adding developer fields
+ if exists {
+ apiKey := apiKey.(string)
+ devInfo := getDeveloperInfo(tenant.TenantId, apiKey)
+ _, exists := recordMap["api_product"]
+ if !exists {
+ recordMap["api_product"] = devInfo.ApiProduct
+ }
+ _, exists = recordMap["developer_app"]
+ if !exists {
+ recordMap["developer_app"] = devInfo.DeveloperApp
+ }
+ _, exists = recordMap["developer_email"]
+ if !exists {
+ recordMap["developer_email"] = devInfo.DeveloperEmail
+ }
+ _, exists = recordMap["developer"]
+ if !exists {
+ recordMap["developer"] = devInfo.Developer
+ }
+ }
+}
+
+func writeError(w http.ResponseWriter, status int, code string, reason string) {
+ w.WriteHeader(status)
+ e := errResponse{
+ ErrorCode: code,
+ Reason: reason,
+ }
+ bytes, err := json.Marshal(e)
+ if err != nil {
+ log.Errorf("unable to marshal errorResponse: %v", err)
+ } else {
+ w.Write(bytes)
+ }
+}
diff --git a/api_test.go b/api_test.go
new file mode 100644
index 0000000..d172a29
--- /dev/null
+++ b/api_test.go
@@ -0,0 +1,49 @@
+package apidAnalytics
+
+import (
+ "net/http"
+ "net/url"
+ "strings"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+// BeforeSuite setup and AfterSuite cleanup is in apidAnalytics_suite_test.go
+var _ = Describe("testing saveAnalyticsRecord() directly", func() {
+ Context("valid scopeuuid", func() {
+ It("should successfully return", func() {
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = analyticsBasePath
+
+ v := url.Values{}
+ v.Add("bundle_scope_uuid", "testid")
+
+ client := &http.Client{}
+ req, err := http.NewRequest("POST", uri.String(),
+ strings.NewReader(v.Encode()))
+ res, err := client.Do(req)
+ defer res.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(res.StatusCode, http.StatusOK)
+ })
+ })
+
+ Context("invalid scopeuuid", func() {
+ It("should return bad request", func() {
+ uri, err := url.Parse(testServer.URL)
+ uri.Path = analyticsBasePath
+
+ v := url.Values{}
+ v.Add("bundle_scope_uuid", "wrongId")
+
+ client := &http.Client{}
+ req, err := http.NewRequest("POST", uri.String(),
+ strings.NewReader(v.Encode()))
+ res, err := client.Do(req)
+ defer res.Body.Close()
+ Expect(err).ShouldNot(HaveOccurred())
+ Expect(res.StatusCode, http.StatusBadRequest)
+ })
+ })
+})
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
new file mode 100644
index 0000000..1e5c128
--- /dev/null
+++ b/apidAnalytics_suite_test.go
@@ -0,0 +1,232 @@
+package apidAnalytics
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+
+ "github.com/30x/apid"
+ "github.com/30x/apid/factory"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+)
+
+var (
+ testTempDir string
+ testServer *httptest.Server
+)
+
+func TestApidAnalytics(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "ApidAnalytics Suite")
+}
+
+var _ = BeforeSuite(func() {
+ apid.Initialize(factory.DefaultServicesFactory())
+
+ config := apid.Config()
+
+ var err error
+ testTempDir, err = ioutil.TempDir("", "api_test")
+ Expect(err).NotTo(HaveOccurred())
+
+ config.Set("data_path", testTempDir)
+ config.Set(uapServerBase, "http://localhost:9000") // dummy value
+ config.Set("apigeesync_apid_instance_id",
+ "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
+ config.Set(useCaching, true)
+
+ db, err := apid.Data().DB()
+ Expect(err).NotTo(HaveOccurred())
+ setDB(db)
+ createApidClusterTables(db)
+ createTables(db)
+ insertTestData(db)
+ apid.InitializePlugins()
+
+ // Create cache else its created in listener.go when a snapshot is received
+ createTenantCache()
+ createDeveloperInfoCache()
+
+ testServer = httptest.NewServer(http.HandlerFunc(
+ func(w http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == analyticsBasePathDefault {
+ saveAnalyticsRecord(w, req)
+ }
+ }))
+})
+
+func createTables(db apid.DB) {
+ _, err := db.Exec(`
+ CREATE TABLE IF NOT EXISTS api_product (
+ id text,
+ tenant_id text,
+ name text,
+ display_name text,
+ description text,
+ api_resources text[],
+ approval_type text,
+ _change_selector text,
+ proxies text[],
+ environments text[],
+ quota text,
+ quota_time_unit text,
+ quota_interval int,
+ created_at int64,
+ created_by text,
+ updated_at int64,
+ updated_by text,
+ PRIMARY KEY (tenant_id, id));
+ CREATE TABLE IF NOT EXISTS developer (
+ id text,
+ tenant_id text,
+ username text,
+ first_name text,
+ last_name text,
+ password text,
+ email text,
+ status text,
+ encrypted_password text,
+ salt text,
+ _change_selector text,
+ created_at int64,
+ created_by text,
+ updated_at int64,
+ updated_by text,
+ PRIMARY KEY (tenant_id, id)
+ );
+ CREATE TABLE IF NOT EXISTS app (
+ id text,
+ tenant_id text,
+ name text,
+ display_name text,
+ access_type text,
+ callback_url text,
+ status text,
+ app_family text,
+ company_id text,
+ developer_id text,
+ type int,
+ created_at int64,
+ created_by text,
+ updated_at int64,
+ updated_by text,
+ _change_selector text,
+ PRIMARY KEY (tenant_id, id)
+ );
+ CREATE TABLE IF NOT EXISTS app_credential_apiproduct_mapper (
+ tenant_id text,
+ appcred_id text,
+ app_id text,
+ apiprdt_id text,
+ _change_selector text,
+ status text,
+ PRIMARY KEY (appcred_id, app_id, apiprdt_id,tenant_id)
+ );
+ `)
+ if err != nil {
+ panic("Unable to initialize DB " + err.Error())
+ }
+}
+
+func createApidClusterTables(db apid.DB) {
+ _, err := db.Exec(`
+ CREATE TABLE apid_cluster (
+ id text,
+ instance_id text,
+ name text,
+ description text,
+ umbrella_org_app_name text,
+ created int64,
+ created_by text,
+ updated int64,
+ updated_by text,
+ _change_selector text,
+ snapshotInfo text,
+ lastSequence text,
+ PRIMARY KEY (id)
+ );
+ CREATE TABLE data_scope (
+ id text,
+ apid_cluster_id text,
+ scope text,
+ org text,
+ env text,
+ created int64,
+ created_by text,
+ updated int64,
+ updated_by text,
+ _change_selector text,
+ PRIMARY KEY (id)
+ );
+ `)
+ if err != nil {
+ panic("Unable to initialize DB " + err.Error())
+ }
+}
+
+func insertTestData(db apid.DB) {
+
+ txn, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id,"+
+ " appcred_id, app_id, apiprdt_id, status, _change_selector) "+
+ "VALUES"+
+ "($1,$2,$3,$4,$5,$6)",
+ "tenantid",
+ "testapikey",
+ "testappid",
+ "testproductid",
+ "APPROVED",
+ "12345",
+ )
+
+ txn.Exec("INSERT INTO APP (id, tenant_id, name, developer_id) "+
+ "VALUES"+
+ "($1,$2,$3,$4)",
+ "testappid",
+ "tenantid",
+ "testapp",
+ "testdeveloperid",
+ )
+
+ txn.Exec("INSERT INTO API_PRODUCT (id, tenant_id, name) "+
+ "VALUES"+
+ "($1,$2,$3)",
+ "testproductid",
+ "tenantid",
+ "testproduct",
+ )
+
+ txn.Exec("INSERT INTO DEVELOPER (id, tenant_id, username, email) "+
+ "VALUES"+
+ "($1,$2,$3,$4)",
+ "testdeveloperid",
+ "tenantid",
+ "testdeveloper",
+ "testdeveloper@test.com",
+ )
+
+ txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, "+
+ "apid_cluster_id, scope, org, env) "+
+ "VALUES"+
+ "($1,$2,$3,$4,$5,$6)",
+ "testid",
+ "some_change_selector",
+ "some_cluster_id",
+ "tenantid",
+ "testorg",
+ "testenv",
+ )
+ txn.Commit()
+}
+
+var _ = AfterSuite(func() {
+ apid.Events().Close()
+ if testServer != nil {
+ testServer.Close()
+ }
+ os.RemoveAll(testTempDir)
+})
diff --git a/buffering_manager.go b/buffering_manager.go
new file mode 100644
index 0000000..7e28d4b
--- /dev/null
+++ b/buffering_manager.go
@@ -0,0 +1,179 @@
+package apidAnalytics
+
+import (
+ "bufio"
+ "compress/gzip"
+ "crypto/rand"
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+)
+
+const fileExtension = ".txt.gz"
+
+// Channel where analytics records are buffered before being dumped to a
+// file as write to file should not be performed in the Http Thread
+var internalBuffer chan axRecords
+
+// Channel where close bucket event is published i.e. when a bucket
+// is ready to be closed based on collection interval
+var closeBucketEvent chan bucket
+
+// Map from timestamp to bucket
+var bucketMap map[int64]bucket
+
+type bucket struct {
+ DirName string
+ // We need file handle and writer to close the file
+ FileWriter fileWriter
+}
+
+// This struct will store open file handle and writer to close the file
+type fileWriter struct {
+ file *os.File
+ gw *gzip.Writer
+ bw *bufio.Writer
+}
+
+func initBufferingManager() {
+ internalBuffer = make(chan axRecords,
+ config.GetInt(analyticsBufferChannelSize))
+ closeBucketEvent = make(chan bucket)
+ bucketMap = make(map[int64]bucket)
+
+ // Keep polling the internal buffer for new messages
+ go func() {
+ for {
+ records := <-internalBuffer
+ err := save(records)
+ if err != nil {
+ log.Errorf("Could not save %d messages to file"+
+ " due to: %v", len(records.Records), err)
+ }
+ }
+ }()
+
+ // Keep polling the closeEvent channel to see if bucket is ready to be closed
+ go func() {
+ for {
+ bucket := <-closeBucketEvent
+ log.Debugf("Close Event received for bucket: %s",
+ bucket.DirName)
+
+ // close open file
+ closeGzipFile(bucket.FileWriter)
+
+ dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
+ stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+ // close files in tmp folder and move directory to
+ // staging to indicate its ready for upload
+ err := os.Rename(dirToBeClosed, stagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s' from"+
+ " tmp to staging folder", bucket.DirName)
+ }
+ }
+ }()
+}
+
+// Save records to correct file based on what timestamp data is being collected for
+func save(records axRecords) error {
+ bucket, err := getBucketForTimestamp(time.Now(), records.Tenant)
+ if err != nil {
+ return err
+ }
+ writeGzipFile(bucket.FileWriter, records.Records)
+ return nil
+}
+
+func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
+ // first based on current timestamp and collection interval,
+ // determine the timestamp of the bucket
+ ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
+ _, exists := bucketMap[ts]
+ if exists {
+ return bucketMap[ts], nil
+ } else {
+ timestamp := time.Unix(ts, 0).Format(timestampLayout)
+
+ // endtimestamp of bucket = starttimestamp + collectionInterval
+ endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0)
+ endtimestamp := endTime.Format(timestampLayout)
+
+ dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
+ newPath := filepath.Join(localAnalyticsTempDir, dirName)
+ // create dir
+ err := os.Mkdir(newPath, os.ModePerm)
+ if err != nil {
+ return bucket{}, fmt.Errorf("Cannot create directory "+
+ "'%s' to buffer messages '%v'", dirName, err)
+ }
+
+ // create file for writing
+ // Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
+ fileName := getRandomHex() + "_" + timestamp + "." +
+ endtimestamp + "_" +
+ config.GetString("apigeesync_apid_instance_id") +
+ "_writer_0" + fileExtension
+ completeFilePath := filepath.Join(newPath, fileName)
+ fw, err := createGzipFile(completeFilePath)
+ if err != nil {
+ return bucket{}, err
+ }
+
+ newBucket := bucket{DirName: dirName, FileWriter: fw}
+ bucketMap[ts] = newBucket
+
+ //Send event to close directory after endTime + 5
+ // seconds to make sure all buffers are flushed to file
+ timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
+ go func() {
+ <-timer
+ closeBucketEvent <- newBucket
+ }()
+ return newBucket, nil
+ }
+}
+
+// 4 digit Hex is prefixed to each filename to improve
+// how s3 partitions the files being uploaded
+func getRandomHex() string {
+ buff := make([]byte, 2)
+ rand.Read(buff)
+ return fmt.Sprintf("%x", buff)
+}
+
+func createGzipFile(s string) (fileWriter, error) {
+ file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
+ if err != nil {
+ return fileWriter{},
+ fmt.Errorf("Cannot create file '%s' "+
+ "to buffer messages '%v'", s, err)
+ }
+ gw := gzip.NewWriter(file)
+ bw := bufio.NewWriter(gw)
+ return fileWriter{file, gw, bw}, nil
+}
+
+func writeGzipFile(fw fileWriter, records []interface{}) {
+ // write each record as a new line to the bufferedWriter
+ for _, eachRecord := range records {
+ s, _ := json.Marshal(eachRecord)
+ _, err := (fw.bw).WriteString(string(s))
+ if err != nil {
+ log.Errorf("Write to file failed '%v'", err)
+ }
+ (fw.bw).WriteString("\n")
+ }
+ // Flush entire batch of records to file vs each message
+ fw.bw.Flush()
+ fw.gw.Flush()
+}
+
+func closeGzipFile(fw fileWriter) {
+ fw.bw.Flush()
+ fw.gw.Close()
+ fw.file.Close()
+}
diff --git a/common_helper.go b/common_helper.go
new file mode 100644
index 0000000..9acc033
--- /dev/null
+++ b/common_helper.go
@@ -0,0 +1,215 @@
+package apidAnalytics
+
+import (
+ "database/sql"
+ "fmt"
+ "sync"
+)
+
+// Cache for scope uuid to org, env and tenantId information
+var tenantCache map[string]tenant
+
+// RW lock for tenant map cache since the cache can be
+// read while its being written to and vice versa
+var tenantCachelock = sync.RWMutex{}
+
+// Cache for apiKey~tenantId to developer related information
+var developerInfoCache map[string]developerInfo
+
+// RW lock for developerInfo map cache since the cache can be
+// read while its being written to and vice versa
+var developerInfoCacheLock = sync.RWMutex{}
+
+// Load data scope information into an in-memory cache so that
+// for each record a DB lookup is not required
+func createTenantCache() error {
+ // Lock before writing to the map as it has multiple readers
+ tenantCachelock.Lock()
+ defer tenantCachelock.Unlock()
+ tenantCache = make(map[string]tenant)
+ var org, env, tenantId, id string
+
+ db := getDB()
+ rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE")
+
+ if error != nil {
+ return fmt.Errorf("Count not get datascope from "+
+ "DB due to: %v", error)
+ } else {
+ defer rows.Close()
+ for rows.Next() {
+ rows.Scan(&env, &org, &tenantId, &id)
+ tenantCache[id] = tenant{Org: org,
+ Env: env,
+ TenantId: tenantId}
+ }
+ }
+
+ log.Debugf("Count of data scopes in the cache: %d", len(tenantCache))
+ return nil
+}
+
+// Load data scope information into an in-memory cache so that
+// for each record a DB lookup is not required
+func createDeveloperInfoCache() error {
+ // Lock before writing to the map as it has multiple readers
+ developerInfoCacheLock.Lock()
+ defer developerInfoCacheLock.Unlock()
+ developerInfoCache = make(map[string]developerInfo)
+ var apiProduct, developerApp, developerEmail, developer sql.NullString
+ var tenantId, apiKey string
+
+ db := getDB()
+ sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name," +
+ " a.name, d.username, d.email " +
+ "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
+ "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
+ "INNER JOIN APP AS a ON a.id = mp.app_id " +
+ "INNER JOIN DEVELOPER as d ON d.id = a.developer_id;"
+ rows, error := db.Query(sSql)
+
+ if error != nil {
+ return fmt.Errorf("Count not get developerInfo "+
+ "from DB due to: %v", error)
+ } else {
+ defer rows.Close()
+ for rows.Next() {
+ rows.Scan(&tenantId, &apiKey, &apiProduct,
+ &developerApp, &developer, &developerEmail)
+
+ keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
+ apiPrd := getValuesIgnoringNull(apiProduct)
+ devApp := getValuesIgnoringNull(developerApp)
+ dev := getValuesIgnoringNull(developer)
+ devEmail := getValuesIgnoringNull(developerEmail)
+
+ developerInfoCache[keyForMap] = developerInfo{
+ ApiProduct: apiPrd,
+ DeveloperApp: devApp,
+ DeveloperEmail: devEmail,
+ Developer: dev}
+ }
+ }
+
+ log.Debugf("Count of apiKey~tenantId combinations "+
+ "in the cache: %d", len(developerInfoCache))
+ return nil
+}
+
+// Returns Tenant Info given a scope uuid from the cache or by querying
+// the DB directly based on useCaching config
+func getTenantForScope(scopeuuid string) (tenant, dbError) {
+ if config.GetBool(useCaching) {
+ // acquire a read lock as this cache has 1 writer as well
+ tenantCachelock.RLock()
+ defer tenantCachelock.RUnlock()
+ ten, exists := tenantCache[scopeuuid]
+ if !exists {
+ reason := "No tenant found for this scopeuuid: " + scopeuuid
+ errorCode := "UNKNOWN_SCOPE"
+ // Incase of unknown scope, try to refresh the
+ // cache ansynchronously incase an update was missed or delayed
+ go createTenantCache()
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
+ } else {
+ return ten, dbError{}
+ }
+ } else {
+ var org, env, tenantId string
+
+ db := getDB()
+ error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE"+
+ " where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
+
+ switch {
+ case error == sql.ErrNoRows:
+ reason := "No tenant found for this scopeuuid: " + scopeuuid
+ errorCode := "UNKNOWN_SCOPE"
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
+ case error != nil:
+ reason := error.Error()
+ errorCode := "INTERNAL_SEARCH_ERROR"
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
+ }
+ return tenant{
+ Org: org,
+ Env: env,
+ TenantId: tenantId}, dbError{}
+ }
+}
+
+// Returns Developer related info given an apiKey and tenantId
+// from the cache or by querying the DB directly based on useCaching config
+func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
+	if config.GetBool(useCaching) {
+		keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
+		// acquire a read lock on the developerInfo cache (not the tenant cache) as it has 1 writer as well
+		developerInfoCacheLock.RLock()
+		defer developerInfoCacheLock.RUnlock()
+		devInfo, exists := developerInfoCache[keyForMap]
+		if !exists {
+			log.Warnf("No data found for tenantId = %s"+
+				" and apiKey = %s", tenantId, apiKey)
+			// In case of unknown apiKey~tenantId, try to refresh the
+			// developerInfo cache asynchronously in case an update was missed or delayed
+			go createDeveloperInfoCache()
+			return developerInfo{}
+		} else {
+			return devInfo
+		}
+	} else {
+		var apiProduct, developerApp, developerEmail sql.NullString
+		var developer sql.NullString
+
+		db := getDB()
+		sSql := "SELECT ap.name, a.name, d.username, d.email " +
+			"FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
+			"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
+			"INNER JOIN APP AS a ON a.id = mp.app_id " +
+			"INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
+			"where mp.tenant_id = ? and mp.appcred_id = ?;"
+		error := db.QueryRow(sSql, tenantId, apiKey).
+			Scan(&apiProduct, &developerApp,
+				&developer, &developerEmail)
+
+		switch {
+		case error == sql.ErrNoRows:
+			log.Debugf("No data found for tenantId = %s "+
+				"and apiKey = %s", tenantId, apiKey)
+			return developerInfo{}
+		case error != nil:
+			log.Debugf("No data found for tenantId = %s and "+
+				"apiKey = %s due to: %v", tenantId, apiKey, error)
+			return developerInfo{}
+		}
+
+		apiPrd := getValuesIgnoringNull(apiProduct)
+		devApp := getValuesIgnoringNull(developerApp)
+		dev := getValuesIgnoringNull(developer)
+		devEmail := getValuesIgnoringNull(developerEmail)
+		return developerInfo{ApiProduct: apiPrd,
+			DeveloperApp: devApp,
+			DeveloperEmail: devEmail,
+			Developer: dev}
+	}
+}
+
+// Helper method to handle scanning null values in DB to empty string
+func getValuesIgnoringNull(sqlValue sql.NullString) string {
+ if sqlValue.Valid {
+ return sqlValue.String
+ } else {
+ return ""
+ }
+}
+
+// Build Key as a combination of tenantId and apiKey for the developerInfo Cache
+func getKeyForDeveloperInfoCache(tenantId string, apiKey string) string {
+ return tenantId + "~" + apiKey
+}
diff --git a/common_helper_test.go b/common_helper_test.go
new file mode 100644
index 0000000..c691d52
--- /dev/null
+++ b/common_helper_test.go
@@ -0,0 +1,48 @@
+package apidAnalytics
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
// Spec for getTenantForScope: resolves a scopeuuid to its tenant
// (org/env/tenantId) and reports UNKNOWN_SCOPE for an unknown id.
// NOTE(review): relies on fixture data ("testid", "testorg", ...)
// seeded by the suite setup elsewhere — confirm before editing.
var _ = Describe("test getTenantForScope()", func() {
	Context("get tenant for valid scopeuuid", func() {
		It("should return testorg and testenv", func() {
			tenant, dbError := getTenantForScope("testid")
			Expect(dbError.Reason).To(Equal(""))
			Expect(tenant.Org).To(Equal("testorg"))
			Expect(tenant.Env).To(Equal("testenv"))
			Expect(tenant.TenantId).To(Equal("tenantid"))
		})
	})

	Context("get tenant for invalid scopeuuid", func() {
		// Unknown scope: empty tenant plus a structured db error code
		It("should return empty tenant and a db error", func() {
			tenant, dbError := getTenantForScope("wrongid")
			Expect(tenant.Org).To(Equal(""))
			Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE"))
		})
	})
})
+
+var _ = Describe("test getDeveloperInfo()", func() {
+ Context("get developerInfo for valid tenantId and apikey", func() {
+ It("should return all right data", func() {
+ developerInfo := getDeveloperInfo("tenantid", "testapikey")
+ Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
+ Expect(developerInfo.Developer).To(Equal("testdeveloper"))
+ Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com"))
+ Expect(developerInfo.DeveloperApp).To(Equal("testapp"))
+ })
+ })
+
+ Context("get developerInfo for invalid tenantId and apikey", func() {
+ It("should return all right data", func() {
+ developerInfo := getDeveloperInfo("wrongid", "wrongapikey")
+ Expect(developerInfo.ApiProduct).To(Equal(""))
+ Expect(developerInfo.Developer).To(Equal(""))
+ Expect(developerInfo.DeveloperEmail).To(Equal(""))
+ Expect(developerInfo.DeveloperApp).To(Equal(""))
+ })
+ })
+})
diff --git a/crash_recovery.go b/crash_recovery.go
new file mode 100644
index 0000000..ccd3b36
--- /dev/null
+++ b/crash_recovery.go
@@ -0,0 +1,189 @@
+package apidAnalytics
+
+import (
+ "bufio"
+ "compress/gzip"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+const (
+ crashRecoveryDelay = 30 // seconds
+ // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
+ recoveryTSLayout = "20060102150405.000"
+ // Constant to identify recovered files
+ recoveredTS = "~recoveredTS~"
+)
+
+func initCrashRecovery() {
+ if crashRecoveryNeeded() {
+ timer := time.After(time.Second * crashRecoveryDelay)
+ // Actual recovery of files is attempted asynchronously
+ // after a delay to not block the apid plugin from starting up
+ go func() {
+ <-timer
+ performRecovery()
+ }()
+ }
+}
+
+// Crash recovery is needed if there are any folders in tmp or recovered directory
+func crashRecoveryNeeded() bool {
+ recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
+ tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
+ needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
+ if needed {
+ log.Infof("Crash Recovery is needed and will be "+
+ "attempted in %d seconds", crashRecoveryDelay)
+ }
+ return needed
+}
+
+// If Apid is shutdown or crashes while a file is still open in tmp folder,
+// then the file has partial data.
+// This partial data can be recoverd.
+func recoverFoldersInTmpDir() bool {
+ tmpRecoveryNeeded := false
+ dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
+ recoveryTS := getRecoveryTS()
+ for _, dir := range dirs {
+ tmpRecoveryNeeded = true
+ log.Debugf("Moving directory '%s' from tmp to"+
+ " recovered folder", dir.Name())
+ tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
+ // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+ newDirName := dir.Name() + recoveredTS + recoveryTS
+ recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
+ err := os.Rename(tmpCompletePath, recoveredCompletePath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s' "+
+ "from tmp to recovered folder", dir.Name())
+ }
+ }
+ return tmpRecoveryNeeded
+}
+
+// Get Timestamp for when the recovery is being attempted on the folder.
+func getRecoveryTS() string {
+ current := time.Now()
+ return current.Format(recoveryTSLayout)
+}
+
+// If APID is restarted twice immediately such that files have been moved to
+// recovered folder but actual recovery has'nt started or is partially done
+// Then the files will just stay in the recovered dir
+// and need to be recovered again.
+func recoverFolderInRecoveredDir() bool {
+ dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+ if len(dirs) > 0 {
+ return true
+ }
+ return false
+}
+
+func performRecovery() {
+ log.Info("Crash recovery is starting...")
+ recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+ for _, dir := range recoveryDirs {
+ recoverDirectory(dir.Name())
+ }
+ log.Info("Crash recovery complete...")
+}
+
+func recoverDirectory(dirName string) {
+ log.Infof("performing crash recovery for directory: %s", dirName)
+ var bucketRecoveryTS string
+
+ // Parse bucket name to extract recoveryTS and pass it each file to be recovered
+ // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+ // -> bucketRecoveryTS = _20160101222612.123
+ index := strings.Index(dirName, recoveredTS)
+ if index != -1 {
+ bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
+ }
+
+ dirBeingRecovered := filepath.Join(localAnalyticsRecoveredDir, dirName)
+ files, _ := ioutil.ReadDir(dirBeingRecovered)
+ for _, file := range files {
+ // recovering each file sequentially for now
+ recoverFile(bucketRecoveryTS, dirName, file.Name())
+ }
+
+ stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+ err := os.Rename(dirBeingRecovered, stagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s' from"+
+ " recovered to staging folder", dirName)
+ }
+}
+
+func recoverFile(bucketRecoveryTS, dirName, fileName string) {
+ log.Debugf("performing crash recovery for file: %s ", fileName)
+ // add recovery timestamp to the file name
+ completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
+
+ recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
+ recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
+ // eg. 5be1_20170130155400.20170130155600_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0_recovered_20170130155452.616.txt
+ recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
+
+ // Copy complete records to new file and delete original partial file
+ copyPartialFile(completeOrigFilePath, recoveredFilePath)
+ deletePartialFile(completeOrigFilePath)
+}
+
// The file is read line by line and all complete records are extracted
// and copied to a new file which is closed as a correct gzip file.
//
// A partially written gzip file typically ends mid-stream, so the
// scanner is expected to stop with an error at the truncation point;
// every complete line read before that is preserved.
func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
	partialFile, err := os.Open(completeOrigFilePath)
	if err != nil {
		log.Errorf("Cannot open file: %s", completeOrigFilePath)
		return
	}
	defer partialFile.Close()

	bufReader := bufio.NewReader(partialFile)
	gzReader, err := gzip.NewReader(bufReader)
	if err != nil {
		log.Errorf("Cannot create reader on gzip file: %s"+
			" due to %v", completeOrigFilePath, err)
		return
	}
	defer gzReader.Close()

	scanner := bufio.NewScanner(gzReader)

	// Create new file to copy complete records from partial file and upload only a complete file
	recoveredFile, err := os.OpenFile(recoveredFilePath,
		os.O_WRONLY|os.O_CREATE, os.ModePerm)
	if err != nil {
		log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
		return
	}
	defer recoveredFile.Close()

	// Deferred cleanup runs LIFO: bufWriter.Flush first, then
	// gzWriter.Close, then the file handles — the order required to
	// finalize a valid gzip stream.
	gzWriter := gzip.NewWriter(recoveredFile)
	defer gzWriter.Close()

	bufWriter := bufio.NewWriter(gzWriter)
	defer bufWriter.Flush()

	for scanner.Scan() {
		bufWriter.Write(scanner.Bytes())
		bufWriter.WriteString("\n")
	}
	if err := scanner.Err(); err != nil {
		// Expected for truncated gzip input; complete lines already
		// written are still recovered.
		log.Warnf("Error while scanning partial file: %v", err)
		return
	}
}
+
+func deletePartialFile(completeOrigFilePath string) {
+ err := os.Remove(completeOrigFilePath)
+ if err != nil {
+ log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
+ }
+}
diff --git a/glide.yaml b/glide.yaml
new file mode 100644
index 0000000..054e3eb
--- /dev/null
+++ b/glide.yaml
@@ -0,0 +1,8 @@
+package: github.com/30x/apidAnalytics
+
+import:
+- package: github.com/30x/apid
+ version: master
+testImport:
+- package: github.com/onsi/ginkgo/ginkgo
+- package: github.com/onsi/gomega
\ No newline at end of file
diff --git a/init.go b/init.go
new file mode 100644
index 0000000..a3501a4
--- /dev/null
+++ b/init.go
@@ -0,0 +1,178 @@
+package apidAnalytics
+
+import (
+ "fmt"
+ "github.com/30x/apid"
+ "os"
+ "path/filepath"
+ "sync"
+)
+
+const (
+ // Base path of analytics API that will be exposed
+ configAnalyticsBasePath = "apidanalytics_base_path"
+ analyticsBasePathDefault = "/analytics"
+
+ // Root directory for analytics local data buffering
+ configAnalyticsDataPath = "apidanalytics_data_path"
+ analyticsDataPathDefault = "/ax"
+
+ // Data collection and buffering interval in seconds
+ analyticsCollectionInterval = "apidanalytics_collection_interval"
+ analyticsCollectionIntervalDefault = "120"
+
+ // Interval in seconds based on which staging directory
+ // will be checked for folders ready to be uploaded
+ analyticsUploadInterval = "apidanalytics_upload_interval"
+ analyticsUploadIntervalDefault = "5"
+
+ // Number of slots for internal channel buffering of
+ // analytics records before they are dumped to a file
+ analyticsBufferChannelSize = "apidanalytics_buffer_channel_size"
+ analyticsBufferChannelSizeDefault = 100
+
+ // EdgeX endpoint base path to access Uap Collection Endpoint
+ uapServerBase = "apidanalytics_uap_server_base"
+
+ // If caching is used then data scope and developer
+ // info will be maintained in-memory
+ // cache to avoid DB calls for each analytics message
+ useCaching = "apidanalytics_use_caching"
+ useCachingDefault = true
+)
+
+// keep track of the services that this plugin will use
+var (
+ log apid.LogService
+ config apid.ConfigService
+ data apid.DataService
+ events apid.EventsService
+ unsafeDB apid.DB
+ dbMux sync.RWMutex
+
+ localAnalyticsBaseDir string
+ localAnalyticsTempDir string
+ localAnalyticsStagingDir string
+ localAnalyticsFailedDir string
+ localAnalyticsRecoveredDir string
+)
+
// apid.RegisterPlugin() is required to be called in init()
func init() {
	// Registers initPlugin as this plugin's entry point with apid.
	apid.RegisterPlugin(initPlugin)
}
+
+func getDB() apid.DB {
+ dbMux.RLock()
+ db := unsafeDB
+ dbMux.RUnlock()
+ return db
+}
+
+func setDB(db apid.DB) {
+ dbMux.Lock()
+ unsafeDB = db
+ dbMux.Unlock()
+}
+
// initPlugin will be called by apid to initialize.
// It wires up the log/data/events services, validates configuration,
// creates the local buffering directories, starts the crash-recovery,
// upload and buffering managers, and finally exposes the analytics API.
// Returns pluginData, plus an error when a prerequisite is missing.
func initPlugin(services apid.Services) (apid.PluginData, error) {
	// set a logger that is annotated for this plugin
	log = services.Log().ForModule("apidAnalytics")
	log.Debug("start init for apidAnalytics plugin")

	data = services.Data()
	events = services.Events()
	events.Listen("ApigeeSync", &handler{})

	// set configuration
	err := setConfig(services)
	if err != nil {
		return pluginData, err
	}

	// uapServerBase has no default and must be configured explicitly.
	for _, key := range []string{uapServerBase} {
		if !config.IsSet(key) {
			return pluginData,
				fmt.Errorf("Missing required config value: %s", key)
		}
	}

	// Create directories for managing buffering and upload to UAP stages
	directories := []string{localAnalyticsBaseDir,
		localAnalyticsTempDir,
		localAnalyticsStagingDir,
		localAnalyticsFailedDir,
		localAnalyticsRecoveredDir}
	err = createDirectories(directories)

	if err != nil {
		return pluginData, fmt.Errorf("Cannot create "+
			"required local directories: %v ", err)
	}

	// Initialize one time crash recovery to be performed by the plugin on start up
	initCrashRecovery()

	// Initialize upload manager to watch the staging directory and
	// upload files to UAP as they are ready
	initUploadManager()

	// Initialize buffer manager to watch the internalBuffer channel
	// for new messages and dump them to files
	initBufferingManager()

	// Initialize API's and expose them
	initAPI(services)
	log.Debug("end init for apidAnalytics plugin")
	return pluginData, nil
}
+
+func setConfig(services apid.Services) error {
+ config = services.Config()
+
+ // set plugin config defaults
+ config.SetDefault(configAnalyticsBasePath, analyticsBasePathDefault)
+ config.SetDefault(configAnalyticsDataPath, analyticsDataPathDefault)
+
+ if !config.IsSet("local_storage_path") {
+ return fmt.Errorf("Missing required config value: local_storage_path")
+ }
+
+ // set local directory paths that will be used to buffer analytics data on disk
+ localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"),
+ config.GetString(configAnalyticsDataPath))
+ localAnalyticsTempDir = filepath.Join(localAnalyticsBaseDir, "tmp")
+ localAnalyticsStagingDir = filepath.Join(localAnalyticsBaseDir, "staging")
+ localAnalyticsFailedDir = filepath.Join(localAnalyticsBaseDir, "failed")
+ localAnalyticsRecoveredDir = filepath.Join(localAnalyticsBaseDir, "recovered")
+
+ // set default config for collection interval
+ config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault)
+
+ // set default config for useCaching
+ config.SetDefault(useCaching, useCachingDefault)
+
+ // set default config for upload interval
+ config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
+
+ // set default config for internal buffer size
+ config.SetDefault(analyticsBufferChannelSize, analyticsBufferChannelSizeDefault)
+
+ return nil
+}
+
+// create all missing directories if required
+func createDirectories(directories []string) error {
+ for _, path := range directories {
+ if _, err := os.Stat(path); os.IsNotExist(err) {
+ error := os.Mkdir(path, os.ModePerm)
+ if error != nil {
+ return error
+ }
+ log.Infof("created directory for analytics "+
+ "data collection: %s", path)
+ }
+ }
+ return nil
+}
diff --git a/listener.go b/listener.go
new file mode 100644
index 0000000..4c57da8
--- /dev/null
+++ b/listener.go
@@ -0,0 +1,120 @@
+package apidAnalytics
+
+import (
+ "github.com/30x/apid"
+ "github.com/apigee-labs/transicator/common"
+)
+
+type handler struct{}
+
// String names this event handler for apid's event machinery.
func (h *handler) String() string {
	// NOTE(review): this differs from the plugin name "apidAnalytics"
	// used in pluginData and the log module — confirm it is intended.
	return "apigeeAnalytics"
}
+
+func (h *handler) Handle(e apid.Event) {
+ snapData, ok := e.(*common.Snapshot)
+ if ok {
+ processSnapshot(snapData)
+ } else {
+ changeSet, ok := e.(*common.ChangeList)
+ if ok {
+ processChange(changeSet)
+ } else {
+ log.Errorf("Received Invalid event. Ignoring. %v", e)
+ }
+ }
+ return
+}
+
+func processSnapshot(snapshot *common.Snapshot) {
+ log.Debugf("Snapshot received. Switching to"+
+ " DB version: %s", snapshot.SnapshotInfo)
+
+ db, err := data.DBVersion(snapshot.SnapshotInfo)
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
+ setDB(db)
+
+ if config.GetBool(useCaching) {
+ err = createTenantCache()
+ if err != nil {
+ log.Error(err)
+ } else {
+ log.Debug("Created a local cache" +
+ " for datasope information")
+ }
+ err = createDeveloperInfoCache()
+ if err != nil {
+ log.Error(err)
+ } else {
+ log.Debug("Created a local cache for developer information")
+ }
+ } else {
+ log.Info("Will not be caching any developer info " +
+ "and make a DB call for every analytics msg")
+ }
+ return
+}
+
+func processChange(changes *common.ChangeList) {
+ if config.GetBool(useCaching) {
+ log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
+ var rows []common.Row
+
+ refreshDevInfoNeeded := false
+ for _, payload := range changes.Changes {
+ rows = nil
+ switch payload.Table {
+ case "edgex.data_scope":
+ switch payload.Operation {
+ case common.Insert, common.Update:
+ rows = append(rows, payload.NewRow)
+ // Lock before writing to the
+ // map as it has multiple readers
+ tenantCachelock.Lock()
+ defer tenantCachelock.Unlock()
+ for _, ele := range rows {
+ var scopeuuid, tenantid string
+ var org, env string
+ ele.Get("id", &scopeuuid)
+ ele.Get("scope", &tenantid)
+ ele.Get("org", &org)
+ ele.Get("env", &env)
+ tenantCache[scopeuuid] = tenant{
+ Org: org,
+ Env: env,
+ TenantId: tenantid}
+ log.Debugf("Refreshed local "+
+ "tenantCache. Added "+
+ "scope: "+"%s", scopeuuid)
+ }
+ case common.Delete:
+ rows = append(rows, payload.OldRow)
+ // Lock before writing to the map
+ // as it has multiple readers
+ tenantCachelock.Lock()
+ defer tenantCachelock.Unlock()
+ for _, ele := range rows {
+ var scopeuuid string
+ ele.Get("id", &scopeuuid)
+ delete(tenantCache, scopeuuid)
+ log.Debugf("Refreshed local"+
+ " tenantCache. Deleted"+
+ " scope: %s", scopeuuid)
+ }
+ }
+ case "kms.developer", "kms.app", "kms.api_product",
+ "kms.app_credential_apiproduct_mapper":
+ // any change in any of the above tables
+ // should result in cache refresh
+ refreshDevInfoNeeded = true
+ }
+ }
+ // Refresh cache once for all set of changes
+ if refreshDevInfoNeeded {
+ createDeveloperInfoCache()
+ log.Debug("Refresh local developerInfoCache")
+ }
+ }
+}
diff --git a/pluginData.go b/pluginData.go
new file mode 100644
index 0000000..af0faa6
--- /dev/null
+++ b/pluginData.go
@@ -0,0 +1,11 @@
+package apidAnalytics
+
+import "github.com/30x/apid"
+
// pluginData identifies this plugin (name, version, schema version) to
// apid; it is returned from initPlugin at registration time.
var pluginData = apid.PluginData{
	Name:    "apidAnalytics",
	Version: "0.0.1",
	ExtraData: map[string]interface{}{
		"schemaVersion": "0.0.1",
	},
}
diff --git a/upload_manager.go b/upload_manager.go
new file mode 100644
index 0000000..a95ce87
--- /dev/null
+++ b/upload_manager.go
@@ -0,0 +1,111 @@
+package apidAnalytics
+
+import (
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "time"
+)
+
+const (
+ maxRetries = 3
+ retryFailedDirBatchSize = 10
+)
+
+// Each file upload is retried maxRetries times before
+// moving it to failed directory
+var retriesMap map[string]int
+
// TODO: make sure that this instance gets initialized only once
// since we dont want multiple upload manager tickers running
//
// initUploadManager starts a background goroutine that ticks every
// analyticsUploadInterval seconds, uploads each directory found in the
// staging dir, and — after at least one successful upload — retries
// folders previously parked in the failed dir.
func initUploadManager() {

	retriesMap = make(map[string]int)

	go func() {
		// Periodically check the staging directory to check
		// if any folders are ready to be uploaded to S3
		ticker := time.NewTicker(time.Second *
			config.GetDuration(analyticsUploadInterval))
		log.Debugf("Intialized upload manager to check for staging directory")
		// Ticker will keep running till go routine is running
		// i.e. till application is running
		defer ticker.Stop()

		for range ticker.C {
			files, err := ioutil.ReadDir(localAnalyticsStagingDir)

			if err != nil {
				log.Errorf("Cannot read directory: "+
					"%s", localAnalyticsStagingDir)
			}

			uploadedDirCnt := 0
			for _, file := range files {
				if file.IsDir() {
					status := uploadDir(file)
					handleUploadDirStatus(file, status)
					if status {
						uploadedDirCnt++
					}
				}
			}
			if uploadedDirCnt > 0 {
				// After a successful upload, retry the
				// folders in failed directory as they might have
				// failed due to intermitent S3/GCS issue
				retryFailedUploads()
			}
		}
	}()
}
+
+func handleUploadDirStatus(dir os.FileInfo, status bool) {
+ completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+ // If upload is successful then delete files
+ // and remove bucket from retry map
+ if status {
+ os.RemoveAll(completePath)
+ log.Debugf("deleted directory after "+
+ "successful upload: %s", dir.Name())
+ // remove key if exists from retry map after a successful upload
+ delete(retriesMap, dir.Name())
+ } else {
+ retriesMap[dir.Name()] = retriesMap[dir.Name()] + 1
+ if retriesMap[dir.Name()] >= maxRetries {
+ log.Errorf("Max Retires exceeded for folder: %s", completePath)
+ failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
+ err := os.Rename(completePath, failedDirPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s'"+
+ " from staging to failed folder", dir.Name())
+ }
+ // remove key from retry map once it reaches allowed max failed attempts
+ delete(retriesMap, dir.Name())
+ }
+ }
+}
+
+func retryFailedUploads() {
+ failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir)
+
+ if err != nil {
+ log.Errorf("Cannot read directory: %s", localAnalyticsFailedDir)
+ }
+
+ cnt := 0
+ for _, dir := range failedDirs {
+ // We rety failed folder in batches to not overload the upload thread
+ if cnt < retryFailedDirBatchSize {
+ failedPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
+ newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+ err := os.Rename(failedPath, newStagingPath)
+ if err != nil {
+ log.Errorf("Cannot move directory '%s'"+
+ " from failed to staging folder", dir.Name())
+ }
+ } else {
+ break
+ }
+ }
+}
diff --git a/uploader.go b/uploader.go
new file mode 100644
index 0000000..fb8a267
--- /dev/null
+++ b/uploader.go
@@ -0,0 +1,155 @@
+package apidAnalytics
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+)
+
+const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
+
// Bearer token for the UAP collection endpoint; refreshed from config
// on every request (see addHeaders).
var token string

// Shared HTTP client for signed-URL lookups and S3/GCS uploads.
var client *http.Client = &http.Client{
	//set default timeout of 60 seconds while connecting to s3/GCS
	Timeout: time.Duration(60 * time.Second),
}
+
// addHeaders attaches the current apigeesync bearer token to a request.
// NOTE(review): this writes the package-level token var on every call;
// if requests are ever issued from multiple goroutines this is a data
// race — confirm, or make token a local.
func addHeaders(req *http.Request) {
	token = config.GetString("apigeesync_bearer_token")
	req.Header.Add("Authorization", "Bearer "+token)
}
+
+func uploadDir(dir os.FileInfo) bool {
+ // Eg. org~env~20160101224500
+ tenant, timestamp := splitDirName(dir.Name())
+ //date=2016-01-01/time=22-45
+ dateTimePartition := getDateFromDirTimestamp(timestamp)
+
+ completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+ files, _ := ioutil.ReadDir(completePath)
+
+ status := true
+ var error error
+ for _, file := range files {
+ completeFilePath := filepath.Join(completePath, file.Name())
+ relativeFilePath := dateTimePartition + "/" + file.Name()
+ status, error = uploadFile(tenant, relativeFilePath, completeFilePath)
+ if error != nil {
+ log.Errorf("Upload failed due to: %v", error)
+ break
+ } else {
+ os.Remove(completeFilePath)
+ log.Debugf("Deleted file '%s' after "+
+ "successful upload", file.Name())
+ }
+ }
+ return status
+}
+
+func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
+ signedUrl, err := getSignedUrl(tenant, relativeFilePath)
+ if err != nil {
+ return false, err
+ } else {
+ return uploadFileToDatastore(completeFilePath, signedUrl)
+ }
+}
+
+func getSignedUrl(tenant, relativeFilePath string) (string, error) {
+ uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
+
+ req, err := http.NewRequest("GET", uapCollectionUrl, nil)
+ if err != nil {
+ return "", err
+ }
+
+ q := req.URL.Query()
+
+ // eg. edgexfeb1~test
+ q.Add("tenant", tenant)
+ // eg. date=2017-01-30/time=16-32/1069_20170130163200.20170130163400_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0.txt.gz
+ q.Add("relative_file_path", relativeFilePath)
+ q.Add("file_content_type", "application/x-gzip")
+ req.URL.RawQuery = q.Encode()
+
+ // Add Bearer Token to each request
+ addHeaders(req)
+ resp, err := client.Do(req)
+ if err != nil {
+ return "", err
+ }
+ defer resp.Body.Close()
+
+ respBody, _ := ioutil.ReadAll(resp.Body)
+ if resp.StatusCode == 200 {
+ var body map[string]interface{}
+ json.Unmarshal(respBody, &body)
+ signedURL := body["url"]
+ return signedURL.(string), nil
+ } else {
+ return "", fmt.Errorf("Error while getting "+
+ "signed URL '%v'", resp.Status)
+ }
+}
+
+func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
+ // open gzip file that needs to be uploaded
+ file, err := os.Open(completeFilePath)
+ if err != nil {
+ return false, err
+ }
+ defer file.Close()
+
+ req, err := http.NewRequest("PUT", signedUrl, file)
+ if err != nil {
+ return false, fmt.Errorf("Parsing URL failed '%v'", err)
+ }
+
+ req.Header.Set("Expect", "100-continue")
+ req.Header.Set("Content-Type", "application/x-gzip")
+
+ fileStats, err := file.Stat()
+ if err != nil {
+ return false, fmt.Errorf("Could not get content length for "+
+ "file '%v'", err)
+ }
+ req.ContentLength = fileStats.Size()
+
+ resp, err := client.Do(req)
+ if err != nil {
+ return false, err
+ }
+ defer resp.Body.Close()
+
+ if resp.StatusCode == 200 {
+ return true, nil
+ } else {
+ return false, fmt.Errorf("Final Datastore (S3/GCS)returned "+
+ "Error '%v'", resp.Status)
+ }
+}
+
// splitDirName splits a staging directory name ("org~env~timestamp")
// into its tenant ("org~env") and timestamp parts.
func splitDirName(dirName string) (string, string) {
	parts := strings.Split(dirName, "~")
	tenant := parts[0] + "~" + parts[1]
	return tenant, parts[2]
}
+
+// files are uploaded to S3 under specific date time partition and that
+// key needs to be generated from the plugin
+// eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
+func getDateFromDirTimestamp(timestamp string) string {
+ dateTime, _ := time.Parse(timestampLayout, timestamp)
+ date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
+ time := dateTime.Format("15-04") // same as HH-mm
+ dateHourTS := "date=" + date + "/time=" + time
+ return dateHourTS
+}