Merge pull request #9 from 30x/travis-ci
Add coverage script and travis CI config
diff --git a/README.md b/README.md
index 0b41867..699a2f6 100644
--- a/README.md
+++ b/README.md
@@ -14,20 +14,21 @@
| apidanalytics_uap_server_base | string. url. required. |
| apidanalytics_use_caching | boolean. default: true |
| apidanalytics_buffer_channel_size | int. number of slots. default: 100|
+| apidanalytics_cache_refresh_interval | int. seconds. default: 1800 |
### Startup Procedure
1. Initialize the crash recovery, upload, and buffering managers, which buffer analytics messages to files
locally and then periodically upload these files to S3/GCS based on a signedURL received from the
uapCollectionEndpoint exposed via the edgex proxy
2. Create a listener for Apigee-Sync events
- 1. Each time a Snapshot is received, create an in-memory cache for data scope and developer information
- 2. Each time a changeList is received
- 1. if data_scope info changed, then insert/delete info for changed scope from tenantCache
- 2. if any other kms table is changed, then refresh entire developerInfo cache as only specific
- fields are saved in the cache
+ 1. Each time a Snapshot is received, create an in-memory cache for data scopes
+ 2. Each time a changeList is received, if data_scope info changed, then insert/delete the info for the changed scope in the tenantCache
+ 3. The developer info cache is invalidated periodically and populated when the first request for that apiKey comes in
3. Initialize POST /analytics/{scope_uuid} API
4. Upon receiving POST requests
1. Validate and enrich each batch of analytics records
+ 1. If the developerCache does not have info for the apiKey, fetch it from the DB and insert it into the cache.
+ This way the cache only holds info for developers/apps with active traffic
2. If valid, then publish records to an internal buffer channel
5. Buffering Logic
1. Buffering manager creates listener on the internal buffer channel and thus consumes messages
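Taken together, steps 2.3 and 4.1.1 describe a lazily populated, periodically invalidated cache. A minimal self-contained sketch of that pattern (names such as `fetchFromDB`, `lookup`, and `invalidate` are illustrative, not the plugin's actual API):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type devInfo struct{ Product, Developer string }

var (
	mu    sync.RWMutex
	cache = map[string]devInfo{}
)

// fetchFromDB stands in for the real DB lookup (hypothetical).
func fetchFromDB(apiKey string) devInfo {
	return devInfo{Product: "prod-" + apiKey, Developer: "dev-" + apiKey}
}

// lookup returns cached info, populating the cache on first use so only
// apiKeys with active traffic are ever held in memory.
func lookup(apiKey string) devInfo {
	mu.RLock()
	info, ok := cache[apiKey]
	mu.RUnlock()
	if ok {
		return info
	}
	info = fetchFromDB(apiKey)
	mu.Lock()
	cache[apiKey] = info
	mu.Unlock()
	return info
}

// invalidate clears the cache; run periodically
// (every apidanalytics_cache_refresh_interval seconds).
func invalidate() {
	mu.Lock()
	cache = map[string]devInfo{}
	mu.Unlock()
}

func main() {
	go func() {
		ticker := time.NewTicker(30 * time.Minute)
		defer ticker.Stop()
		for range ticker.C {
			invalidate()
		}
	}()
	fmt.Println(lookup("key1")) // cache miss: loaded from "DB", then cached
	fmt.Println(lookup("key1")) // cache hit
}
```

The actual implementation appears in `common_helper.go` (`getDeveloperInfo`, `updateDeveloperInfoCache`) further down in this diff.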
diff --git a/api_helper.go b/api_helper.go
index 24dba22..45e99f0 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -146,22 +146,24 @@
// apiKey doesn't exist then ignore adding developer fields
if exists {
apiKey := apiKey.(string)
- devInfo := getDeveloperInfo(tenant.TenantId, apiKey)
- _, exists := recordMap["api_product"]
- if !exists {
- recordMap["api_product"] = devInfo.ApiProduct
- }
- _, exists = recordMap["developer_app"]
- if !exists {
- recordMap["developer_app"] = devInfo.DeveloperApp
- }
- _, exists = recordMap["developer_email"]
- if !exists {
- recordMap["developer_email"] = devInfo.DeveloperEmail
- }
- _, exists = recordMap["developer"]
- if !exists {
- recordMap["developer"] = devInfo.Developer
+ if apiKey != "" {
+ devInfo := getDeveloperInfo(tenant.TenantId, apiKey)
+ _, exists := recordMap["api_product"]
+ if !exists {
+ recordMap["api_product"] = devInfo.ApiProduct
+ }
+ _, exists = recordMap["developer_app"]
+ if !exists {
+ recordMap["developer_app"] = devInfo.DeveloperApp
+ }
+ _, exists = recordMap["developer_email"]
+ if !exists {
+ recordMap["developer_email"] = devInfo.DeveloperEmail
+ }
+ _, exists = recordMap["developer"]
+ if !exists {
+ recordMap["developer"] = devInfo.Developer
+ }
}
}
}
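The refactor above wraps enrichment in an `apiKey != ""` guard so records without a `client_id` never trigger a developer lookup, while still filling in only the fields the record lacks. A condensed, self-contained sketch of that control flow (the `devInfo` type and `enrich`/`lookup` helpers are illustrative):

```go
package main

import "fmt"

type devInfo struct{ ApiProduct, DeveloperApp string }

// enrich fills developer fields only when the record lacks them and only
// when a non-empty apiKey is available to look them up with.
func enrich(recordMap map[string]interface{}, apiKey string, lookup func(string) devInfo) {
	if apiKey == "" {
		return // no client_id on the record: skip the lookup entirely
	}
	d := lookup(apiKey)
	if _, exists := recordMap["api_product"]; !exists {
		recordMap["api_product"] = d.ApiProduct
	}
	if _, exists := recordMap["developer_app"]; !exists {
		recordMap["developer_app"] = d.DeveloperApp
	}
}

func main() {
	rec := map[string]interface{}{"api_product": "already-set"}
	enrich(rec, "key1", func(string) devInfo {
		return devInfo{ApiProduct: "p", DeveloperApp: "a"}
	})
	fmt.Println(rec) // api_product is preserved; developer_app is filled in
}
```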
diff --git a/api_helper_test.go b/api_helper_test.go
index 2ccc9f5..7b83679 100644
--- a/api_helper_test.go
+++ b/api_helper_test.go
@@ -21,7 +21,7 @@
raw := getRaw(record)
valid, e := validate(raw)
- Expect(valid).To(Equal(false))
+ Expect(valid).To(BeFalse())
Expect(e.ErrorCode).To(Equal("MISSING_FIELD"))
By("payload with clst > clet")
@@ -34,7 +34,7 @@
raw = getRaw(record)
valid, e = validate(raw)
- Expect(valid).To(Equal(false))
+ Expect(valid).To(BeFalse())
Expect(e.ErrorCode).To(Equal("BAD_DATA"))
Expect(e.Reason).To(Equal("client_received_start_timestamp > client_received_end_timestamp"))
@@ -50,7 +50,7 @@
}`)
raw := getRaw(record)
valid, _ := validate(raw)
- Expect(valid).To(Equal(true))
+ Expect(valid).To(BeTrue())
})
})
})
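The matcher swap is behavior-neutral but improves failure output: `BeFalse()` reports `Expected <bool>: true to be false` instead of `Equal(false)`'s generic equality diff. A hypothetical standalone test illustrating the style (not part of this suite):

```go
package apidAnalytics

import (
	"testing"

	. "github.com/onsi/gomega"
)

// TestBooleanMatchers shows the matcher style used in this PR.
func TestBooleanMatchers(t *testing.T) {
	g := NewGomegaWithT(t)
	valid := false
	g.Expect(valid).To(BeFalse()) // on failure: "Expected <bool>: true to be false"
}
```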
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
index 3a7e0c4..46e2421 100644
--- a/apidAnalytics_suite_test.go
+++ b/apidAnalytics_suite_test.go
@@ -47,15 +47,6 @@
config.Set(uapServerBase, "http://localhost:9000") // dummy value
Expect(apid.InitializePlugins).ToNot(Panic())
- // create initial cache for tenant and developer info
- config.Set(useCaching, true)
-
- createTenantCache()
- Expect(len(tenantCache)).To(Equal(1))
-
- createDeveloperInfoCache()
- Expect(len(developerInfoCache)).To(Equal(1))
-
// Analytics POST API
router := apid.API().Router()
router.HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", func(w http.ResponseWriter, req *http.Request) {
diff --git a/buffering_manager_test.go b/buffering_manager_test.go
index 1789317..fa4ad85 100644
--- a/buffering_manager_test.go
+++ b/buffering_manager_test.go
@@ -96,6 +96,9 @@
Expect(record["response_status_code"]).To(Equal(json.Number("200")))
Expect(record["client_received_start_timestamp"]).To(Equal(json.Number("1486406248277")))
Expect(record["client_received_end_timestamp"]).To(Equal(json.Number("1486406248290")))
+
+ err = os.Remove(completeFilePath)
+ Expect(err).ToNot(HaveOccurred())
})
})
})
@@ -122,23 +125,10 @@
time.Sleep(time.Second * 2)
expectedDirPath := filepath.Join(localAnalyticsStagingDir, dirName)
- status, _ := exists(expectedDirPath)
- Expect(status).To(BeTrue())
+ Expect(expectedDirPath).To(BeADirectory())
expectedfilePath := filepath.Join(localAnalyticsStagingDir, dirName, fileName)
- status, _ = exists(expectedfilePath)
- Expect(status).To(BeTrue())
+ Expect(expectedfilePath).To(BeAnExistingFile())
})
})
})
-
-func exists(path string) (bool, error) {
- _, err := os.Stat(path)
- if err == nil {
- return true, nil
- }
- if os.IsNotExist(err) {
- return false, nil
- }
- return true, err
-}
diff --git a/common_helper.go b/common_helper.go
index 4b07b66..b3a9a5f 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -2,7 +2,7 @@
import (
"database/sql"
- "fmt"
+ "github.com/apigee-labs/transicator/common"
"sync"
)
@@ -22,78 +22,41 @@
// Load data scope information into an in-memory cache so that
// for each record a DB lookup is not required
-func createTenantCache() error {
+func createTenantCache(snapshot *common.Snapshot) {
// Lock before writing to the map as it has multiple readers
tenantCachelock.Lock()
defer tenantCachelock.Unlock()
tenantCache = make(map[string]tenant)
- var org, env, tenantId, id string
- db := getDB()
- rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE")
+ for _, table := range snapshot.Tables {
+ switch table.Name {
+ case "edgex.data_scope":
+ for _, row := range table.Rows {
+ var org, env, tenantId, id string
- if error != nil {
- return fmt.Errorf("Count not get datascope from "+
- "DB due to: %v", error)
- } else {
- defer rows.Close()
- for rows.Next() {
- rows.Scan(&env, &org, &tenantId, &id)
- tenantCache[id] = tenant{Org: org,
- Env: env,
- TenantId: tenantId}
+ row.Get("id", &id)
+ row.Get("scope", &tenantId)
+ row.Get("org", &org)
+ row.Get("env", &env)
+ if id != "" {
+ tenantCache[id] = tenant{Org: org,
+ Env: env,
+ TenantId: tenantId}
+ }
+ }
}
}
-
log.Debugf("Count of data scopes in the cache: %d", len(tenantCache))
- return nil
}
// Load data scope information into an in-memory cache so that
// for each record a DB lookup is not required
-func createDeveloperInfoCache() error {
+func updateDeveloperInfoCache() {
// Lock before writing to the map as it has multiple readers
developerInfoCacheLock.Lock()
defer developerInfoCacheLock.Unlock()
developerInfoCache = make(map[string]developerInfo)
- var apiProduct, developerApp, developerEmail, developer sql.NullString
- var tenantId, apiKey string
-
- db := getDB()
- sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name," +
- " a.name, d.username, d.email " +
- "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
- "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
- "INNER JOIN APP AS a ON a.id = mp.app_id " +
- "INNER JOIN DEVELOPER as d ON d.id = a.developer_id;"
- rows, error := db.Query(sSql)
-
- if error != nil {
- return fmt.Errorf("Count not get developerInfo "+
- "from DB due to: %v", error)
- } else {
- defer rows.Close()
- for rows.Next() {
- rows.Scan(&tenantId, &apiKey, &apiProduct,
- &developerApp, &developer, &developerEmail)
-
- keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
- apiPrd := getValuesIgnoringNull(apiProduct)
- devApp := getValuesIgnoringNull(developerApp)
- dev := getValuesIgnoringNull(developer)
- devEmail := getValuesIgnoringNull(developerEmail)
-
- developerInfoCache[keyForMap] = developerInfo{
- ApiProduct: apiPrd,
- DeveloperApp: devApp,
- DeveloperEmail: devEmail,
- Developer: dev}
- }
- }
-
- log.Debugf("Count of apiKey~tenantId combinations "+
- "in the cache: %d", len(developerInfoCache))
- return nil
+ log.Debug("Invalidated developerInfo cache")
}
// Returns Tenant Info given a scope uuid from the cache or by querying
@@ -102,104 +65,137 @@
if config.GetBool(useCaching) {
// acquire a read lock as this cache has 1 writer as well
tenantCachelock.RLock()
- defer tenantCachelock.RUnlock()
ten, exists := tenantCache[scopeuuid]
+ tenantCachelock.RUnlock()
+ dbErr := dbError{}
+
if !exists {
- reason := "No tenant found for this scopeuuid: " + scopeuuid
- errorCode := "UNKNOWN_SCOPE"
- // Incase of unknown scope, try to refresh the
- // cache ansynchronously incase an update was missed or delayed
- go createTenantCache()
- return tenant{}, dbError{
- ErrorCode: errorCode,
- Reason: reason}
- } else {
- return ten, dbError{}
+ log.Debugf("No tenant found for scopeuuid = %s "+
+ "in cache", scopeuuid)
+ log.Debug("loading info from DB")
+
+ // Load from DB; update cache on success
+ t, err := getTenantFromDB(scopeuuid)
+
+ if err.ErrorCode != "" {
+ dbErr = err
+ ten = t
+ } else {
+ // update cache
+ tenantCachelock.Lock()
+ defer tenantCachelock.Unlock()
+ tenantCache[scopeuuid] = t
+ ten = t
+ }
}
+ return ten, dbErr
} else {
- var org, env, tenantId string
-
- db := getDB()
- error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE"+
- " where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
-
- switch {
- case error == sql.ErrNoRows:
- reason := "No tenant found for this scopeuuid: " + scopeuuid
- errorCode := "UNKNOWN_SCOPE"
- return tenant{}, dbError{
- ErrorCode: errorCode,
- Reason: reason}
- case error != nil:
- reason := error.Error()
- errorCode := "INTERNAL_SEARCH_ERROR"
- return tenant{}, dbError{
- ErrorCode: errorCode,
- Reason: reason}
- }
- return tenant{
- Org: org,
- Env: env,
- TenantId: tenantId}, dbError{}
+ return getTenantFromDB(scopeuuid)
}
}
-// Returns Dveloper related info given an apiKey and tenantId
+// Returns Developer related info given an apiKey and tenantId
// from the cache or by querying the DB directly based on useCaching config
func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
if config.GetBool(useCaching) {
keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
// acquire a read lock as this cache has 1 writer as well
- tenantCachelock.RLock()
- defer tenantCachelock.RUnlock()
+ developerInfoCacheLock.RLock()
devInfo, exists := developerInfoCache[keyForMap]
+ developerInfoCacheLock.RUnlock()
+
if !exists {
- log.Warnf("No data found for for tenantId = %s"+
- " and apiKey = %s", tenantId, apiKey)
- // Incase of unknown apiKey~tenantId,
- // try to refresh the cache ansynchronously incase an update was missed or delayed
- go createDeveloperInfoCache()
- return developerInfo{}
- } else {
- return devInfo
+ log.Debugf("No data found for for tenantId = %s"+
+ " and apiKey = %s in cache", tenantId, apiKey)
+ log.Debug("loading info from DB")
+
+ // Load from DB; update cache on success
+ dev, err := getDevInfoFromDB(tenantId, apiKey)
+
+ if err == nil {
+ // update cache
+ developerInfoCacheLock.Lock()
+ defer developerInfoCacheLock.Unlock()
+ key := getKeyForDeveloperInfoCache(tenantId, apiKey)
+ developerInfoCache[key] = dev
+ }
+
+ devInfo = dev
+
}
+ return devInfo
} else {
- var apiProduct, developerApp, developerEmail sql.NullString
- var developer sql.NullString
-
- db := getDB()
- sSql := "SELECT ap.name, a.name, d.username, d.email " +
- "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
- "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
- "INNER JOIN APP AS a ON a.id = mp.app_id " +
- "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
- "where mp.tenant_id = ? and mp.appcred_id = ?;"
- error := db.QueryRow(sSql, tenantId, apiKey).
- Scan(&apiProduct, &developerApp,
- &developer, &developerEmail)
-
- switch {
- case error == sql.ErrNoRows:
- log.Debugf("No data found for for tenantId = %s "+
- "and apiKey = %s", tenantId, apiKey)
- return developerInfo{}
- case error != nil:
- log.Debugf("No data found for for tenantId = %s and "+
- "apiKey = %s due to: %v", tenantId, apiKey, error)
- return developerInfo{}
- }
-
- apiPrd := getValuesIgnoringNull(apiProduct)
- devApp := getValuesIgnoringNull(developerApp)
- dev := getValuesIgnoringNull(developer)
- devEmail := getValuesIgnoringNull(developerEmail)
- return developerInfo{ApiProduct: apiPrd,
- DeveloperApp: devApp,
- DeveloperEmail: devEmail,
- Developer: dev}
+ devInfo, _ := getDevInfoFromDB(tenantId, apiKey)
+ return devInfo
}
}
+// Returns tenant info by querying DB directly
+func getTenantFromDB(scopeuuid string) (tenant, dbError) {
+ var org, env, tenantId string
+
+ db := getDB()
+ error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE"+
+ " where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
+
+ switch {
+ case error == sql.ErrNoRows:
+ reason := "No tenant found for this scopeuuid: " + scopeuuid
+ errorCode := "UNKNOWN_SCOPE"
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
+ case error != nil:
+ reason := error.Error()
+ errorCode := "INTERNAL_SEARCH_ERROR"
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
+ }
+ return tenant{
+ Org: org,
+ Env: env,
+ TenantId: tenantId}, dbError{}
+}
+
+// Returns developer info by querying DB directly
+func getDevInfoFromDB(tenantId string, apiKey string) (developerInfo, error) {
+ var apiProduct, developerApp, developerEmail sql.NullString
+ var developer sql.NullString
+
+ db := getDB()
+ sSql := "SELECT ap.name, a.name, d.username, d.email " +
+ "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
+ "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
+ "INNER JOIN APP AS a ON a.id = mp.app_id " +
+ "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
+ "where mp.tenant_id = ? and mp.appcred_id = ?;"
+ error := db.QueryRow(sSql, tenantId, apiKey).
+ Scan(&apiProduct, &developerApp,
+ &developer, &developerEmail)
+
+ switch {
+ case error == sql.ErrNoRows:
+ log.Debugf("No data found for for tenantId = %s "+
+ "and apiKey = %s in DB", tenantId, apiKey)
+ return developerInfo{}, error
+ case error != nil:
+ log.Debugf("No data found for for tenantId = %s and "+
+ "apiKey = %s due to: %v", tenantId, apiKey, error)
+ return developerInfo{}, error
+ }
+
+ apiPrd := getValuesIgnoringNull(apiProduct)
+ devApp := getValuesIgnoringNull(developerApp)
+ dev := getValuesIgnoringNull(developer)
+ devEmail := getValuesIgnoringNull(developerEmail)
+
+ return developerInfo{ApiProduct: apiPrd,
+ DeveloperApp: devApp,
+ DeveloperEmail: devEmail,
+ Developer: dev}, nil
+}
+
// Helper method to handle scanning null values in DB to empty string
func getValuesIgnoringNull(sqlValue sql.NullString) string {
if sqlValue.Valid {
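Note the locking discipline in the cache-miss paths above: the read lock is released before the DB query, and the write lock is taken only to store the result. Two goroutines that miss simultaneously may both query the DB, but both store the same value, so the race is benign. A self-contained sketch of the pattern (types simplified):

```go
package main

import (
	"fmt"
	"sync"
)

var (
	lock  sync.RWMutex
	cache = map[string]string{}
)

// get releases the read lock before the (slow) DB call and takes the
// write lock only to store the result.
func get(key string, loadFromDB func(string) string) string {
	lock.RLock()
	v, ok := cache[key]
	lock.RUnlock()
	if ok {
		return v
	}
	v = loadFromDB(key) // may run twice under concurrent misses; harmless
	lock.Lock()
	cache[key] = v
	lock.Unlock()
	return v
}

func main() {
	fmt.Println(get("scope1", func(k string) string { return "tenant-for-" + k }))
}
```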
diff --git a/common_helper_test.go b/common_helper_test.go
index 31bdb31..5e23618 100644
--- a/common_helper_test.go
+++ b/common_helper_test.go
@@ -4,10 +4,21 @@
"database/sql"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+
+ "github.com/apigee-labs/transicator/common"
)
var _ = Describe("test getTenantForScope()", func() {
Context("with usecaching set to true", func() {
+ BeforeEach(func() {
+ config.Set(useCaching, true)
+ snapshot := getDatascopeSnapshot()
+ createTenantCache(&snapshot)
+ Expect(len(tenantCache)).To(Equal(1))
+ })
+ AfterEach(func() {
+ config.Set(useCaching, false)
+ })
Context("get tenant for valid scopeuuid", func() {
It("should return testorg and testenv", func() {
tenant, dbError := getTenantForScope("testid")
@@ -26,12 +37,6 @@
})
Context("with usecaching set to false", func() {
- BeforeEach(func() {
- config.Set(useCaching, false)
- })
- AfterEach(func() {
- config.Set(useCaching, true)
- })
Context("get tenant for valid scopeuuid", func() {
It("should return testorg and testenv", func() {
tenant, dbError := getTenantForScope("testid")
@@ -50,18 +55,49 @@
})
})
+var _ = Describe("test getTenantFromDB()", func() {
+ Context("get developerInfo for valid scopeuuid", func() {
+ It("should return all right data", func() {
+ tenant, dbError := getTenantFromDB("testid")
+ Expect(dbError.Reason).To(Equal(""))
+ Expect(tenant.TenantId).To(Equal("tenantid"))
+ })
+ })
+ Context("get developerInfo for invalid scopeuuid", func() {
+ It("should return error", func() {
+ tenant, dbError := getTenantFromDB("wrongid")
+ Expect(tenant.Org).To(Equal(""))
+ Expect(dbError.ErrorCode).To(Equal("UNKNOWN_SCOPE"))
+ })
+ })
+
+})
+
var _ = Describe("test getDeveloperInfo()", func() {
Context("with usecaching set to true", func() {
+ BeforeEach(func() {
+ config.Set(useCaching, true)
+ updateDeveloperInfoCache()
+ })
+ AfterEach(func() {
+ config.Set(useCaching, false)
+ })
Context("get developerInfo for valid tenantId and apikey", func() {
It("should return all right data", func() {
- developerInfo := getDeveloperInfo("tenantid", "testapikey")
- Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
- Expect(developerInfo.Developer).To(Equal("testdeveloper"))
+ key := getKeyForDeveloperInfoCache("tenantid", "testapikey")
+ _, e := developerInfoCache[key]
+ Expect(e).To(BeFalse())
+
+ getDeveloperInfo("tenantid", "testapikey")
+ devInfo, e := developerInfoCache[key]
+ Expect(e).To(BeTrue())
+ Expect(devInfo.ApiProduct).To(Equal("testproduct"))
+ Expect(devInfo.Developer).To(Equal("testdeveloper"))
})
})
Context("get developerInfo for invalid tenantId and apikey", func() {
- It("should return all right data", func() {
+ It("should return all empty", func() {
developerInfo := getDeveloperInfo("wrongid", "wrongapikey")
Expect(developerInfo.ApiProduct).To(Equal(""))
})
@@ -69,12 +105,6 @@
})
Context("with usecaching set to false", func() {
- BeforeEach(func() {
- config.Set(useCaching, false)
- })
- AfterEach(func() {
- config.Set(useCaching, true)
- })
Context("get developerInfo for valid tenantId and apikey", func() {
It("should return all right data", func() {
developerInfo := getDeveloperInfo("tenantid", "testapikey")
@@ -91,6 +121,25 @@
})
})
+var _ = Describe("test getDevInfoFromDB()", func() {
+ Context("get developerInfo for valid tenantId and apikey", func() {
+ It("should return all right data", func() {
+ developerInfo, err := getDevInfoFromDB("tenantid", "testapikey")
+ Expect(err).ToNot(HaveOccurred())
+ Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
+ Expect(developerInfo.Developer).To(Equal("testdeveloper"))
+ })
+ })
+ Context("get developerInfo for invalid tenantId and apikey", func() {
+ It("should return all empty data", func() {
+ developerInfo, err := getDevInfoFromDB("wrongid", "wrongapikey")
+ Expect(err).To(HaveOccurred())
+ Expect(developerInfo.ApiProduct).To(Equal(""))
+ })
+ })
+
+})
+
var _ = Describe("test getValuesIgnoringNull()", func() {
Context("Null sql value", func() {
It("should return empty string", func() {
@@ -107,3 +156,23 @@
})
})
})
+
+func getDatascopeSnapshot() common.Snapshot {
+ event := common.Snapshot{
+ SnapshotInfo: "test_snapshot_valid",
+ Tables: []common.Table{
+ {
+ Name: LISTENER_TABLE_DATA_SCOPE,
+ Rows: []common.Row{
+ {
+ "id": &common.ColumnVal{Value: "testid"},
+ "scope": &common.ColumnVal{Value: "tenantid"},
+ "org": &common.ColumnVal{Value: "testorg"},
+ "env": &common.ColumnVal{Value: "testenv"},
+ },
+ },
+ },
+ },
+ }
+ return event
+}
diff --git a/crash_recovery_test.go b/crash_recovery_test.go
new file mode 100644
index 0000000..68bb5e6
--- /dev/null
+++ b/crash_recovery_test.go
@@ -0,0 +1,166 @@
+package apidAnalytics
+
+import (
+ "compress/gzip"
+ "encoding/json"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+var _ = Describe("test crashRecoveryNeeded(), ", func() {
+ Context("directories in recovered folder", func() {
+ It("should return true", func() {
+ dirName := "t~e~20160108536000~recoveredTS~20160101222612.123"
+ dirPath := filepath.Join(localAnalyticsRecoveredDir, dirName)
+
+ err := os.Mkdir(dirPath, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ needed := crashRecoveryNeeded()
+ Expect(needed).To(BeTrue())
+
+ err = os.Remove(dirPath)
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+ })
+ Context("directories in tmp folder", func() {
+ It("should return true", func() {
+ d := "t~e~20160112630000"
+ dirPath := filepath.Join(localAnalyticsTempDir, d)
+ fp := filepath.Join(dirPath, "fakefile.txt.gz")
+ err := os.Mkdir(dirPath, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ _, err = os.Create(fp)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ needed := crashRecoveryNeeded()
+ Expect(needed).To(BeTrue())
+
+ // moves file to recovered dir
+ dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
+ for _, dir := range dirs {
+ if strings.Contains(dir.Name(), d) {
+ Expect(dir.Name()).To(
+ ContainSubstring(d + recoveredTS))
+ err = os.RemoveAll(localAnalyticsRecoveredDir +
+ "/" + dir.Name())
+ Expect(err).ShouldNot(HaveOccurred())
+ }
+ }
+ })
+ })
+})
+
+var _ = Describe("test performRecovery(), ", func() {
+ It("should move all recovered directories to staging", func() {
+ dirName := "t~e~20160101545000~recoveredTS~20160101222612.123"
+ dirPath := filepath.Join(localAnalyticsRecoveredDir, dirName)
+ err := os.Mkdir(dirPath, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ performRecovery()
+ newPath := filepath.Join(localAnalyticsStagingDir, dirName)
+ Expect(newPath).To(BeADirectory())
+
+ err = os.Remove(newPath)
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+})
+
+var _ = Describe("test recoverDirectory(), ", func() {
+ It("should recover file and move folder to staging", func() {
+ dirName := "t~e~20160101535000~recoveredTS~20160101222612.123"
+ dirPath := filepath.Join(localAnalyticsRecoveredDir, dirName)
+ fp := filepath.Join(dirPath, "fakefile.txt.gz")
+ err := os.Mkdir(dirPath, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ recoveredFile, err := os.OpenFile(fp,
+ os.O_WRONLY|os.O_CREATE, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ gw := gzip.NewWriter(recoveredFile)
+
+ // write some content to file
+ var records = []byte(`{
+ "response_status_code": 200,
+ "client_id":"testapikey",
+ "client_received_start_timestamp": 1486406248277,
+ "client_received_end_timestamp": 1486406248290
+ }`)
+ gw.Write(records)
+ gw.Close()
+ recoveredFile.Close()
+
+ recoverDirectory(dirName)
+
+ stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
+ Expect(dirPath).ToNot(BeADirectory())
+ Expect(stagingPath).To(BeADirectory())
+
+ err = os.RemoveAll(stagingPath)
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+})
+
+var _ = Describe("test recoverFile(), ", func() {
+ It("should create a recovered file and delete parital file", func() {
+ dirName := "t~e~20160101530000~recoveredTS~20160101222612.123"
+ dirPath := filepath.Join(localAnalyticsRecoveredDir, dirName)
+ fp := filepath.Join(dirPath, "fakefile.txt.gz")
+ err := os.Mkdir(dirPath, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ recoveredFile, err := os.OpenFile(fp,
+ os.O_WRONLY|os.O_CREATE, os.ModePerm)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ gw := gzip.NewWriter(recoveredFile)
+
+ // write some content to file
+ var records = []byte(`{
+ "response_status_code": 200,
+ "client_id":"testapikey",
+ "client_received_start_timestamp": 1486406248277,
+ "client_received_end_timestamp": 1486406248290
+ }`)
+ gw.Write(records)
+ gw.Close()
+ recoveredFile.Close()
+
+ recoverFile("_20160101222612.123", dirName, "fakefile.txt.gz")
+
+ recoveredFileName := "fakefile_recovered_20160101222612.123.txt.gz"
+ recoveredFilePath := filepath.Join(dirPath, recoveredFileName)
+ Expect(recoveredFilePath).To(BeAnExistingFile())
+ Expect(fp).ToNot(BeAnExistingFile())
+
+ // Verify file was written to properly
+ f, err := os.Open(recoveredFilePath)
+ Expect(err).ToNot(HaveOccurred())
+ defer f.Close()
+ gzReader, err := gzip.NewReader(f)
+ Expect(err).ToNot(HaveOccurred())
+ defer gzReader.Close()
+
+ var record map[string]interface{}
+ decoder := json.NewDecoder(gzReader) // Decode payload to JSON data
+ decoder.UseNumber()
+ err = decoder.Decode(&record)
+ Expect(err).ToNot(HaveOccurred())
+
+ Expect(record["client_id"]).To(Equal("testapikey"))
+ Expect(record["response_status_code"]).To(Equal(json.Number("200")))
+ Expect(record["client_received_start_timestamp"]).
+ To(Equal(json.Number("1486406248277")))
+ Expect(record["client_received_end_timestamp"]).
+ To(Equal(json.Number("1486406248290")))
+
+ err = os.RemoveAll(dirPath)
+ Expect(err).ShouldNot(HaveOccurred())
+ })
+})
diff --git a/init.go b/init.go
index 451a1f4..0706e57 100644
--- a/init.go
+++ b/init.go
@@ -6,6 +6,7 @@
"os"
"path/filepath"
"sync"
+ "time"
)
const (
@@ -29,7 +30,7 @@
// Number of slots for internal channel buffering of
// analytics records before they are dumped to a file
analyticsBufferChannelSize = "apidanalytics_buffer_channel_size"
- analyticsBufferChannelSizeDefault = 100
+ analyticsBufferChannelSizeDefault = 1000
// EdgeX endpoint base path to access Uap Collection Endpoint
uapServerBase = "apidanalytics_uap_server_base"
@@ -38,7 +39,11 @@
// info will be maintained in-memory
// cache to avoid DB calls for each analytics message
useCaching = "apidanalytics_use_caching"
- useCachingDefault = true
+ useCachingDefault = false
+
+ // Interval in seconds at which the developer info cache is invalidated
+ analyticsCacheRefreshInterval = "apidanalytics_cache_refresh_interval"
+ analyticsCacheRefreshIntervalDefault = 1800
)
// keep track of the services that this plugin will use
@@ -122,6 +127,22 @@
// for new messages and dump them to files
initBufferingManager()
+ // Initialize developerInfo cache invalidation periodically
+ if config.GetBool(useCaching) {
+ updateDeveloperInfoCache()
+ go func() {
+ ticker := time.NewTicker(time.Second *
+ config.GetDuration(analyticsCacheRefreshInterval))
+ // The ticker keeps running as long as this goroutine runs,
+ // i.e. for the life of the application
+ defer ticker.Stop()
+
+ for range ticker.C {
+ updateDeveloperInfoCache()
+ }
+ }()
+ }
+
// Initialize API's and expose them
initAPI(services)
log.Debug("end init for apidAnalytics plugin")
@@ -153,6 +174,9 @@
// set default config for useCaching
config.SetDefault(useCaching, useCachingDefault)
+ // set default config for cache refresh interval
+ config.SetDefault(analyticsCacheRefreshInterval, analyticsCacheRefreshIntervalDefault)
+
// set default config for upload interval
config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)
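One subtlety in the ticker setup above: assuming a viper-style config, `GetDuration` on a bare integer such as 1800 yields `time.Duration(1800)`, i.e. 1800 nanoseconds, and multiplying by `time.Second` scales that to 1800 seconds. A quick check of the arithmetic under that assumption:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumption: a viper-style GetDuration on the bare value 1800
	// returns time.Duration(1800), i.e. 1800 nanoseconds.
	interval := time.Duration(1800)
	fmt.Println(time.Second * interval) // 1800 * 1e9 ns -> "30m0s"
}
```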
diff --git a/listener.go b/listener.go
index ba5e15a..872ad80 100644
--- a/listener.go
+++ b/listener.go
@@ -37,21 +37,11 @@
setDB(db)
if config.GetBool(useCaching) {
- err = createTenantCache()
- if err != nil {
- log.Error(err)
- } else {
- log.Debug("Created a local cache" +
- " for datasope information")
- }
- err = createDeveloperInfoCache()
- if err != nil {
- log.Error(err)
- } else {
- log.Debug("Created a local cache for developer information")
- }
+ createTenantCache(snapshot)
+ log.Debug("Created a local cache" +
+ " for datasope information")
} else {
- log.Info("Will not be caching any developer info " +
+ log.Info("Will not be caching any developer or tenant info " +
"and make a DB call for every analytics msg")
}
return
@@ -62,7 +52,6 @@
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
var rows []common.Row
- refreshDevInfoNeeded := false
for _, payload := range changes.Changes {
rows = nil
switch payload.Table {
@@ -81,13 +70,15 @@
ele.Get("scope", &tenantid)
ele.Get("org", &org)
ele.Get("env", &env)
- tenantCache[scopeuuid] = tenant{
- Org: org,
- Env: env,
- TenantId: tenantid}
- log.Debugf("Refreshed local "+
- "tenantCache. Added "+
- "scope: "+"%s", scopeuuid)
+ if scopeuuid != "" {
+ tenantCache[scopeuuid] = tenant{
+ Org: org,
+ Env: env,
+ TenantId: tenantid}
+ log.Debugf("Refreshed local "+
+ "tenantCache. Added "+
+ "scope: "+"%s", scopeuuid)
+ }
}
case common.Delete:
rows = append(rows, payload.OldRow)
@@ -98,23 +89,16 @@
for _, ele := range rows {
var scopeuuid string
ele.Get("id", &scopeuuid)
- delete(tenantCache, scopeuuid)
- log.Debugf("Refreshed local"+
- " tenantCache. Deleted"+
- " scope: %s", scopeuuid)
+ if scopeuuid != "" {
+ delete(tenantCache, scopeuuid)
+ log.Debugf("Refreshed local"+
+ " tenantCache. Deleted"+
+ " scope: %s", scopeuuid)
+ }
}
}
- case "kms.developer", "kms.app", "kms.api_product",
- "kms.app_credential_apiproduct_mapper":
- // any change in any of the above tables
- // should result in cache refresh
- refreshDevInfoNeeded = true
}
}
- // Refresh cache once for all set of changes
- if refreshDevInfoNeeded {
- createDeveloperInfoCache()
- log.Debug("Refresh local developerInfoCache")
- }
+
}
}
diff --git a/listener_test.go b/listener_test.go
index b792133..6a027f5 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -4,19 +4,86 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
const (
- LISTENER_TABLE_APP_CRED_MAPPER = "kms.app_credential_apiproduct_mapper"
- LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
+ LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
)
-var _ = Describe("listener", func() {
+var _ = Describe("ApigeeSync event", func() {
+
+ var db apid.DB
+ handler := handler{}
+
+ BeforeEach(func() {
+ db = getDB()
+
+ config.Set(useCaching, true)
+
+ snapshot := getDatascopeSnapshot()
+ createTenantCache(&snapshot)
+ Expect(len(tenantCache)).To(Equal(1))
+ })
+
+ AfterEach(func() {
+ config.Set(useCaching, false)
+ setDB(db)
+ })
+
+ Context("ApigeeSync snapshot event", func() {
+ It("should set DB to appropriate version", func() {
+ config.Set(useCaching, false)
+
+ event := common.Snapshot{
+ SnapshotInfo: "test_snapshot",
+ Tables: []common.Table{},
+ }
+
+ handler.Handle(&event)
+
+ expectedDB, err := data.DBVersion(event.SnapshotInfo)
+ Expect(err).NotTo(HaveOccurred())
+
+ Expect(getDB() == expectedDB).Should(BeTrue())
+ })
+
+ It("should process a valid Snapshot", func() {
+ event := common.Snapshot{
+ SnapshotInfo: "test_snapshot_valid",
+ Tables: []common.Table{
+ {
+ Name: LISTENER_TABLE_DATA_SCOPE,
+ Rows: []common.Row{
+ {
+ "id": &common.ColumnVal{Value: "i"},
+ "_change_selector": &common.ColumnVal{Value: "c"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ },
+ },
+ },
+ },
+ }
+
+ handler.Handle(&event)
+ tenant := tenantCache["i"]
+ Expect(tenant.TenantId).To(Equal("s"))
+ Expect(tenant.Org).To(Equal("o"))
+ Expect(tenant.Env).To(Equal("e"))
+ })
+ })
+
Context("Process changeList", func() {
Context(LISTENER_TABLE_DATA_SCOPE, func() {
It("insert/delete event should add/remove to/from cache if usecaching is true", func() {
- config.Set(useCaching, true)
txn, err := getDB().Begin()
Expect(err).ShouldNot(HaveOccurred())
txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+
@@ -53,7 +120,7 @@
},
}
- processChange(&insert)
+ handler.Handle(&insert)
tenant := tenantCache["i2"]
Expect(tenant.TenantId).To(Equal("s2"))
Expect(tenant.Org).To(Equal("o2"))
@@ -75,65 +142,9 @@
},
}
- processChange(&delete)
+ handler.Handle(&delete)
_, exists := tenantCache["i2"]
- Expect(exists).To(Equal(false))
-
- })
- })
- Context(LISTENER_TABLE_APP_CRED_MAPPER, func() {
- It("insert/delete event should refresh developer cache if usecaching is true", func() {
- config.Set(useCaching, true)
-
- txn, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id, appcred_id, app_id, apiprdt_id, status, _change_selector) "+
- "VALUES"+
- "($1,$2,$3,$4,$5,$6)",
- "tenantid",
- "aci",
- "ai",
- "testproductid",
- "APPROVED",
- "12345",
- )
-
- txn.Exec("INSERT INTO APP (id, tenant_id, name, developer_id) "+
- "VALUES"+
- "($1,$2,$3,$4)",
- "ai",
- "tenantid",
- "name",
- "testdeveloperid",
- )
- txn.Commit()
-
- insert := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_APP_CRED_MAPPER,
- NewRow: common.Row{
- "tenant_id": &common.ColumnVal{Value: "tenantid"},
- "appcred_id": &common.ColumnVal{Value: "aci"},
- "app_id": &common.ColumnVal{Value: "ai"},
- "apiprdt_id": &common.ColumnVal{Value: "api"},
- "status": &common.ColumnVal{Value: "s"},
- "_change_selector": &common.ColumnVal{Value: "c"},
- },
- },
- },
- }
-
- processChange(&insert)
- key := getKeyForDeveloperInfoCache("tenantid", "aci")
- developerInfo := developerInfoCache[key]
-
- Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
- Expect(developerInfo.Developer).To(Equal("testdeveloper"))
- Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com"))
- Expect(developerInfo.DeveloperApp).To(Equal("name"))
+ Expect(exists).To(BeFalse())
})
})
})
diff --git a/upload_manager_test.go b/upload_manager_test.go
index e0033e9..d8a9089 100644
--- a/upload_manager_test.go
+++ b/upload_manager_test.go
@@ -22,8 +22,8 @@
Expect(e).ShouldNot(HaveOccurred())
handleUploadDirStatus(info, true)
- status, _ := exists(dirPath)
- Expect(status).To(BeFalse())
+ Expect(dirPath).ToNot(BeADirectory())
+
_, exists := retriesMap[dirName]
Expect(exists).To(BeFalse())
})
@@ -43,8 +43,7 @@
for i := 1; i < maxRetries; i++ {
handleUploadDirStatus(info, false)
- status, _ := exists(dirPath)
- Expect(status).To(BeTrue())
+ Expect(dirPath).To(BeADirectory())
cnt, exists := retriesMap[dirName]
Expect(exists).To(BeTrue())
@@ -55,9 +54,7 @@
handleUploadDirStatus(info, false)
failedPath := filepath.Join(localAnalyticsFailedDir, dirName)
-
- status, _ := exists(failedPath)
- Expect(status).To(BeTrue())
+ Expect(failedPath).To(BeADirectory())
_, exists := retriesMap[dirName]
Expect(exists).To(BeFalse())
@@ -79,11 +76,8 @@
stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
// move from failed to staging directory
- status, _ := exists(dirPath)
- Expect(status).To(BeFalse())
-
- status, _ = exists(stagingPath)
- Expect(status).To(BeTrue())
+ Expect(dirPath).ToNot(BeADirectory())
+ Expect(stagingPath).To(BeADirectory())
})
It("if multiple folders, then move only configured batch at a time", func() {
for i := 1; i < (retryFailedDirBatchSize * 2); i++ {
diff --git a/uploader_test.go b/uploader_test.go
index b4a6d84..e9af4a1 100644
--- a/uploader_test.go
+++ b/uploader_test.go
@@ -44,9 +44,7 @@
status := uploadDir(dir)
Expect(status).To(BeTrue())
-
- e, _ := exists(fp)
- Expect(e).To(BeFalse())
+ Expect(fp).ToNot(BeAnExistingFile())
})
})
Context("invalid tenant", func() {
@@ -60,9 +58,7 @@
status := uploadDir(dir)
Expect(status).To(BeFalse())
-
- e, _ := exists(fp)
- Expect(e).To(BeTrue())
+ Expect(fp).To(BeAnExistingFile())
})
})
})
@@ -86,7 +82,6 @@
url, err := getSignedUrl(tenant, relativeFilePath)
Expect(err).ShouldNot(HaveOccurred())
Expect(url).ShouldNot(Equal(""))
- log.Debugf(url)
})
})
})