Removed all code related to kms related enrichment
diff --git a/README.md b/README.md index 39fb159..da829f6 100644 --- a/README.md +++ b/README.md
@@ -23,14 +23,11 @@ 2. Create a listener for Apigee-Sync event 1. Each time a Snapshot is received, create an in-memory cache for data scope 2. Each time a changeList is received, if data_scope info changed, then insert/delete info for changed scope from tenantCache - 3. Developer info cache will be invalidated periodically and populated when 1st request for that apiKey comes in 3. Initialize POST /analytics/{scope_uuid} and POST /analytics API's 4. Upon receiving requests 1. Validate and enrich each batch of analytics records. If scope_uuid is given, then that is used to validate. If scope_uuid is not provided, then the payload should have organization and environment. The org/env is then used to validate the scope for this cluster. - 1. If developerCache does not have info for apiKey then get from DB and insert into cache. - This way the cache will only have info for developers/app with active traffic 2. If valid, then publish records to an internal buffer channel 5. Buffering Logic 1. Buffering manager creates listener on the internal buffer channel and thus consumes messages
diff --git a/api.go b/api.go index b84fdac..b93d327 100644 --- a/api.go +++ b/api.go
@@ -106,7 +106,7 @@ if err.ErrorCode == "" { tenant, e := getTenantFromPayload(body) if e.ErrorCode == "" { - _, dbErr := validateTenant(&tenant) + _, dbErr := validateTenant(tenant) if dbErr.ErrorCode != "" { switch dbErr.ErrorCode { case "INTERNAL_SEARCH_ERROR":
diff --git a/api_helper.go b/api_helper.go index 5e49016..67e9725 100644 --- a/api_helper.go +++ b/api_helper.go
@@ -41,9 +41,8 @@ } type tenant struct { - Org string - Env string - TenantId string + Org string + Env string } func getJsonBody(r *http.Request) (map[string]interface{}, errResponse) { @@ -186,37 +185,11 @@ /* Enrich each record by adding org and env fields -It also finds and adds developer related information based on the apiKey if not already present in the payload */ func enrich(recordMap map[string]interface{}, tenant tenant) { // Always overwrite organization/environment value with the tenant information provided in the payload recordMap["organization"] = tenant.Org recordMap["environment"] = tenant.Env - - apiKey, exists := recordMap["client_id"] - // apiKey doesnt exist then ignore adding developer fields - if exists && apiKey != nil { - apiKey, isString := apiKey.(string) - if isString { - devInfo := getDeveloperInfo(tenant.TenantId, apiKey) - ap, exists := recordMap["api_product"] - if !exists || ap == nil { - recordMap["api_product"] = devInfo.ApiProduct - } - da, exists := recordMap["developer_app"] - if !exists || da == nil { - recordMap["developer_app"] = devInfo.DeveloperApp - } - de, exists := recordMap["developer_email"] - if !exists || de == nil { - recordMap["developer_email"] = devInfo.DeveloperEmail - } - d, exists := recordMap["developer"] - if !exists || d == nil { - recordMap["developer"] = devInfo.Developer - } - } - } } func writeError(w http.ResponseWriter, status int, code string, reason string) {
diff --git a/api_helper_test.go b/api_helper_test.go index b7f8910..4101c5c 100644 --- a/api_helper_test.go +++ b/api_helper_test.go
@@ -145,67 +145,55 @@ }) var _ = Describe("test enrich() directly", func() { - Context("enrich record for existing apiKey", func() { - It("developer related fields should be added", func() { + Context("enrich record where org/env in record is different from main org/env in payload", func() { + It("The record should also have org/env for which record was validated ", func() { var record = []byte(`{ - "response_status_code": 200, + "organization":"o", + "environment":"e", "client_id":"testapikey", "client_received_start_timestamp": 1486406248277, "client_received_end_timestamp": 1486406248290 - }`) + }`) raw := getRaw(record) - tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"} + tenant := tenant{Org: "testorg", Env: "testenv"} enrich(raw, tenant) Expect(raw["organization"]).To(Equal(tenant.Org)) Expect(raw["environment"]).To(Equal(tenant.Env)) - Expect(raw["api_product"]).To(Equal("testproduct")) - Expect(raw["developer_app"]).To(Equal("testapp")) - Expect(raw["developer_email"]).To(Equal("testdeveloper@test.com")) - Expect(raw["developer"]).To(Equal("testdeveloper")) - }) - It("developer related fields should be added only if not already existing or value is null", func() { - var record = []byte(`{ - "response_status_code": 200, - "client_id":"testapikey", - "client_received_start_timestamp": 1486406248277, - "client_received_end_timestamp": 1486406248290, - "api_product":"test_prod", - "developer_app":null - }`) - - raw := getRaw(record) - tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"} - enrich(raw, tenant) - - Expect(raw["organization"]).To(Equal(tenant.Org)) - Expect(raw["environment"]).To(Equal(tenant.Env)) - Expect(raw["api_product"]).To(Equal("test_prod")) - Expect(raw["developer_app"]).To(Equal("testapp")) - Expect(raw["developer_email"]).To(Equal("testdeveloper@test.com")) - Expect(raw["developer"]).To(Equal("testdeveloper")) }) }) - - Context("enrich record where no apikey is set", func() { + Context("enrich record where no org/env is there in the record is set", func() { It("developer related fields should not be added", func() { var record = []byte(`{ "response_status_code": 200, + "client_id":"testapikey", "client_received_start_timestamp": 1486406248277, "client_received_end_timestamp": 1486406248290 }`) - raw := getRaw(record) - tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"} + tenant := tenant{Org: "testorg", Env: "testenv"} enrich(raw, tenant) Expect(raw["organization"]).To(Equal(tenant.Org)) Expect(raw["environment"]).To(Equal(tenant.Env)) - Expect(raw["api_product"]).To(BeNil()) - Expect(raw["developer_app"]).To(BeNil()) - Expect(raw["developer_email"]).To(BeNil()) - Expect(raw["developer"]).To(BeNil()) + }) + }) + Context("enrich record where org/env is same as the main org/env in payload", func() { + It("developer related fields should not be added", func() { + var record = []byte(`{ + "organization":"testorg", + "environment": "testenv", + "client_id":"testapikey", + "client_received_start_timestamp": 1486406248277, + "client_received_end_timestamp": 1486406248290 + }`) + raw := getRaw(record) + tenant := tenant{Org: "testorg", Env: "testenv"} + enrich(raw, tenant) + + Expect(raw["organization"]).To(Equal(tenant.Org)) + Expect(raw["environment"]).To(Equal(tenant.Env)) }) }) })
diff --git a/buffering_manager_test.go b/buffering_manager_test.go index 2b2fbd6..caf50c5 100644 --- a/buffering_manager_test.go +++ b/buffering_manager_test.go
@@ -27,7 +27,7 @@ var _ = Describe("test getBucketForTimestamp()", func() { It("should return new bucket or existing bucket if created previously", func() { t := time.Date(2017, 1, 20, 10, 24, 5, 0, time.UTC) - tenant := tenant{Org: "testorg", Env: "testenv", TenantId: "tenantid"} + tenant := tenant{Org: "testorg", Env: "testenv"} bucket, err := getBucketForTimestamp(t, tenant) Expect(err).ShouldNot(HaveOccurred())
diff --git a/common_helper.go b/common_helper.go index 65aee3d..f1d34b6 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -34,13 +34,6 @@ // read while its being written to and vice versa var orgEnvCacheLock = sync.RWMutex{} -// Cache for apiKey~tenantId to developer related information -var developerInfoCache map[string]developerInfo - -// RW lock for developerInfo map cache since the cache can be -// read while its being written to and vice versa -var developerInfoCacheLock = sync.RWMutex{} - // Load data scope information into an in-memory cache so that // for each record a DB lookup is not required func createTenantCache(snapshot *common.Snapshot) { @@ -53,16 +46,14 @@ switch table.Name { case "edgex.data_scope": for _, row := range table.Rows { - var org, env, tenantId, id string + var org, env, id string row.Get("id", &id) - row.Get("scope", &tenantId) row.Get("org", &org) row.Get("env", &env) if id != "" { tenantCache[id] = tenant{Org: org, - Env: env, - TenantId: tenantId} + Env: env} } } } @@ -96,16 +87,6 @@ log.Debugf("Count of org~env in the cache: %d", len(orgEnvCache)) } -// Load data scope information into an in-memory cache so that -// for each record a DB lookup is not required -func updateDeveloperInfoCache() { - // Lock before writing to the map as it has multiple readers - developerInfoCacheLock.Lock() - defer developerInfoCacheLock.Unlock() - developerInfoCache = make(map[string]developerInfo) - log.Debug("Invalidated developerInfo cache") -} - // Returns Tenant Info given a scope uuid from the cache or by querying // the DB directly based on useCachig config func getTenantForScope(scopeuuid string) (tenant, dbError) { @@ -118,8 +99,7 @@ if !exists { log.Debugf("No tenant found for scopeuuid = %s "+ - "in cache", scopeuuid) - log.Debug("loading info from DB") + "in cache, loading info from DB", scopeuuid) // Update cache t, err := getTenantFromDB(scopeuuid) @@ -141,49 +121,13 @@ } } -// Returns Developer related info given an apiKey and tenantId -// from the cache or by querying the DB directly based on useCachig config -func getDeveloperInfo(tenantId string, apiKey string) developerInfo { - if config.GetBool(useCaching) { - keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) - // acquire a read lock as this cache has 1 writer as well - developerInfoCacheLock.RLock() - devInfo, exists := developerInfoCache[keyForMap] - developerInfoCacheLock.RUnlock() - - if !exists { - log.Debugf("No data found for for tenantId = %s"+ - " and apiKey = %s in cache", tenantId, apiKey) - log.Debug("loading info from DB") - - // Update cache - dev, err := getDevInfoFromDB(tenantId, apiKey) - - if err == nil { - // update cache - developerInfoCacheLock.Lock() - defer developerInfoCacheLock.Unlock() - key := getKeyForDeveloperInfoCache(tenantId, apiKey) - developerInfoCache[key] = dev - } - - devInfo = dev - - } - return devInfo - } else { - devInfo, _ := getDevInfoFromDB(tenantId, apiKey) - return devInfo - } -} - // Returns tenant info by querying DB directly func getTenantFromDB(scopeuuid string) (tenant, dbError) { - var org, env, tenantId string + var org, env string db := getDB() - error := db.QueryRow("SELECT env, org, scope FROM edgex_data_scope"+ - " where id = ?", scopeuuid).Scan(&env, &org, &tenantId) + error := db.QueryRow("SELECT env, org FROM edgex_data_scope"+ + " where id = ?", scopeuuid).Scan(&env, &org) switch { case error == sql.ErrNoRows: @@ -200,9 +144,8 @@ Reason: reason} } return tenant{ - Org: org, - Env: env, - TenantId: tenantId}, dbError{} + Org: org, + Env: env}, dbError{} } /* @@ -210,7 +153,7 @@ It also stores the scope i.e. tenant_id in the tenant object using pointer. tenant_id in combination with apiKey is used to find kms related information */ -func validateTenant(tenant *tenant) (bool, dbError) { +func validateTenant(tenant tenant) (bool, dbError) { if config.GetBool(useCaching) { // acquire a read lock as this cache has 1 writer as well orgEnvCacheLock.RLock() @@ -220,9 +163,7 @@ dbErr := dbError{} if !exists { log.Debugf("OrgEnv = %s not found "+ - "in cache", orgEnv) - log.Debug("loading info from DB") - + "in cache, loading info from DB", orgEnv) // Update cache valid, dbErr := validateTenantFromDB(tenant) if valid { @@ -241,79 +182,29 @@ } -func validateTenantFromDB(tenant *tenant) (bool, dbError) { +func validateTenantFromDB(tenant tenant) (bool, dbError) { db := getDB() - error := db.QueryRow("SELECT scope FROM edgex_data_scope"+ - " where org = ? and env = ?", &tenant.Org, &tenant.Env).Scan(&tenant.TenantId) - switch { - case error == sql.ErrNoRows: - reason := "No tenant found for this org: " + tenant.Org + " and env:" + tenant.Env - errorCode := "UNKNOWN_SCOPE" - return false, dbError{ - ErrorCode: errorCode, - Reason: reason} - case error != nil: - reason := error.Error() - errorCode := "INTERNAL_SEARCH_ERROR" - return false, dbError{ - ErrorCode: errorCode, - Reason: reason} + rows, err := db.Query("SELECT 1 FROM edgex_data_scope"+ + " where org = ? and env = ?", tenant.Org, tenant.Env) + + if !rows.Next() { + if err == nil { + reason := "No tenant found for this org: " + tenant.Org + " and env:" + tenant.Env + errorCode := "UNKNOWN_SCOPE" + return false, dbError{ + ErrorCode: errorCode, + Reason: reason} + } else { + reason := err.Error() + errorCode := "INTERNAL_SEARCH_ERROR" + return false, dbError{ + ErrorCode: errorCode, + Reason: reason} + } } return true, dbError{} } -// Returns developer info by querying DB directly -func getDevInfoFromDB(tenantId string, apiKey string) (developerInfo, error) { - var apiProduct, developerApp, developerEmail sql.NullString - var developer sql.NullString - - db := getDB() - sSql := "SELECT ap.name, a.name, d.username, d.email " + - "FROM kms_app_credential_apiproduct_mapper as mp " + - "INNER JOIN kms_api_product as ap ON ap.id = mp.apiprdt_id " + - "INNER JOIN kms_app AS a ON a.id = mp.app_id " + - "INNER JOIN kms_developer as d ON d.id = a.developer_id " + - "where mp.tenant_id = ? and mp.appcred_id = ?;" - error := db.QueryRow(sSql, tenantId, apiKey). - Scan(&apiProduct, &developerApp, - &developer, &developerEmail) - - switch { - case error == sql.ErrNoRows: - log.Debugf("No data found for for tenantId = %s "+ - "and apiKey = %s in DB", tenantId, apiKey) - return developerInfo{}, error - case error != nil: - log.Debugf("No data found for for tenantId = %s and "+ - "apiKey = %s due to: %v", tenantId, apiKey, error) - return developerInfo{}, error - } - - apiPrd := getValuesIgnoringNull(apiProduct) - devApp := getValuesIgnoringNull(developerApp) - dev := getValuesIgnoringNull(developer) - devEmail := getValuesIgnoringNull(developerEmail) - - return developerInfo{ApiProduct: apiPrd, - DeveloperApp: devApp, - DeveloperEmail: devEmail, - Developer: dev}, nil -} - -// Helper method to handle scanning null values in DB to empty string -func getValuesIgnoringNull(sqlValue sql.NullString) string { - if sqlValue.Valid { - return sqlValue.String - } else { - return "" - } -} - -// Build Key as a combination of tenantId and apiKey for the developerInfo Cache -func getKeyForDeveloperInfoCache(tenantId string, apiKey string) string { - return tenantId + "~" + apiKey -} - func getKeyForOrgEnvCache(org, env string) string { return org + "~" + env }
diff --git a/common_helper_test.go b/common_helper_test.go index 42010ed..15df836 100644 --- a/common_helper_test.go +++ b/common_helper_test.go
@@ -15,7 +15,6 @@ package apidAnalytics import ( - "database/sql" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -28,7 +27,9 @@ config.Set(useCaching, true) snapshot := getDatascopeSnapshot() createTenantCache(&snapshot) + createOrgEnvCache(&snapshot) Expect(len(tenantCache)).To(Equal(1)) + Expect(len(orgEnvCache)).To(Equal(1)) }) AfterEach(func() { config.Set(useCaching, false) @@ -37,7 +38,7 @@ It("should return testorg and testenv", func() { tenant, dbError := getTenantForScope("testid") Expect(dbError.Reason).To(Equal("")) - Expect(tenant.TenantId).To(Equal("tenantid")) + Expect(tenant.Org).To(Equal("testorg")) }) }) @@ -74,7 +75,6 @@ It("should return testorg and testenv", func() { tenant, dbError := getTenantFromDB("testid") Expect(dbError.Reason).To(Equal("")) - Expect(tenant.TenantId).To(Equal("tenantid")) Expect(tenant.Org).To(Equal("testorg")) Expect(tenant.Env).To(Equal("testenv")) @@ -89,73 +89,6 @@ }) }) -var _ = Describe("test getDeveloperInfo()", func() { - Context("with usecaching set to true", func() { - BeforeEach(func() { - config.Set(useCaching, true) - updateDeveloperInfoCache() - }) - AfterEach(func() { - config.Set(useCaching, false) - }) - Context("get developerInfo for valid tenantId and apikey", func() { - It("should return all right data", func() { - key := getKeyForDeveloperInfoCache("tenantid", "testapikey") - _, e := developerInfoCache[key] - Expect(e).To(BeFalse()) - - getDeveloperInfo("tenantid", "testapikey") - devInfo, e := developerInfoCache[key] - Expect(e).To(BeTrue()) - Expect(devInfo.ApiProduct).To(Equal("testproduct")) - Expect(devInfo.Developer).To(Equal("testdeveloper")) - }) - }) - - Context("get developerInfo for invalid tenantId and apikey", func() { - It("should return all empty", func() { - developerInfo := getDeveloperInfo("wrongid", "wrongapikey") - Expect(developerInfo.ApiProduct).To(Equal("")) - }) - }) - }) - - Context("with usecaching set to false", func() { - Context("get developerInfo for valid tenantId and apikey", func() { - It("should return all right data", func() { - developerInfo := getDeveloperInfo("tenantid", "testapikey") - Expect(developerInfo.ApiProduct).To(Equal("testproduct")) - Expect(developerInfo.Developer).To(Equal("testdeveloper")) - }) - }) - Context("get developerInfo for invalid tenantId and apikey", func() { - It("should return all right data", func() { - developerInfo := getDeveloperInfo("wrongid", "wrongapikey") - Expect(developerInfo.ApiProduct).To(Equal("")) - }) - }) - }) -}) - -var _ = Describe("test getDevInfoFromDB()", func() { - Context("get developerInfo for valid tenantId and apikey", func() { - It("should return all right data", func() { - developerInfo, err := getDevInfoFromDB("tenantid", "testapikey") - Expect(err).ToNot(HaveOccurred()) - Expect(developerInfo.ApiProduct).To(Equal("testproduct")) - Expect(developerInfo.Developer).To(Equal("testdeveloper")) - }) - }) - Context("get developerInfo for invalid tenantId and apikey", func() { - It("should return all empty data", func() { - developerInfo, err := getDevInfoFromDB("wrongid", "wrongapikey") - Expect(err).To(HaveOccurred()) - Expect(developerInfo.ApiProduct).To(Equal("")) - }) - }) - -}) - var _ = Describe("test validateTenant()", func() { Context("with usecaching set to true", func() { BeforeEach(func() { @@ -170,7 +103,7 @@ Context("valididate existing org/env", func() { It("should return true", func() { tenant := tenant{Org: "testorg", Env: "testenv"} - valid, dbError := validateTenant(&tenant) + valid, dbError := validateTenant(tenant) Expect(dbError.Reason).To(Equal("")) Expect(valid).To(BeTrue()) }) @@ -179,7 +112,7 @@ Context("get tenant for invalid scopeuuid", func() { It("should return false", func() { tenant := tenant{Org: "wrongorg", Env: "wrongenv"} - valid, dbError := validateTenant(&tenant) + valid, dbError := validateTenant(tenant) Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) Expect(valid).To(BeFalse()) }) @@ -189,7 +122,7 @@ Context("valididate existing org/env", func() { It("should return true", func() { tenant := tenant{Org: "testorg", Env: "testenv"} - valid, dbError := validateTenant(&tenant) + valid, dbError := validateTenant(tenant) Expect(dbError.Reason).To(Equal("")) Expect(valid).To(BeTrue()) }) @@ -197,7 +130,7 @@ Context("get tenant for invalid scopeuuid", func() { It("should return false", func() { tenant := tenant{Org: "wrongorg", Env: "wrongenv"} - valid, dbError := validateTenant(&tenant) + valid, dbError := validateTenant(tenant) Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) Expect(valid).To(BeFalse()) }) @@ -207,18 +140,17 @@ var _ = Describe("test validateTenantFromDB()", func() { Context("validate tenant for org/env that exists in DB", func() { - It("should not return an error and tenantId should be populated", func() { + It("should not return an error and valid should be true", func() { tenant := tenant{Org: "testorg", Env: "testenv"} - valid, dbError := validateTenantFromDB(&tenant) + valid, dbError := validateTenantFromDB(tenant) Expect(valid).To(BeTrue()) - Expect(tenant.TenantId).To(Equal("tenantid")) Expect(dbError.ErrorCode).To(Equal("")) }) }) Context("validate tenant for org/env that do not exist in DB", func() { It("should return error with unknown_scope", func() { tenant := tenant{Org: "wrongorg", Env: "wrongenv"} - valid, dbError := validateTenantFromDB(&tenant) + valid, dbError := validateTenantFromDB(tenant) Expect(valid).To(BeFalse()) Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE")) }) @@ -226,33 +158,9 @@ }) -var _ = Describe("test getValuesIgnoringNull()", func() { - Context("Null sql value", func() { - It("should return empty string", func() { - a := sql.NullString{String: "null", Valid: false} - res := getValuesIgnoringNull(a) - Expect(res).To(Equal("")) - }) - }) - Context("not null sql value", func() { - It("should return string", func() { - a := sql.NullString{String: "sql", Valid: true} - res := getValuesIgnoringNull(a) - Expect(res).To(Equal("sql")) - }) - }) -}) - -var _ = Describe("test getKeyForDeveloperInfoCache()", func() { - It("should return key using tenantid and apiKey", func() { - res := getKeyForDeveloperInfoCache("testid", "testapikey") - Expect(res).To(Equal("testid~testapikey")) - }) -}) - var _ = Describe("test getKeyForOrgEnvCache()", func() { It("should return key using org and env", func() { - res := getKeyForDeveloperInfoCache("testorg", "testenv") + res := getKeyForOrgEnvCache("testorg", "testenv") Expect(res).To(Equal("testorg~testenv")) }) })
diff --git a/init.go b/init.go index 6d92ee3..a7df88a 100644 --- a/init.go +++ b/init.go
@@ -20,7 +20,6 @@ "os" "path/filepath" "sync" - "time" ) const ( @@ -141,22 +140,6 @@ // for new messages and dump them to files initBufferingManager() - // Initialize developerInfo cache invalidation periodically - if config.GetBool(useCaching) { - updateDeveloperInfoCache() - go func() { - ticker := time.NewTicker(time.Second * - config.GetDuration(analyticsCacheRefreshInterval)) - // Ticker will keep running till go routine is running - // i.e. till application is running - defer ticker.Stop() - - for range ticker.C { - updateDeveloperInfoCache() - } - }() - } - // Create a listener for shutdown event and register callback h := func(event apid.Event) { log.Infof("Received ApidShutdown event. %v", event)
diff --git a/listener.go b/listener.go index 4dfe9fa..6d47112 100644 --- a/listener.go +++ b/listener.go
@@ -92,9 +92,8 @@ ele.Get("env", &env) if scopeuuid != "" { tenantCache[scopeuuid] = tenant{ - Org: org, - Env: env, - TenantId: tenantid} + Org: org, + Env: env} log.Debugf("Refreshed local "+ "tenantCache. Added "+ "scope: "+"%s", scopeuuid)
diff --git a/listener_test.go b/listener_test.go index 6c9e094..e49eced 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -91,7 +91,6 @@ handler.Handle(&event) tenant := tenantCache["i"] - Expect(tenant.TenantId).To(Equal("s")) Expect(tenant.Org).To(Equal("o")) Expect(tenant.Env).To(Equal("e")) @@ -141,7 +140,6 @@ handler.Handle(&insert) tenant := tenantCache["i2"] - Expect(tenant.TenantId).To(Equal("s2")) Expect(tenant.Org).To(Equal("o2")) Expect(tenant.Env).To(Equal("e2"))