[XAPID-377] Added caching for developer and datascope info and made use_caching a config
diff --git a/api.go b/api.go index a1710cd..a34018d 100644 --- a/api.go +++ b/api.go
@@ -21,6 +21,7 @@ type tenant struct { org string env string + tenantId string } func initAPI(services apid.Services) { @@ -33,7 +34,13 @@ w.Header().Set("Content-Type", "application/json; charset=UTF-8") - db, _ := data.DB() + db, _ := data.DB() // When database isnt initialized + if db == nil { + writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") + return + } + + db = getDB() // When snapshot isnt processed if db == nil { writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") return
diff --git a/api_helper.go b/api_helper.go new file mode 100644 index 0000000..7ae0b8f --- /dev/null +++ b/api_helper.go
@@ -0,0 +1,135 @@ +package apidAnalytics + +import ( + "encoding/json" + "net/http" + "io" + "io/ioutil" + "strings" + "compress/gzip" +) + + +type developerInfo struct { + apiProduct string + developerApp string + developerEmail string + developer string +} + +func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { + var gzipEncoded bool + if r.Header.Get("Content-Encoding") != "" { + if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") { + return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"} + } else { + gzipEncoded = true + } + } + + var reader io.ReadCloser + var err error + if gzipEncoded { + reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data + if err != nil { + return errResponse{"BAD_DATA", "Gzip data cannot be read"} + } + } else { + reader = r.Body + } + + body, _ := ioutil.ReadAll(reader) + errMessage := validateEnrichPublish(tenant, scopeuuid, body) + if errMessage.ErrorCode != "" { + return errMessage + } + return errResponse{} +} + +func validateEnrichPublish(tenant tenant, scopeuuid string, body []byte) errResponse { + var raw map[string]interface{} + json.Unmarshal(body, &raw) + if records := raw["records"]; records != nil { + for _, eachRecord := range records.([]interface{}) { + recordMap := eachRecord.(map[string]interface{}) + valid, err := validate(recordMap) + if valid { + enrich(recordMap, scopeuuid, tenant) + log.Debugf("Raw records : %v ", eachRecord) + } else { + return err // Even if there is one bad record, then reject entire batch + } + } + publishToChannel(records.([]interface{})) + } else { + return errResponse{"NO_RECORDS", "No analytics records in the payload"} + } + return errResponse{} +} + +func validate(recordMap map[string]interface{}) (bool, errResponse) { + elems := []string{"client_received_start_timestamp"} + for _, elem := range elems { + if recordMap[elem] == nil { + return false, errResponse{"MISSING_FIELD", "Missing field: " 
+ elem} + } + } + + crst, exists1 := recordMap["client_received_start_timestamp"] + cret, exists2 := recordMap["client_received_end_timestamp"] + if exists1 && exists2 { + // json.Unmarshal decodes JSON numbers into float64, so assert float64 (an int64 assertion panics on valid payloads) + if crst.(float64) > cret.(float64) { + return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"} + + } + } + return true, errResponse{} +} + +func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) { + if recordMap["organization"] == "" { + recordMap["organization"] = tenant.org + + } + if recordMap["environment"] == "" { + recordMap["environment"] = tenant.env + } + apiKey, exists := recordMap["client_id"] + // if apiKey doesn't exist then skip adding developer fields + if exists { + apiKey := apiKey.(string) + devInfo := getDeveloperInfo(tenant.tenantId, apiKey) + log.Debugf("developerInfo = %v", devInfo) + if recordMap["api_product"] == "" { + recordMap["api_product"] = devInfo.apiProduct + } + if recordMap["developer_app"] == "" { + recordMap["developer_app"] = devInfo.developerApp + } + if recordMap["developer_email"] == "" { + recordMap["developer_email"] = devInfo.developerEmail + } + if recordMap["developer"] == "" { + recordMap["developer"] = devInfo.developer + } + } +} + +func publishToChannel(records []interface{}) { + // TODO: add the batch of records to a channel for consumption + return +} + +func writeError(w http.ResponseWriter, status int, code string, reason string) { + w.WriteHeader(status) + e := errResponse{ + ErrorCode: code, + Reason: reason, + } + bytes, err := json.Marshal(e) + if err != nil { + log.Errorf("unable to marshal errorResponse: %v", err) + } else { + w.Write(bytes) + } +}
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go index 314defb..da8d20d 100644 --- a/apidAnalytics_suite_test.go +++ b/apidAnalytics_suite_test.go
@@ -10,15 +10,12 @@ "net/http" "net/http/httptest" "os" - "sync" "testing" ) var ( testTempDir string testServer *httptest.Server - unsafeDB apid.DB - dbMux sync.RWMutex ) func TestApidAnalytics(t *testing.T) { @@ -36,15 +33,16 @@ Expect(err).NotTo(HaveOccurred()) config.Set("data_path", testTempDir) - config.Set(uapEndpoint, "http://localhost:9000") // dummy value - - apid.InitializePlugins() + config.Set(uapServerBase, "http://localhost:9000") // dummy value db, err := apid.Data().DB() Expect(err).NotTo(HaveOccurred()) setDB(db) createApidClusterTables(db) + createTables(db) insertTestData(db) + apid.InitializePlugins() + testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == analyticsBasePathDefault { saveAnalyticsRecord(w, req) @@ -52,45 +50,112 @@ })) }) -func setDB(db apid.DB) { - dbMux.Lock() - unsafeDB = db - dbMux.Unlock() +func createTables(db apid.DB) { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS api_product ( + id text, + tenant_id text, + name text, + display_name text, + description text, + api_resources text[], + approval_type text, + _change_selector text, + proxies text[], + environments text[], + quota text, + quota_time_unit text, + quota_interval int, + created_at int64, + created_by text, + updated_at int64, + updated_by text, + PRIMARY KEY (tenant_id, id)); + CREATE TABLE IF NOT EXISTS developer ( + id text, + tenant_id text, + username text, + first_name text, + last_name text, + password text, + email text, + status text, + encrypted_password text, + salt text, + _change_selector text, + created_at int64, + created_by text, + updated_at int64, + updated_by text, + PRIMARY KEY (tenant_id, id) + ); + CREATE TABLE IF NOT EXISTS app ( + id text, + tenant_id text, + name text, + display_name text, + access_type text, + callback_url text, + status text, + app_family text, + company_id text, + developer_id text, + type int, + created_at int64, + created_by text, + updated_at 
int64, + updated_by text, + _change_selector text, + PRIMARY KEY (tenant_id, id) + ); + CREATE TABLE IF NOT EXISTS app_credential_apiproduct_mapper ( + tenant_id text, + appcred_id text, + app_id text, + apiprdt_id text, + _change_selector text, + status text, + PRIMARY KEY (appcred_id, app_id, apiprdt_id,tenant_id) + ); + `) + if err != nil { + panic("Unable to initialize DB " + err.Error()) + } } func createApidClusterTables(db apid.DB) { _, err := db.Exec(` -CREATE TABLE apid_cluster ( - id text, - instance_id text, - name text, - description text, - umbrella_org_app_name text, - created int64, - created_by text, - updated int64, - updated_by text, - _change_selector text, - snapshotInfo text, - lastSequence text, - PRIMARY KEY (id) -); -CREATE TABLE data_scope ( - id text, - apid_cluster_id text, - scope text, - org text, - env text, - created int64, - created_by text, - updated int64, - updated_by text, - _change_selector text, - PRIMARY KEY (id) -); -`) + CREATE TABLE apid_cluster ( + id text, + instance_id text, + name text, + description text, + umbrella_org_app_name text, + created int64, + created_by text, + updated int64, + updated_by text, + _change_selector text, + snapshotInfo text, + lastSequence text, + PRIMARY KEY (id) + ); + CREATE TABLE data_scope ( + id text, + apid_cluster_id text, + scope text, + org text, + env text, + created int64, + created_by text, + updated int64, + updated_by text, + _change_selector text, + PRIMARY KEY (id) + ); + `) if err != nil { - log.Panic("Unable to initialize DB", err) + panic("Unable to initialize DB " + err.Error()) } } @@ -98,22 +163,57 @@ txn, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) + txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id, appcred_id, app_id, apiprdt_id, status, _change_selector) "+ + "VALUES" + + "($1,$2,$3,$4,$5,$6)", + "tenantid", + "testapikey", + "testappid", + "testproductid", + "APPROVED", + "12345", + ); + + txn.Exec("INSERT INTO APP (id, tenant_id, name, 
developer_id) "+ + "VALUES" + + "($1,$2,$3,$4)", + "testappid", + "tenantid", + "testapp", + "testdeveloperid", + ); + + txn.Exec("INSERT INTO API_PRODUCT (id, tenant_id, name) "+ + "VALUES" + + "($1,$2,$3)", + "testproductid", + "tenantid", + "testproduct", + ); + + txn.Exec("INSERT INTO DEVELOPER (id, tenant_id, username, email) "+ + "VALUES" + + "($1,$2,$3,$4)", + "testdeveloperid", + "tenantid", + "testdeveloper", + "testdeveloper@test.com", + ); txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+ "VALUES"+ "($1,$2,$3,$4,$5,$6)", "testid", + "some_change_selector", "some_cluster_id", - "some_cluster_id", - "tenant_id_xxxx", + "tenantid", "testorg", "testenv", - ) + ); txn.Commit() - var count int64 - db.QueryRow("select count(*) from data_scope").Scan(&count) } + var _ = AfterSuite(func() { apid.Events().Close() if testServer != nil {
diff --git a/apidAnalytics_test.go b/apidAnalytics_test.go deleted file mode 100644 index dcbf64c..0000000 --- a/apidAnalytics_test.go +++ /dev/null
@@ -1,25 +0,0 @@ -package apidAnalytics - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("test getTenantForScope()", func() { - Context("get tenant for valid scopeuuid", func() { - It("should return testorg and testenv", func() { - tenant, dbError := getTenantForScope("testid") - Expect(dbError.Reason).To(Equal("")) - Expect(tenant.org).To(Equal("testorg")) - Expect(tenant.env).To(Equal("testenv")) - }) - }) - - Context("get tenant for invalid scopeuuid", func() { - It("should return empty tenant and a db error", func() { - tenant, dbError := getTenantForScope("wrongid") - Expect(tenant.org).To(Equal("")) - Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) - }) - }) -}) \ No newline at end of file
diff --git a/common_helper.go b/common_helper.go new file mode 100644 index 0000000..6d320dc --- /dev/null +++ b/common_helper.go
@@ -0,0 +1,155 @@ +package apidAnalytics + +import ( + "database/sql" + "fmt" +) + +var tenantCache map[string]tenant +var developerInfoCache map[string]developerInfo + +func createTenantCache() error { + tenantCache = make(map[string]tenant) + var org, env, tenantId, id string + db, err := data.DB() + if err != nil { + return fmt.Errorf("DB not initialized") + } + + rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE") + + if error != nil { + return fmt.Errorf("Could not get datascope from DB due to : %s", error.Error()) + } else { + defer rows.Close() + for rows.Next() { + rows.Scan(&env, &org, &tenantId, &id); + tenantCache[id] = tenant{org: org, env: env, tenantId: tenantId} + } + } + log.Debugf("Found scopes : %d", len(tenantCache)) + return nil +} + +func createDeveloperInfoCache() error { + developerInfoCache = make(map[string]developerInfo) + + var apiProduct, developerApp, developerEmail, developer sql.NullString + var tenantId, apiKey string + + db := getDB() + + sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name, a.name, d.username, d.email " + + "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " + + "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " + + "INNER JOIN APP AS a ON a.id = mp.app_id " + + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id ;" + rows, error := db.Query(sSql) + + if error != nil { + return fmt.Errorf("Could not get developerInfo from DB due to : %s", error.Error()) + } else { + defer rows.Close() + for rows.Next() { + rows.Scan(&tenantId,&apiKey,&apiProduct, &developerApp, &developer, &developerEmail) + + keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) + apiPrd := getValuesIgnoringNull(apiProduct) + devApp := getValuesIgnoringNull(developerApp) + dev := getValuesIgnoringNull(developer) + devEmail := getValuesIgnoringNull(developerEmail) + + developerInfoCache[keyForMap] = developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev} + } + } + return nil +} + 
+func getTenantForScope(scopeuuid string) (tenant, dbError) { + + if (config.GetBool(useCaching)) { + _, exists := tenantCache[scopeuuid] + if !exists { + reason := "No tenant found for this scopeuuid: " + scopeuuid + errorCode := "UNKNOWN_SCOPE" + return tenant{}, dbError{errorCode, reason} + } else { + return tenantCache[scopeuuid], dbError{} + } + } else { + var org, env, tenantId string + db, err := data.DB() + if err != nil { + reason := "DB not initialized" + errorCode := "INTERNAL_SEARCH_ERROR" + return tenant{}, dbError{errorCode, reason} + } + + error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE where id = ?", scopeuuid).Scan(&env, &org, &tenantId) + + switch { + case error == sql.ErrNoRows: + reason := "No tenant found for this scopeuuid: " + scopeuuid + errorCode := "UNKNOWN_SCOPE" + return tenant{}, dbError{errorCode, reason} + case error != nil: + reason := error.Error() + errorCode := "INTERNAL_SEARCH_ERROR" + return tenant{}, dbError{errorCode, reason} + } + + return tenant{org: org, env:env, tenantId: tenantId}, dbError{} + } +} + +func getDeveloperInfo(tenantId string, apiKey string) developerInfo { + if (config.GetBool(useCaching)) { + keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) + _, exists := developerInfoCache[keyForMap] + if !exists { + log.Debugf("No data found for tenantId = %s and apiKey = %s", tenantId, apiKey) + return developerInfo{} + } else { + return developerInfoCache[keyForMap] + } + } else { + var apiProduct, developerApp, developerEmail, developer sql.NullString + + db := getDB() + sSql := "SELECT ap.name, a.name, d.username, d.email " + + "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " + + "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " + + "INNER JOIN APP AS a ON a.id = mp.app_id " + + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " + + "where mp.tenant_id = ? and mp.appcred_id = ?;" + // use bind parameters instead of string concatenation: the concatenated form was unquoted (always a syntax error) and SQL-injectable + error := db.QueryRow(sSql, tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, 
&developerEmail) + + switch { + case error == sql.ErrNoRows: + log.Debug("No info found for tenantId : " + tenantId + " and apikey : " + apiKey) + return developerInfo{} + case error != nil: + log.Debug("No info found for tenantId : " + tenantId + " and apikey : " + apiKey + " due to " + error.Error()) + return developerInfo{} + } + + apiPrd := getValuesIgnoringNull(apiProduct) + devApp := getValuesIgnoringNull(developerApp) + dev := getValuesIgnoringNull(developer) + devEmail := getValuesIgnoringNull(developerEmail) + + return developerInfo{apiProduct: apiPrd, developerApp: devApp, developerEmail: devEmail, developer: dev} + } +} + +func getValuesIgnoringNull(sqlValue sql.NullString) string { + if sqlValue.Valid { + return sqlValue.String + } else { + return "" + } +} + +func getKeyForDeveloperInfoCache(tenantId string, apiKey string) string { + return tenantId + "~" + apiKey +}
diff --git a/common_helper_test.go b/common_helper_test.go new file mode 100644 index 0000000..46f1b6c --- /dev/null +++ b/common_helper_test.go
@@ -0,0 +1,50 @@ +package apidAnalytics + + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("test getTenantForScope()", func() { + Context("get tenant for valid scopeuuid", func() { + It("should return testorg and testenv", func() { + tenant, dbError := getTenantForScope("testid") + Expect(dbError.Reason).To(Equal("")) + Expect(tenant.org).To(Equal("testorg")) + Expect(tenant.env).To(Equal("testenv")) + Expect(tenant.tenantId).To(Equal("tenantid")) + }) + }) + + Context("get tenant for invalid scopeuuid", func() { + It("should return empty tenant and a db error", func() { + tenant, dbError := getTenantForScope("wrongid") + Expect(tenant.org).To(Equal("")) + Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) + }) + }) +}) + +var _ = Describe("test getDeveloperInfo()", func() { + Context("get developerInfo for valid tenantId and apikey", func() { + It("should return all right data", func() { + developerInfo := getDeveloperInfo("tenantid","testapikey") + Expect(developerInfo.apiProduct).To(Equal("testproduct")) + Expect(developerInfo.developer).To(Equal("testdeveloper")) + Expect(developerInfo.developerEmail).To(Equal("testdeveloper@test.com")) + Expect(developerInfo.developerApp).To(Equal("testapp")) + }) + }) + + Context("get developerInfo for invalid tenantId and apikey", func() { + It("should return all right data", func() { + developerInfo := getDeveloperInfo("wrongid","wrongapikey") + Expect(developerInfo.apiProduct).To(Equal("")) + Expect(developerInfo.developer).To(Equal("")) + Expect(developerInfo.developerEmail).To(Equal("")) + Expect(developerInfo.developerApp).To(Equal("")) + + }) + }) +}) \ No newline at end of file
diff --git a/helper.go b/helper.go deleted file mode 100644 index b092db1..0000000 --- a/helper.go +++ /dev/null
@@ -1,173 +0,0 @@ -package apidAnalytics - -import ( - "database/sql" - "encoding/json" - "github.com/30x/apid" - "net/http" - "io" - "io/ioutil" - "strings" - "compress/gzip" -) - -type developerInfo struct { - apiProduct string - developerApp string - developerEmail string - developer string -} - -func getTenantForScope(scopeuuid string) (tenant, dbError) { - // TODO: create a cache during init and refresh it on every failure or listen for snapshot update event - var org, env string - { - db, err := apid.Data().DB() - switch { - case err != nil: - reason := err.Error() - errorCode := "INTERNAL_SEARCH_ERROR" - return tenant{org, env}, dbError{errorCode, reason} - } - - error := db.QueryRow("SELECT env, org FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&env, &org) - - switch { - case error == sql.ErrNoRows: - reason := "No tenant found for this scopeuuid: " + scopeuuid - errorCode := "UNKNOWN_SCOPE" - return tenant{org, env}, dbError{errorCode, reason} - case error != nil: - reason := error.Error() - errorCode := "INTERNAL_SEARCH_ERROR" - return tenant{org, env}, dbError{errorCode, reason} - } - } - return tenant{org, env}, dbError{} -} - - -func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { - var gzipEncoded bool - if r.Header.Get("Content-Encoding") != "" { - if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") { - return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"} - } else { - gzipEncoded = true - } - } - - var reader io.ReadCloser - var err error - if gzipEncoded { - reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data - if err != nil { - return errResponse{"BAD_DATA", "Gzip data cannot be read"} - } - } else { - reader = r.Body - } - - body, _ := ioutil.ReadAll(reader) - errMessage := validateAndEnrich(tenant, scopeuuid, body) - if errMessage.ErrorCode != "" { - return errMessage - } - return errResponse{} -} - -func validateAndEnrich(tenant tenant, 
scopeuuid string, body []byte) errResponse { - var raw map[string]interface{} - json.Unmarshal(body, &raw) - if records := raw["records"]; records != nil { - for _, eachRecord := range records.([]interface{}) { - recordMap := eachRecord.(map[string]interface{}) - valid, err := validate(recordMap) - if valid { - enrich(recordMap, scopeuuid, tenant) - log.Debugf("Raw records : %v ", eachRecord) - } else { - return err // Even if there is one bad record, then reject entire batch - } - } - // TODO: add the batch of records to a channel for consumption - } else { - return errResponse{"NO_RECORDS", "No analytics records in the payload"} - } - return errResponse{} -} - -func validate(recordMap map[string]interface{}) (bool, errResponse) { - elems := []string{"client_received_start_timestamp"} - for _, elem := range elems { - if recordMap[elem] == nil { - return false, errResponse{"MISSING_FIELD", "Missing field: " + elem} - } - } - - crst, exists1 := recordMap["client_received_start_timestamp"] - cret, exists2 := recordMap["client_received_end_timestamp"] - if exists1 && exists2 { - if crst.(int64) > cret.(int64) { - return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"} - - } - } - // api key is required to find other info - _, exists3 := recordMap["client_id"] - if !exists3 { - return false, errResponse{"BAD_DATA", "client_id cannot be null"} - } - return true, errResponse{} -} - -func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) { - recordMap["organization"] = tenant.org - recordMap["environment"] = tenant.env - apiKey := recordMap["client_id"].(string) - devInfo := getDeveloperInfo(scopeuuid, apiKey) - recordMap["api_product"] = devInfo.apiProduct - recordMap["developer_app"] = devInfo.developerApp - recordMap["developer_email"] = devInfo.developerEmail - recordMap["developer"] = devInfo.developer -} - -// if info not found then dont set it -func getDeveloperInfo(scopeuuid string, apiKey 
string) developerInfo { - // TODO: create a cache during init and refresh it on update event - var apiProduct, developerApp, developerEmail, developer string - { - db, err := apid.Data().DB() - switch { - case err != nil: - return developerInfo{} - } - - // TODO: query needs to change (wont work, it is just a placeholder) - error := db.QueryRow("SELECT apiProduct, developerApp, developerEmail, developer FROM DATA_SCOPE WHERE id = ?;", scopeuuid).Scan(&apiProduct, &developerApp, &developerEmail, &developer) - - switch { - case error == sql.ErrNoRows: - return developerInfo{} - case error != nil: - return developerInfo{} - } - } - return developerInfo{apiProduct, developerApp, developerEmail, developer} - // For local testing - //return developerInfo{"test_product", "test_app", "test@test.com", "test"} -} - -func writeError(w http.ResponseWriter, status int, code string, reason string) { - w.WriteHeader(status) - e := errResponse{ - ErrorCode: code, - Reason: reason, - } - bytes, err := json.Marshal(e) - if err != nil { - log.Errorf("unable to marshal errorResponse: %v", err) - } else { - w.Write(bytes) - } -}
diff --git a/init.go b/init.go index 9f1d284..5b40590 100644 --- a/init.go +++ b/init.go
@@ -4,6 +4,7 @@ "fmt" "github.com/30x/apid" "os" + "sync" "path/filepath" ) @@ -24,13 +25,10 @@ analyticsUploadInterval = "apidanalytics_upload_interval" // config in seconds analyticsUploadIntervalDefault = "5" - uapEndpoint = "apidanalytics_uap_endpoint" // config + uapServerBase = "apidanalytics_uap_server_base" // config - uapRepo = "apidanalytics_uap_repo" // config - uapRepoDefault = "edge" - - uapDataset = "apidanalytics_uap_dataset" // config - uapDatasetDefault = "api" + useCaching = "apidanalytics_use_caching" + useCachingDefault = true maxRetries = 3 ) @@ -41,12 +39,16 @@ log apid.LogService config apid.ConfigService data apid.DataService + events apid.EventsService + unsafeDB apid.DB + dbMux sync.RWMutex localAnalyticsBaseDir string localAnalyticsTempDir string localAnalyticsStagingDir string localAnalyticsFailedDir string localAnalyticsRecoveredDir string + uapEndpoint string ) // apid.RegisterPlugin() is required to be called in init() @@ -54,6 +56,19 @@ apid.RegisterPlugin(initPlugin) } +func getDB() apid.DB { + dbMux.RLock() + db := unsafeDB + dbMux.RUnlock() + return db +} + +func setDB(db apid.DB) { + dbMux.Lock() + unsafeDB = db + dbMux.Unlock() +} + // initPlugin will be called by apid to initialize func initPlugin(services apid.Services) (apid.PluginData, error) { @@ -67,11 +82,12 @@ return pluginData, fmt.Errorf("Missing required config value: %s: ", err) } - for _, key := range []string{uapEndpoint} { + for _, key := range []string{uapServerBase} { if !config.IsSet(key) { return pluginData, fmt.Errorf("Missing required config value: %s", key) } } + uapEndpoint = uapServerBase + "/analytics" directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir} err = createDirectories(directories) @@ -81,9 +97,28 @@ } data = services.Data() + events = services.Events() + events.Listen("ApigeeSync", &handler{}) // TODO: perform crash recovery initUploadManager() + + 
if (config.GetBool(useCaching)) { + err = createTenantCache() + if err != nil { + return pluginData, fmt.Errorf("Could not create tenant cache: %s", err) + } + log.Debug("Created a local cache for datascope information") + + err = createDeveloperInfoCache() + if err != nil { + return pluginData, fmt.Errorf("Could not create developer info cache: %s", err) + } + log.Debug("Created a local cache for developer and app information") + } else { + log.Debug("Will not be caching any info and make a DB call for every analytics msg") + } + initAPI(services) log.Debug("end init for apidAnalytics plugin") @@ -112,13 +147,12 @@ config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault) config.SetDefault(analyticsCollectionNoFiles, analyticsCollectionNoFilesDefault) + // set default config for local caching + config.SetDefault(useCaching, useCachingDefault) + // set default config for upload interval config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault) - // set defaults for uap related properties - config.SetDefault(uapRepo, uapRepoDefault) - config.SetDefault(uapDataset, uapDatasetDefault) - return nil }
diff --git a/listener.go b/listener.go new file mode 100644 index 0000000..251fe5e --- /dev/null +++ b/listener.go
@@ -0,0 +1,75 @@ +package apidAnalytics +import ( + "github.com/30x/apid" + "github.com/apigee-labs/transicator/common" +) + +type handler struct { +} + +func (h *handler) String() string { + return "apidAnalytics" +} + +func (h *handler) Handle(e apid.Event) { + + snapData, ok := e.(*common.Snapshot) + if ok { + processSnapshot(snapData) + } else { + changeSet, ok := e.(*common.ChangeList) + if ok { + processChange(changeSet) + } else { + log.Errorf("Received Invalid event. Ignoring. %v", e) + } + } + return +} + +func processSnapshot(snapshot *common.Snapshot) { + log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) + + db, err := data.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } + setDB(db) + return +} + +func processChange(changes *common.ChangeList) { + + log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) + var rows []common.Row + + for _, payload := range changes.Changes { + rows = nil + switch payload.Table { + case "edgex.data_scope": + switch payload.Operation { + case common.Insert, common.Update: + rows = append(rows, payload.NewRow) + for _, ele := range rows { + var scopeuuid, tenantid, org, env string + ele.Get("id", &scopeuuid) + ele.Get("scope", &tenantid) + ele.Get("org", &org) + ele.Get("env", &env) + tenantCache[scopeuuid] = tenant{org: org, env: env, tenantId: tenantid} + } + case common.Delete: + rows = append(rows, payload.NewRow) + for _, ele := range rows { + var scopeuuid string + ele.Get("id", &scopeuuid) + delete(tenantCache, scopeuuid) + } + } + case "kms.developer", "kms.app", "kms.api_product", "kms.app_credential_apiproduct_mapper": + // any change in any of the above tables should result in cache refresh + createDeveloperInfoCache() + log.Debug("refreshed local developerInfoCache") + } + } +}
diff --git a/swagger.yaml b/swagger.yaml new file mode 100644 index 0000000..c877b84 --- /dev/null +++ b/swagger.yaml
@@ -0,0 +1,126 @@ +swagger: "2.0" +info: + version: "v1" + title: Swagger API +host: playground.apistudio.io +basePath: /try/64e409ad-aebb-4bbc-977e-f0e0f22209d4 +schemes: + - http + - https +consumes: + - application/json +produces: + - application/json +paths: + '/analytics/{bundle_scope_uuid}': + x-swagger-router-controller: analytics + parameters: + - name: bundle_scope_uuid + in: path + required: true + description: bundle UUID that can be mapped to a scope by APID + type: string + - name: analytics_data + in: body + description: The analytics data you want to post + required: true + schema: + $ref: "#/definitions/records" + post: + responses: + "200": + description: Success + default: + description: Error + schema: + $ref: "#/definitions/errResponse" + +definitions: + records: + type: array + minItems: 1 + items: + $ref: "#/definitions/eachRecord" + + eachRecord: + type: object + required: + - access_token + - apiproxy + - apiproxy_revision + - client_id + - client_ip + - client_received_end_timestamp + - client_received_start_timestamp + - client_sent_end_timestamp + - client_sent_start_timestamp + - request_path + - request_uri + - request_verb + - response_status_code + - useragent + - target + - target_received_end_timestamp + - target_received_start_timestamp + - target_response_code + - target_sent_end_timestamp + - target_sent_start_timestamp + properties: + access_token: + type: string + apiproxy: + type: string + apiproxy_revision: + type: string + client_id: + type: string + client_ip: + type: string + client_received_end_timestamp: + type: integer + format: int64 + client_received_start_timestamp: + type: integer + format: int64 + client_sent_end_timestamp: + type: integer + format: int64 + client_sent_start_timestamp: + type: integer + format: int64 + request_path: + type: string + request_uri: + type: string + request_verb: + type: string + response_status_code: + type: integer + useragent: + type: string + target: + type: string + 
target_received_end_timestamp: + type: integer + format: int64 + target_received_start_timestamp: + type: integer + format: int64 + target_response_code: + type: integer + target_sent_end_timestamp: + type: integer + format: int64 + target_sent_start_timestamp: + type: integer + format: int64 + + errResponse: + required: + - errorCode + - reason + properties: + errorCode: + type: string + reason: + type: string \ No newline at end of file
diff --git a/uploadManager.go b/uploadManager.go index 43290d2..f6f0f87 100644 --- a/uploadManager.go +++ b/uploadManager.go
@@ -38,12 +38,6 @@ } } }() - -} - -func uploadDir(file os.FileInfo) bool { - // TODO: handle upload to UAP file by file - return false } func handleUploadDirStatus(file os.FileInfo, status bool) { @@ -65,4 +59,4 @@ delete(retriesMap, file.Name()) } } -} +} \ No newline at end of file
diff --git a/uploader.go b/uploader.go new file mode 100644 index 0000000..308e752 --- /dev/null +++ b/uploader.go
@@ -0,0 +1,35 @@ +package apidAnalytics + +import ( + _ "fmt" + "os" + "strings" + "path/filepath" +) + +func uploadDir(dir os.FileInfo) bool { + // TODO: handle upload to UAP file by file + completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) + log.Debugf("Complete Path : %s", completePath) + tenant, timestamp := splitDirName(dir.Name()) + date := getDateFromDirTimestamp(timestamp) + log.Debugf("tenant: %s | timestamp %s", tenant, date) + //for _, file := range dir { + // //log.Debugf("t: %s , file: %s", t, file.Name()) + // if file.IsDir() { + // handleUploadDirStatus(file, uploadDir(file)) + // } + //} + return false +} + +func splitDirName(dirName string) (string, string){ + // split the dirName argument (previously split the literal "dirName", which yields one element and panics on s[1]) + s := strings.Split(dirName, "~") + tenant := s[0]+"~"+s[1] + timestamp := s[2] + return tenant, timestamp +} + +func getDateFromDirTimestamp(timestamp string) (string){ + return "" +} \ No newline at end of file