Changed caching logic for snapshot event
diff --git a/common_helper.go b/common_helper.go index f1d34b6..05805bf 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -16,7 +16,6 @@ import ( "database/sql" - "github.com/apigee-labs/transicator/common" "sync" ) @@ -36,59 +35,60 @@ // Load data scope information into an in-memory cache so that // for each record a DB lookup is not required -func createTenantCache(snapshot *common.Snapshot) { +func createTenantCache() { // Lock before writing to the map as it has multiple readers tenantCachelock.Lock() defer tenantCachelock.Unlock() tenantCache = make(map[string]tenant) - for _, table := range snapshot.Tables { - switch table.Name { - case "edgex.data_scope": - for _, row := range table.Rows { - var org, env, id string + var org, env, id string - row.Get("id", &id) - row.Get("org", &org) - row.Get("env", &env) - if id != "" { - tenantCache[id] = tenant{Org: org, - Env: env} - } - } + db := getDB() + rows, error := db.Query("SELECT env, org, id FROM edgex_data_scope") + + if error != nil { + log.Warnf("Could not get datascope from DB due to : %s", error.Error()) + } else { + defer rows.Close() + // Lock before writing to the map as it has multiple readers + for rows.Next() { + rows.Scan(&env, &org, &id) + tenantCache[id] = tenant{Org: org, Env: env} } } + log.Debugf("Count of data scopes in the cache: %d", len(tenantCache)) } // Load data scope information into an in-memory cache so that // for each record a DB lookup is not required -func createOrgEnvCache(snapshot *common.Snapshot) { +func createOrgEnvCache() { // Lock before writing to the map as it has multiple readers orgEnvCacheLock.Lock() defer orgEnvCacheLock.Unlock() orgEnvCache = make(map[string]bool) - for _, table := range snapshot.Tables { - switch table.Name { - case "edgex.data_scope": - for _, row := range table.Rows { - var org, env string + var org, env string + db := getDB() - row.Get("org", &org) - row.Get("env", &env) - orgEnv := getKeyForOrgEnvCache(org, env) - if orgEnv != "" { - orgEnvCache[orgEnv] = true - } - } + rows, error := db.Query("SELECT env, org FROM edgex_data_scope") + + if error != nil { + log.Warnf("Could not get datascope from DB due to : %s", error.Error()) + } else { + defer rows.Close() + // Lock before writing to the map as it has multiple readers + for rows.Next() { + rows.Scan(&env, &org) + orgEnv := getKeyForOrgEnvCache(org, env) + orgEnvCache[orgEnv] = true } } log.Debugf("Count of org~env in the cache: %d", len(orgEnvCache)) } // Returns Tenant Info given a scope uuid from the cache or by querying -// the DB directly based on useCachig config +// the DB directly based on useCaching config func getTenantForScope(scopeuuid string) (tenant, dbError) { if config.GetBool(useCaching) { // acquire a read lock as this cache has 1 writer as well
diff --git a/common_helper_test.go b/common_helper_test.go index 15df836..c4e8e90 100644 --- a/common_helper_test.go +++ b/common_helper_test.go
@@ -17,19 +17,28 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - - "github.com/apigee-labs/transicator/common" ) +var _ = Describe("test createTenantCache()", func() { + It("It should create a cache from DB", func() { + createTenantCache() + Expect(len(tenantCache)).To(Equal(1)) + }) +}) + +var _ = Describe("test createOrgEnvCache()", func() { + It("It should create a cache from DB", func() { + createOrgEnvCache() + Expect(len(orgEnvCache)).To(Equal(1)) + }) +}) + var _ = Describe("test getTenantForScope()", func() { Context("with usecaching set to true", func() { BeforeEach(func() { config.Set(useCaching, true) - snapshot := getDatascopeSnapshot() - createTenantCache(&snapshot) - createOrgEnvCache(&snapshot) + createTenantCache() Expect(len(tenantCache)).To(Equal(1)) - Expect(len(orgEnvCache)).To(Equal(1)) }) AfterEach(func() { config.Set(useCaching, false) @@ -41,7 +50,6 @@ Expect(tenant.Org).To(Equal("testorg")) }) }) - Context("get tenant for invalid scopeuuid", func() { It("should return empty tenant and a db error", func() { tenant, dbError := getTenantForScope("wrongid") @@ -59,7 +67,6 @@ Expect(tenant.Org).To(Equal("testorg")) }) }) - Context("get tenant for invalid scopeuuid", func() { It("should return empty tenant and a db error", func() { tenant, dbError := getTenantForScope("wrongid") @@ -90,11 +97,11 @@ }) var _ = Describe("test validateTenant()", func() { + Context("with usecaching set to true", func() { BeforeEach(func() { config.Set(useCaching, true) - snapshot := getDatascopeSnapshot() - createOrgEnvCache(&snapshot) + createOrgEnvCache() Expect(len(orgEnvCache)).To(Equal(1)) }) AfterEach(func() { @@ -164,23 +171,3 @@ Expect(res).To(Equal("testorg~testenv")) }) }) - -func getDatascopeSnapshot() common.Snapshot { - event := common.Snapshot{ - SnapshotInfo: "test_snapshot_valid", - Tables: []common.Table{ - { - Name: LISTENER_TABLE_DATA_SCOPE, - Rows: []common.Row{ - { - "id": &common.ColumnVal{Value: "testid"}, - "scope": &common.ColumnVal{Value: "tenantid"}, - "org": &common.ColumnVal{Value: "testorg"}, - "env": &common.ColumnVal{Value: "testenv"}, - }, - }, - }, - }, - } - return event -}
diff --git a/init.go b/init.go index a7df88a..91351ea 100644 --- a/init.go +++ b/init.go
@@ -53,10 +53,6 @@ // cache to avoid DB calls for each analytics message useCaching = "apidanalytics_use_caching" useCachingDefault = false - - // Interval in seconds when the developer cache should be refreshed - analyticsCacheRefreshInterval = "apidanalytics_cache_refresh_interval" - analyticsCacheRefreshIntervaleDefault = 1800 ) // keep track of the services that this plugin will use @@ -180,9 +176,6 @@ // set default config for useCaching config.SetDefault(useCaching, useCachingDefault) - // set default config for cache refresh interval - config.SetDefault(analyticsCacheRefreshInterval, analyticsCacheRefreshIntervaleDefault) - // set default config for upload interval config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
diff --git a/listener.go b/listener.go index 6d47112..9143e5b 100644 --- a/listener.go +++ b/listener.go
@@ -51,10 +51,10 @@ setDB(db) if config.GetBool(useCaching) { - createTenantCache(snapshot) + createTenantCache() log.Debug("Created a local cache" + " for datasope information") - createOrgEnvCache(snapshot) + createOrgEnvCache() log.Debug("Created a local cache for org~env Information") } else { log.Info("Will not be caching any developer or tenant info " +
diff --git a/listener_test.go b/listener_test.go index e49eced..fc73c2f 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -15,107 +15,61 @@ package apidAnalytics import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" ) +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + const ( LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope" ) var _ = Describe("ApigeeSync event", func() { - var db apid.DB handler := handler{} - BeforeEach(func() { - db = getDB() - - config.Set(useCaching, true) - - snapshot := getDatascopeSnapshot() - createTenantCache(&snapshot) - createOrgEnvCache(&snapshot) - Expect(len(tenantCache)).To(Equal(1)) - Expect(len(orgEnvCache)).To(Equal(1)) - }) - - AfterEach(func() { - config.Set(useCaching, false) - setDB(db) - }) - Context("ApigeeSync snapshot event", func() { + var db apid.DB + var snapshot common.Snapshot + + BeforeEach(func() { + db = getDB() + snapshot = common.Snapshot{SnapshotInfo: "test_snapshot"} + }) + + AfterEach(func() { + setDB(db) + }) + It("should set DB to appropriate version", func() { - config.Set(useCaching, false) + handler.Handle(&snapshot) - event := common.Snapshot{ - SnapshotInfo: "test_snapshot", - Tables: []common.Table{}, - } - - handler.Handle(&event) - - expectedDB, err := data.DBVersion(event.SnapshotInfo) + expectedDB, err := data.DBVersion(snapshot.SnapshotInfo) Expect(err).NotTo(HaveOccurred()) Expect(getDB() == expectedDB).Should(BeTrue()) }) - - It("should process a valid Snapshot", func() { - event := common.Snapshot{ - SnapshotInfo: "test_snapshot_valid", - Tables: []common.Table{ - { - Name: LISTENER_TABLE_DATA_SCOPE, - Rows: []common.Row{ - { - "id": &common.ColumnVal{Value: "i"}, - "_change_selector": &common.ColumnVal{Value: "c"}, - "apid_cluster_id": &common.ColumnVal{Value: "a"}, - "scope": &common.ColumnVal{Value: "s"}, - "org": &common.ColumnVal{Value: "o"}, - "env": &common.ColumnVal{Value: "e"}, - "created": &common.ColumnVal{Value: "c"}, - "created_by": &common.ColumnVal{Value: "c"}, - "updated": &common.ColumnVal{Value: "u"}, - "updated_by": &common.ColumnVal{Value: "u"}, - }, - }, - }, - }, - } - - handler.Handle(&event) - tenant := tenantCache["i"] - Expect(tenant.Org).To(Equal("o")) - Expect(tenant.Env).To(Equal("e")) - - orgEnv := getKeyForOrgEnvCache("o", "e") - Expect(orgEnvCache[orgEnv]).To(BeTrue()) - }) }) Context("Process changeList", func() { Context(LISTENER_TABLE_DATA_SCOPE, func() { - It("insert/delete event should add/remove to/from cache if usecaching is true", func() { - txn, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - txn.Exec("INSERT INTO edgex_data_scope (id, _change_selector, apid_cluster_id, scope, org, env) "+ - "VALUES"+ - "($1,$2,$3,$4,$5,$6)", - "i2", - "c2", - "a2", - "s2", - "o2", - "e2", - ) - txn.Commit() + BeforeEach(func() { + config.Set(useCaching, true) + createTenantCache() + Expect(len(tenantCache)).To(Equal(1)) + createOrgEnvCache() + Expect(len(orgEnvCache)).To(Equal(1)) + }) + AfterEach(func() { + config.Set(useCaching, false) + }) + + It("insert/delete event should add/remove to/from cache if usecaching is true", func() { insert := common.ChangeList{ LastSequence: "test", Changes: []common.Change{ @@ -146,11 +100,6 @@ orgEnv := getKeyForOrgEnvCache("o2", "e2") Expect(orgEnvCache[orgEnv]).To(BeTrue()) - txn, err = getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - txn.Exec("DELETE FROM edgex_data_scope where id = 'i2'") - txn.Commit() - delete := common.ChangeList{ LastSequence: "test", Changes: []common.Change{