Changed caching logic for snapshot event
diff --git a/common_helper.go b/common_helper.go
index f1d34b6..05805bf 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -16,7 +16,6 @@
import (
"database/sql"
- "github.com/apigee-labs/transicator/common"
"sync"
)
@@ -36,59 +35,60 @@
// Load data scope information into an in-memory cache so that
// for each record a DB lookup is not required
-func createTenantCache(snapshot *common.Snapshot) {
+func createTenantCache() {
// Lock before writing to the map as it has multiple readers
tenantCachelock.Lock()
defer tenantCachelock.Unlock()
tenantCache = make(map[string]tenant)
- for _, table := range snapshot.Tables {
- switch table.Name {
- case "edgex.data_scope":
- for _, row := range table.Rows {
- var org, env, id string
+ var org, env, id string
- row.Get("id", &id)
- row.Get("org", &org)
- row.Get("env", &env)
- if id != "" {
- tenantCache[id] = tenant{Org: org,
- Env: env}
- }
- }
+ db := getDB()
+ rows, error := db.Query("SELECT env, org, id FROM edgex_data_scope")
+
+ if error != nil {
+ log.Warnf("Could not get datascope from DB due to : %s", error.Error())
+ } else {
+ defer rows.Close()
+ // Lock before writing to the map as it has multiple readers
+ for rows.Next() {
+ rows.Scan(&env, &org, &id)
+ tenantCache[id] = tenant{Org: org, Env: env}
}
}
+
log.Debugf("Count of data scopes in the cache: %d", len(tenantCache))
}
// Load data scope information into an in-memory cache so that
// for each record a DB lookup is not required
-func createOrgEnvCache(snapshot *common.Snapshot) {
+func createOrgEnvCache() {
// Lock before writing to the map as it has multiple readers
orgEnvCacheLock.Lock()
defer orgEnvCacheLock.Unlock()
orgEnvCache = make(map[string]bool)
- for _, table := range snapshot.Tables {
- switch table.Name {
- case "edgex.data_scope":
- for _, row := range table.Rows {
- var org, env string
+ var org, env string
+ db := getDB()
- row.Get("org", &org)
- row.Get("env", &env)
- orgEnv := getKeyForOrgEnvCache(org, env)
- if orgEnv != "" {
- orgEnvCache[orgEnv] = true
- }
- }
+ rows, error := db.Query("SELECT env, org FROM edgex_data_scope")
+
+ if error != nil {
+ log.Warnf("Could not get datascope from DB due to : %s", error.Error())
+ } else {
+ defer rows.Close()
+ // Lock before writing to the map as it has multiple readers
+ for rows.Next() {
+ rows.Scan(&env, &org)
+ orgEnv := getKeyForOrgEnvCache(org, env)
+ orgEnvCache[orgEnv] = true
}
}
log.Debugf("Count of org~env in the cache: %d", len(orgEnvCache))
}
// Returns Tenant Info given a scope uuid from the cache or by querying
-// the DB directly based on useCachig config
+// the DB directly based on useCaching config
func getTenantForScope(scopeuuid string) (tenant, dbError) {
if config.GetBool(useCaching) {
// acquire a read lock as this cache has 1 writer as well
diff --git a/common_helper_test.go b/common_helper_test.go
index 15df836..c4e8e90 100644
--- a/common_helper_test.go
+++ b/common_helper_test.go
@@ -17,19 +17,28 @@
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
-
- "github.com/apigee-labs/transicator/common"
)
+var _ = Describe("test createTenantCache()", func() {
+ It("It should create a cache from DB", func() {
+ createTenantCache()
+ Expect(len(tenantCache)).To(Equal(1))
+ })
+})
+
+var _ = Describe("test createOrgEnvCache()", func() {
+ It("It should create a cache from DB", func() {
+ createOrgEnvCache()
+ Expect(len(orgEnvCache)).To(Equal(1))
+ })
+})
+
var _ = Describe("test getTenantForScope()", func() {
Context("with usecaching set to true", func() {
BeforeEach(func() {
config.Set(useCaching, true)
- snapshot := getDatascopeSnapshot()
- createTenantCache(&snapshot)
- createOrgEnvCache(&snapshot)
+ createTenantCache()
Expect(len(tenantCache)).To(Equal(1))
- Expect(len(orgEnvCache)).To(Equal(1))
})
AfterEach(func() {
config.Set(useCaching, false)
@@ -41,7 +50,6 @@
Expect(tenant.Org).To(Equal("testorg"))
})
})
-
Context("get tenant for invalid scopeuuid", func() {
It("should return empty tenant and a db error", func() {
tenant, dbError := getTenantForScope("wrongid")
@@ -59,7 +67,6 @@
Expect(tenant.Org).To(Equal("testorg"))
})
})
-
Context("get tenant for invalid scopeuuid", func() {
It("should return empty tenant and a db error", func() {
tenant, dbError := getTenantForScope("wrongid")
@@ -90,11 +97,11 @@
})
var _ = Describe("test validateTenant()", func() {
+
Context("with usecaching set to true", func() {
BeforeEach(func() {
config.Set(useCaching, true)
- snapshot := getDatascopeSnapshot()
- createOrgEnvCache(&snapshot)
+ createOrgEnvCache()
Expect(len(orgEnvCache)).To(Equal(1))
})
AfterEach(func() {
@@ -164,23 +171,3 @@
Expect(res).To(Equal("testorg~testenv"))
})
})
-
-func getDatascopeSnapshot() common.Snapshot {
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot_valid",
- Tables: []common.Table{
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "testid"},
- "scope": &common.ColumnVal{Value: "tenantid"},
- "org": &common.ColumnVal{Value: "testorg"},
- "env": &common.ColumnVal{Value: "testenv"},
- },
- },
- },
- },
- }
- return event
-}
diff --git a/init.go b/init.go
index a7df88a..91351ea 100644
--- a/init.go
+++ b/init.go
@@ -53,10 +53,6 @@
// cache to avoid DB calls for each analytics message
useCaching = "apidanalytics_use_caching"
useCachingDefault = false
-
- // Interval in seconds when the developer cache should be refreshed
- analyticsCacheRefreshInterval = "apidanalytics_cache_refresh_interval"
- analyticsCacheRefreshIntervaleDefault = 1800
)
// keep track of the services that this plugin will use
@@ -180,9 +176,6 @@
// set default config for useCaching
config.SetDefault(useCaching, useCachingDefault)
- // set default config for cache refresh interval
- config.SetDefault(analyticsCacheRefreshInterval, analyticsCacheRefreshIntervaleDefault)
-
// set default config for upload interval
config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
diff --git a/listener.go b/listener.go
index 6d47112..9143e5b 100644
--- a/listener.go
+++ b/listener.go
@@ -51,10 +51,10 @@
setDB(db)
if config.GetBool(useCaching) {
- createTenantCache(snapshot)
+ createTenantCache()
log.Debug("Created a local cache" +
" for datasope information")
- createOrgEnvCache(snapshot)
+ createOrgEnvCache()
log.Debug("Created a local cache for org~env Information")
} else {
log.Info("Will not be caching any developer or tenant info " +
diff --git a/listener_test.go b/listener_test.go
index e49eced..fc73c2f 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -15,107 +15,61 @@
package apidAnalytics
import (
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
const (
LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
)
var _ = Describe("ApigeeSync event", func() {
- var db apid.DB
handler := handler{}
- BeforeEach(func() {
- db = getDB()
-
- config.Set(useCaching, true)
-
- snapshot := getDatascopeSnapshot()
- createTenantCache(&snapshot)
- createOrgEnvCache(&snapshot)
- Expect(len(tenantCache)).To(Equal(1))
- Expect(len(orgEnvCache)).To(Equal(1))
- })
-
- AfterEach(func() {
- config.Set(useCaching, false)
- setDB(db)
- })
-
Context("ApigeeSync snapshot event", func() {
+ var db apid.DB
+ var snapshot common.Snapshot
+
+ BeforeEach(func() {
+ db = getDB()
+ snapshot = common.Snapshot{SnapshotInfo: "test_snapshot"}
+ })
+
+ AfterEach(func() {
+ setDB(db)
+ })
+
It("should set DB to appropriate version", func() {
- config.Set(useCaching, false)
+ handler.Handle(&snapshot)
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot",
- Tables: []common.Table{},
- }
-
- handler.Handle(&event)
-
- expectedDB, err := data.DBVersion(event.SnapshotInfo)
+ expectedDB, err := data.DBVersion(snapshot.SnapshotInfo)
Expect(err).NotTo(HaveOccurred())
Expect(getDB() == expectedDB).Should(BeTrue())
})
-
- It("should process a valid Snapshot", func() {
- event := common.Snapshot{
- SnapshotInfo: "test_snapshot_valid",
- Tables: []common.Table{
- {
- Name: LISTENER_TABLE_DATA_SCOPE,
- Rows: []common.Row{
- {
- "id": &common.ColumnVal{Value: "i"},
- "_change_selector": &common.ColumnVal{Value: "c"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- },
- },
- },
- },
- }
-
- handler.Handle(&event)
- tenant := tenantCache["i"]
- Expect(tenant.Org).To(Equal("o"))
- Expect(tenant.Env).To(Equal("e"))
-
- orgEnv := getKeyForOrgEnvCache("o", "e")
- Expect(orgEnvCache[orgEnv]).To(BeTrue())
- })
})
Context("Process changeList", func() {
Context(LISTENER_TABLE_DATA_SCOPE, func() {
- It("insert/delete event should add/remove to/from cache if usecaching is true", func() {
- txn, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- txn.Exec("INSERT INTO edgex_data_scope (id, _change_selector, apid_cluster_id, scope, org, env) "+
- "VALUES"+
- "($1,$2,$3,$4,$5,$6)",
- "i2",
- "c2",
- "a2",
- "s2",
- "o2",
- "e2",
- )
- txn.Commit()
+ BeforeEach(func() {
+ config.Set(useCaching, true)
+ createTenantCache()
+ Expect(len(tenantCache)).To(Equal(1))
+ createOrgEnvCache()
+ Expect(len(orgEnvCache)).To(Equal(1))
+ })
+ AfterEach(func() {
+ config.Set(useCaching, false)
+ })
+
+ It("insert/delete event should add/remove to/from cache if usecaching is true", func() {
insert := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
@@ -146,11 +100,6 @@
orgEnv := getKeyForOrgEnvCache("o2", "e2")
Expect(orgEnvCache[orgEnv]).To(BeTrue())
- txn, err = getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- txn.Exec("DELETE FROM edgex_data_scope where id = 'i2'")
- txn.Commit()
-
delete := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{