Added comments, improved logging, and refactored some parts
diff --git a/api.go b/api.go
index a35bea5..c2edcb5 100644
--- a/api.go
+++ b/api.go
@@ -18,12 +18,6 @@
Reason string `json:"reason"`
}
-type tenant struct {
- Org string
- Env string
- TenantId string
-}
-
func initAPI(services apid.Services) {
log.Debug("initialized API's exposed by apidAnalytics plugin")
analyticsBasePath = config.GetString(configAnalyticsBasePath)
@@ -34,13 +28,7 @@
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
- db, _ := data.DB() // When database isnt initialized
- if db == nil {
- writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely")
- return
- }
-
- db = getDB() // When snapshot isnt processed
+ db := getDB() // getDB() returns nil until the DB is initialized and a snapshot has been processed
if db == nil {
writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely")
return
diff --git a/api.yaml b/api.yaml
index 782de56..9b9ccfa 100644
--- a/api.yaml
+++ b/api.yaml
@@ -106,6 +106,12 @@
properties:
errrorCode:
type: string
+ enum:
+ - UNKNOWN_SCOPE
+ - BAD_DATA
+ - UNSUPPORTED_CONTENT_TYPE
+ - UNSUPPORTED_CONTENT_ENCODING
+ - MISSING_FIELD
reason:
type: string
example: {
@@ -120,6 +126,9 @@
properties:
errrorCode:
type: string
+ enum:
+ - INTERNAL_SERVER_ERROR
+ - INTERNAL_SEARCH_ERROR
reason:
type: string
example: {
diff --git a/api_helper.go b/api_helper.go
index 214578d..f12605f 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -9,6 +9,10 @@
)
+/*
+Implements all the helper methods needed to process the POST /analytics payload and send it to the internal buffer channel
+*/
+
type developerInfo struct {
ApiProduct string
DeveloperApp string
@@ -18,14 +22,20 @@
type axRecords struct {
Tenant tenant
- Records []interface{}
+ Records []interface{} // Records is an array of multiple analytics records
+}
+
+type tenant struct {
+ Org string
+ Env string
+ TenantId string
}
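+// processPayload validates the request's Content-Encoding and decodes the body (plain or gzip) before the records are validated and published to the internal buffer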
func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
var gzipEncoded bool
if r.Header.Get("Content-Encoding") != "" {
if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") {
- return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"}
+ return errResponse{ErrorCode:"UNSUPPORTED_CONTENT_ENCODING", Reason:"Only supported content encoding is gzip"}
} else {
gzipEncoded = true
}
@@ -36,7 +46,7 @@
if gzipEncoded {
reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
if err != nil {
- return errResponse{"BAD_DATA", "Gzip data cannot be read"}
+ return errResponse{ErrorCode:"BAD_DATA", Reason:"Gzip Encoded data cannot be read"}
}
} else {
reader = r.Body
@@ -51,14 +61,15 @@
func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse {
var raw map[string]interface{}
- dec := json.NewDecoder(reader)
- dec.UseNumber()
+ decoder := json.NewDecoder(reader) // Decode payload to JSON data
+ decoder.UseNumber()
- if err := dec.Decode(&raw); err != nil {
- return errResponse{"BAD_DATA", "Not a valid JSON payload"}
+ if err := decoder.Decode(&raw); err != nil {
+ return errResponse{ErrorCode:"BAD_DATA", Reason:"Not a valid JSON payload"}
}
if records := raw["records"]; records != nil {
+ // Iterate through each record to validate and enrich it
for _, eachRecord := range records.([]interface{}) {
recordMap := eachRecord.(map[string]interface{})
valid, err := validate(recordMap)
@@ -68,20 +79,25 @@
return err // Even if there is one bad record, then reject entire batch
}
}
- // publish batch of records to channel (blocking call)
axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})}
+ // publish batch of records to channel (blocking call)
internalBuffer <- axRecords
} else {
- return errResponse{"NO_RECORDS", "No analytics records in the payload"}
+ return errResponse{ErrorCode:"NO_RECORDS", Reason:"No analytics records in the payload"}
}
return errResponse{}
}
+/*
+Does basic validation on each analytics message
+1. client_received_start_timestamp should exist
+2. if client_received_end_timestamp exists then it should be > client_received_start_timestamp
+*/
func validate(recordMap map[string]interface{}) (bool, errResponse) {
elems := []string{"client_received_start_timestamp"}
for _, elem := range elems {
if recordMap[elem] == nil {
- return false, errResponse{"MISSING_FIELD", "Missing field: " + elem}
+ return false, errResponse{ErrorCode:"MISSING_FIELD", Reason:"Missing Required field: " + elem}
}
}
@@ -89,12 +105,16 @@
cret, exists2 := recordMap["client_received_end_timestamp"]
if exists1 && exists2 {
if crst.(json.Number) > cret.(json.Number) {
- return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"}
+ return false, errResponse{ErrorCode:"BAD_DATA", Reason:"client_received_start_timestamp > client_received_end_timestamp"}
}
}
return true, errResponse{}
}
+/*
+Enrich each record by adding org and env fields
+It also adds developer related information based on the apiKey
+*/
func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) {
org, orgExists := recordMap["organization"]
if !orgExists || org.(string) == "" {
diff --git a/api_test.go b/api_test.go
index 1c9ac2f..f56a463 100644
--- a/api_test.go
+++ b/api_test.go
@@ -11,11 +11,8 @@
// BeforeSuite setup and AfterSuite cleanup is in apidAnalytics_suite_test.go
var _ = Describe("testing saveAnalyticsRecord() directly", func() {
-
Context("valid scopeuuid", func() {
-
It("should successfully return", func() {
-
uri, err := url.Parse(testServer.URL)
uri.Path = analyticsBasePath
@@ -32,7 +29,6 @@
})
Context("invalid scopeuuid", func() {
-
It("should return bad request", func() {
uri, err := url.Parse(testServer.URL)
uri.Path = analyticsBasePath
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
index 347d694..2e98e86 100644
--- a/apidAnalytics_suite_test.go
+++ b/apidAnalytics_suite_test.go
@@ -34,7 +34,8 @@
config.Set("data_path", testTempDir)
config.Set(uapServerBase, "http://localhost:9000") // dummy value
- config.Set(useCaching, false)
+ config.Set("apigeesync_apid_instance_id","abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
+ config.Set(useCaching, true)
db, err := apid.Data().DB()
Expect(err).NotTo(HaveOccurred())
@@ -44,6 +45,10 @@
insertTestData(db)
apid.InitializePlugins()
+ // Create the caches here, else they are created in listener.go when a snapshot is received
+ createTenantCache()
+ createDeveloperInfoCache()
+
testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == analyticsBasePathDefault {
saveAnalyticsRecord(w, req)
diff --git a/buffering_manager.go b/buffering_manager.go
index 243409d..b9fa3d2 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -11,13 +11,18 @@
"encoding/json"
)
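+// Extension for buffered analytics files (newline-delimited JSON records, gzip-compressed)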
+const fileExtension = ".txt.gz";
+
+// Channel where analytics records are buffered before being dumped to a file, since writing to a file should not be performed in the HTTP handler thread
var internalBuffer chan axRecords
+// Channel where close bucket event is published i.e. when a bucket is ready to be closed based on collection interval
var closeBucketEvent chan bucket
+// Map from bucket start timestamp to bucket
var bucketMap map[int64]bucket
type bucket struct {
DirName string
- // We need file handle, writter pointer to close the file
+ // We need file handle and writer to close the file
FileWriter fileWriter
}
@@ -39,7 +44,7 @@
records := <-internalBuffer
err := save(records)
if err != nil {
- log.Errorf("Could not save %d messages to file. %v", len(records.Records), err)
+ log.Errorf("Could not save %d messages to file due to: %v", len(records.Records), err)
}
}
}()
@@ -48,21 +53,23 @@
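+ // Goroutine that waits for close bucket events and moves finished buckets from tmp to staging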
go func() {
for {
bucket := <- closeBucketEvent
- log.Debugf("Closing bucket %s", bucket.DirName)
+ log.Debugf("Close Event received for bucket: %s", bucket.DirName)
// close open file
closeGzipFile(bucket.FileWriter)
dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
+ // move the directory from tmp to staging to indicate it's ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory :%s to staging folder", bucket.DirName)
+ log.Errorf("Cannot move directory '%s' from tmp to staging folder", bucket.DirName)
}
}
}()
}
+// Save records to correct file based on what timestamp data is being collected for
func save(records axRecords) (error) {
bucket, err := getBucketForTimestamp(time.Now(), records.Tenant)
if (err != nil ) {
@@ -74,7 +81,7 @@
func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
- // first based on current timestamp, determine the timestamp bucket
+ // first based on current timestamp and collection interval, determine the timestamp of the bucket
ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
_, exists := bucketMap[ts]
if exists {
@@ -82,6 +89,7 @@
} else {
timestamp := time.Unix(ts, 0).Format(timestampLayout)
+ // endtimestamp of bucket = starttimestamp + collectionInterval
endTime := time.Unix(ts + int64(config.GetInt(analyticsCollectionInterval)), 0)
endtimestamp := endTime.Format(timestampLayout)
@@ -90,11 +98,12 @@
// create dir
err := os.Mkdir(newPath, os.ModePerm)
if err != nil {
- return bucket{}, fmt.Errorf("Cannot create directory : %s to buffer messages due to %v:", dirName, err)
+ return bucket{}, fmt.Errorf("Cannot create directory '%s' to buffer messages '%v'", dirName, err)
}
// create file for writing
- fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0.txt.gz"
+ // Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
+ fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0" + fileExtension
completeFilePath := filepath.Join(newPath, fileName)
fw, err := createGzipFile(completeFilePath)
if err != nil {
@@ -104,7 +113,7 @@
newBucket := bucket{DirName: dirName, FileWriter: fw}
bucketMap[ts] = newBucket
- //Send event to close directory after endTime
+ //Send event to close directory after endTime + 5 seconds to make sure all buffers are flushed to file
timer := time.After(endTime.Sub(time.Now()) + time.Second * 5)
go func() {
<- timer
@@ -114,6 +123,7 @@
}
}
+// 4 digit Hex is prefixed to each filename to improve how s3 partitions the files being uploaded
func getRandomHex() string {
buff := make([]byte, 2)
rand.Read(buff)
@@ -123,7 +133,7 @@
func createGzipFile(s string) (fileWriter, error) {
file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
if err != nil {
- return fileWriter{},fmt.Errorf("Cannot create file : %s to buffer messages due to: %v", s, err)
+ return fileWriter{},fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err)
}
gw := gzip.NewWriter(file)
bw := bufio.NewWriter(gw)
@@ -131,21 +141,22 @@
}
func writeGzipFile(fw fileWriter, records []interface{}) {
+ // write each record as a new line to the bufferedWriter
for _, eachRecord := range records {
s, _ := json.Marshal(eachRecord)
_, err := (fw.bw).WriteString(string(s))
if err != nil {
- log.Errorf("Write to file failed due to: %v", err)
+ log.Errorf("Write to file failed '%v'", err)
}
(fw.bw).WriteString("\n")
}
+ // Flush the entire batch of records to the file at once instead of flushing after each message
fw.bw.Flush()
fw.gw.Flush()
}
func closeGzipFile(fw fileWriter) {
fw.bw.Flush()
- // Close the gzip first.
fw.gw.Close()
fw.file.Close()
}
diff --git a/cmd/apidAnalytics/main.go b/cmd/apidAnalytics/main.go
deleted file mode 100644
index 10689f1..0000000
--- a/cmd/apidAnalytics/main.go
+++ /dev/null
@@ -1,29 +0,0 @@
-package main
-
-import (
- "github.com/30x/apid"
- "github.com/30x/apid/factory"
-
- _ "github.com/30x/apidAnalytics"
-)
-
-func main() {
- // initialize apid using default services
- apid.Initialize(factory.DefaultServicesFactory())
-
- log := apid.Log()
-
- // this will call all initialization functions on all registered plugins
- apid.InitializePlugins()
-
- // print the base url to the console
- config := apid.Config()
- basePath := config.GetString("analyticsBasePath")
- port := config.GetString("api_port")
- log.Printf("Analytics API is at: http://localhost:%s%s", port, basePath)
-
- // start client API listener
- api := apid.API()
- err := api.Listen() // doesn't return if no error
- log.Fatalf("Error. Is something already running on port %d? %s", port, err)
-}
diff --git a/common_helper.go b/common_helper.go
index b0fe3a1..89b37bc 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -6,21 +6,25 @@
"sync"
)
+// Cache for scope uuid to org, env and tenantId information
var tenantCache map[string]tenant
-var developerInfoCache map[string]developerInfo
+// RW lock for the tenant map cache since the cache can be read while it's being written to and vice versa
var tenantCachelock = sync.RWMutex{}
+// Cache for apiKey~tenantId to developer related information
+var developerInfoCache map[string]developerInfo
+// RW lock for the developerInfo map cache since the cache can be read while it's being written to and vice versa
var developerInfoCacheLock = sync.RWMutex{}
-
+// Load data scope information into an in-memory cache so that for each record a DB lookup is not required
func createTenantCache() error {
tenantCache = make(map[string]tenant)
var org, env, tenantId, id string
- db := getDB()
+ db := getDB()
rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE")
if error != nil {
- return fmt.Errorf("Count not get datascope from DB due to : %s", error.Error())
+ return fmt.Errorf("Count not get datascope from DB due to: %v", error)
} else {
defer rows.Close()
// Lock before writing to the map as it has multiple readers
@@ -31,27 +35,27 @@
tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId}
}
}
+
log.Debugf("Count of data scopes in the cache: %d", len(tenantCache))
return nil
}
+// Load developer info into an in-memory cache so that a DB lookup is not required for each record
func createDeveloperInfoCache() error {
developerInfoCache = make(map[string]developerInfo)
-
var apiProduct, developerApp, developerEmail, developer sql.NullString
var tenantId, apiKey string
db := getDB()
-
sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name, a.name, d.username, d.email " +
"FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
"INNER JOIN APP AS a ON a.id = mp.app_id " +
- "INNER JOIN DEVELOPER as d ON d.id = a.developer_id ;"
+ "INNER JOIN DEVELOPER as d ON d.id = a.developer_id;"
rows, error := db.Query(sSql)
if error != nil {
- return fmt.Errorf("Count not get developerInfo from DB due to : %s", error.Error())
+ return fmt.Errorf("Count not get developerInfo from DB due to: %v", error)
} else {
defer rows.Close()
// Lock before writing to the map as it has multiple readers
@@ -69,17 +73,21 @@
developerInfoCache[keyForMap] = developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev}
}
}
+
log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache))
return nil
}
+// Returns tenant info for a given scope uuid, from the cache or by querying the DB directly, based on the useCaching config
func getTenantForScope(scopeuuid string) (tenant, dbError) {
if (config.GetBool(useCaching)) {
_, exists := tenantCache[scopeuuid]
if !exists {
reason := "No tenant found for this scopeuuid: " + scopeuuid
errorCode := "UNKNOWN_SCOPE"
- return tenant{}, dbError{errorCode, reason}
+ // In case of an unknown scope, try to refresh the cache asynchronously in case an update was missed or delayed
+ go createTenantCache()
+ return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
} else {
// acquire a read lock as this cache has 1 writer as well
tenantCachelock.RLock()
@@ -88,38 +96,33 @@
}
} else {
var org, env, tenantId string
- db, err := data.DB()
- if err != nil {
- reason := "DB not initialized"
- errorCode := "INTERNAL_SEARCH_ERROR"
- return tenant{}, dbError{errorCode, reason}
- }
+ db := getDB()
error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
switch {
case error == sql.ErrNoRows:
reason := "No tenant found for this scopeuuid: " + scopeuuid
errorCode := "UNKNOWN_SCOPE"
- return tenant{}, dbError{errorCode, reason}
+ return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
case error != nil:
reason := error.Error()
errorCode := "INTERNAL_SEARCH_ERROR"
- return tenant{}, dbError{errorCode, reason}
+ return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
}
-
return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{}
}
- // TODO: localTesting
- //return tenant{Org: "testorg", Env:"testenv", TenantId: "tenantid"}, dbError{}
}
+// Returns developer related info for a given apiKey and tenantId, from the cache or by querying the DB directly, based on the useCaching config
func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
if (config.GetBool(useCaching)) {
keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
_, exists := developerInfoCache[keyForMap]
if !exists {
- log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey)
+ log.Warnf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey)
+ // In case of an unknown apiKey~tenantId, try to refresh the cache asynchronously in case an update was missed or delayed
+ go createDeveloperInfoCache()
return developerInfo{}
} else {
// acquire a read lock as this cache has 1 writer as well
@@ -137,14 +140,14 @@
"INNER JOIN APP AS a ON a.id = mp.app_id " +
"INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
"where mp.tenant_id = ? and mp.appcred_id = ?;"
- error := db.QueryRow(sSql,tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail)
+ error := db.QueryRow(sSql, tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail)
switch {
case error == sql.ErrNoRows:
- log.Debug("No info found for tenantId : " + tenantId + " and apikey : " + apiKey)
+ log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey)
return developerInfo{}
case error != nil:
- log.Debug("No info found for tenantId : " + tenantId + " and apikey : " + apiKey + " due to " + error.Error())
+ log.Debugf("No data found for for tenantId = %s and apiKey = %s due to: %v", tenantId, apiKey, error)
return developerInfo{}
}
@@ -152,14 +155,11 @@
devApp := getValuesIgnoringNull(developerApp)
dev := getValuesIgnoringNull(developer)
devEmail := getValuesIgnoringNull(developerEmail)
-
return developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev}
}
- // TODO: localTesting
- // return developerInfo{ApiProduct: "testproduct", DeveloperApp: "testapp", DeveloperEmail: "testdeveloper@test.com", Developer: "testdeveloper"}
-
}
+// Helper method to convert null DB values to an empty string
func getValuesIgnoringNull(sqlValue sql.NullString) string {
if sqlValue.Valid {
return sqlValue.String
@@ -168,6 +168,7 @@
}
}
+// Build Key as a combination of tenantId and apiKey for the developerInfo Cache
func getKeyForDeveloperInfoCache(tenantId string, apiKey string) string {
return tenantId + "~" + apiKey
}
diff --git a/crash_recovery.go b/crash_recovery.go
index f6793fd..7811d9c 100644
--- a/crash_recovery.go
+++ b/crash_recovery.go
@@ -10,16 +10,17 @@
"compress/gzip"
)
-const crashRecoveryDelay = 30 // seconds
-const recovertTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format
-const fileExtension = ".txt.gz";
-const recoveredTS = "~recoveredTS~"
-
+const (
+ crashRecoveryDelay = 30 // seconds
+ recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
+ recoveredTS = "~recoveredTS~" // Constant to identify recovered files
+)
func initCrashRecovery() {
if crashRecoveryNeeded() {
timer := time.After(time.Second * crashRecoveryDelay)
+ // Actual recovery of files is attempted asynchronously after a delay to not block the apid plugin from starting up
go func() {
<- timer
performRecovery()
@@ -27,6 +28,7 @@
}
}
+// Crash recovery is needed if there are any folders in tmp or recovered directory
func crashRecoveryNeeded() (bool) {
recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
@@ -37,30 +39,34 @@
return needed
}
+// If apid is shut down or crashes while a file is still open in the tmp folder, then the file has partial data.
+// This partial data can be recovered.
func recoverFoldersInTmpDir() bool {
tmpRecoveryNeeded := false
dirs,_ := ioutil.ReadDir(localAnalyticsTempDir)
recoveryTS := getRecoveryTS()
for _, dir := range dirs {
tmpRecoveryNeeded = true
- log.Debugf("Moving directory %s from tmp to recovered ", dir.Name())
+ log.Debugf("Moving directory '%s' from tmp to recovered folder", dir.Name())
tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
-
- newDirName := dir.Name() + recoveredTS + recoveryTS;
+ newDirName := dir.Name() + recoveredTS + recoveryTS; // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir,newDirName)
err := os.Rename(tmpCompletePath, recoveredCompletePath)
if err != nil {
- log.Errorf("Cannot move directory :%s to recovered folder", dir.Name())
+ log.Errorf("Cannot move directory '%s' from tmp to recovered folder", dir.Name())
}
}
return tmpRecoveryNeeded
}
+// Get Timestamp for when the recovery is being attempted on the folder.
func getRecoveryTS() string {
current := time.Now()
- return current.Format(recovertTSLayout)
+ return current.Format(recoveryTSLayout)
}
+// If apid is restarted again before recovery completes, i.e. files have been moved to the recovered folder but actual recovery hasn't started or is only partially done,
+// then the files will just stay in the recovered dir and need to be recovered again.
func recoverFolderInRecoveredDir() bool {
dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
if len(dirs) > 0 {
@@ -83,6 +89,7 @@
var bucketRecoveryTS string
// Parse bucket name to extract recoveryTS and pass it each file to be recovered
+ // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 -> bucketRecoveryTS = _20160101222612.123
index := strings.Index(dirName, recoveredTS)
if index != -1 {
bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
@@ -98,7 +105,7 @@
stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
err := os.Rename(dirBeingRecovered, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory :%s to staging folder", dirName)
+ log.Errorf("Cannot move directory '%s' from recovered to staging folder", dirName)
}
}
@@ -106,16 +113,19 @@
log.Debugf("performing crash recovery for file: %s ", fileName)
// add recovery timestamp to the file name
completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName)
+
recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension
recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension
+ // eg. 5be1_20170130155400.20170130155600_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0_recovered_20170130155452.616.txt.gz
recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
+
+ // Copy complete records to new file and delete original partial file
copyPartialFile(completeOrigFilePath, recoveredFilePath);
deletePartialFile(completeOrigFilePath);
}
+// The file is read line by line and all complete records are extracted and copied to a new file which is closed as a correct gzip file.
func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
-
- // read partial file line by line using buffered gzip reader
partialFile, err := os.Open(completeOrigFilePath)
if err != nil {
log.Errorf("Cannot open file: %s", completeOrigFilePath)
@@ -152,7 +162,7 @@
bufWriter.WriteString("\n")
}
if err := scanner.Err(); err != nil {
- log.Errorf("Error while scanning partial file: %v", err)
+ log.Warnf("Error while scanning partial file: %v", err)
return
}
}
@@ -160,6 +170,6 @@
func deletePartialFile(completeOrigFilePath string) {
err := os.Remove(completeOrigFilePath)
if err != nil {
- log.Errorf("Cannot delete partial file :%s", completeOrigFilePath)
+ log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
}
}
\ No newline at end of file
diff --git a/glide.yaml b/glide.yaml
index 4f63803..054e3eb 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -5,4 +5,4 @@
version: master
testImport:
- package: github.com/onsi/ginkgo/ginkgo
-- package: github.com/onsi/gomega
+- package: github.com/onsi/gomega
\ No newline at end of file
diff --git a/init.go b/init.go
index f4dbc66..40949bd 100644
--- a/init.go
+++ b/init.go
@@ -9,29 +9,36 @@
)
const (
- configAnalyticsBasePath = "apidanalytics_base_path" // config
+ // Base path of analytics API that will be exposed
+ configAnalyticsBasePath = "apidanalytics_base_path"
analyticsBasePathDefault = "/analytics"
- configAnalyticsDataPath = "apidanalytics_data_path" // config
+ // Root directory for analytics local data buffering
+ configAnalyticsDataPath = "apidanalytics_data_path"
analyticsDataPathDefault = "/ax"
- analyticsCollectionInterval = "apidanalytics_collection_interval" // config in seconds
+ // Data collection and buffering interval in seconds
+ analyticsCollectionInterval = "apidanalytics_collection_interval"
analyticsCollectionIntervalDefault = "120"
- analyticsUploadInterval = "apidanalytics_upload_interval" // config in seconds
+ // Interval in seconds based on which staging directory will be checked for folders ready to be uploaded
+ analyticsUploadInterval = "apidanalytics_upload_interval"
analyticsUploadIntervalDefault = "5"
+ // Number of slots for internal channel buffering of analytics records before they are dumped to a file
analyticsBufferChannelSize = "apidanalytics_buffer_channel_size"
- analyticsBufferChannelSizeDefault = 100 // number of slots
+ analyticsBufferChannelSizeDefault = 100
- uapServerBase = "apidanalytics_uap_server_base" // config
+ // EdgeX endpoint base path used to access the UAP collection endpoint
+ uapServerBase = "apidanalytics_uap_server_base"
+ // If caching is used then data scope and developer info will be maintained in an in-memory
+ // cache to avoid DB calls for each analytics message
useCaching = "apidanalytics_use_caching"
useCachingDefault = true
)
// keep track of the services that this plugin will use
-// note: services would also be available directly via the package global "apid" (eg. `apid.Log()`)
var (
log apid.LogService
config apid.ConfigService
@@ -67,45 +74,44 @@
// initPlugin will be called by apid to initialize
func initPlugin(services apid.Services) (apid.PluginData, error) {
-
// set a logger that is annotated for this plugin
- log = services.Log().ForModule("apigeeAnalytics")
+ log = services.Log().ForModule("apidAnalytics")
log.Debug("start init for apidAnalytics plugin")
- // set configuration
- err := setConfig(services)
- if err != nil {
- return pluginData, fmt.Errorf("Missing required config value: %s: ", err)
- }
-
- // localTesting
- //config.SetDefault(uapServerBase,"http://localhost:9010")
- //config.SetDefault("apigeesync_apid_instance_id","fesgG-3525-SFAG")
-
- for _, key := range []string{uapServerBase} {
- if !config.IsSet(key) {
- return pluginData, fmt.Errorf("Missing required config value: %s", key)
- }
-
- }
-
- directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
- err = createDirectories(directories)
-
- if err != nil {
- return pluginData, fmt.Errorf("Cannot create required local directories %s: ", err)
- }
-
data = services.Data()
events = services.Events()
events.Listen("ApigeeSync", &handler{})
+ // set configuration
+ err := setConfig(services)
+ if err != nil {
+ return pluginData, err
+ }
+
+ for _, key := range []string{uapServerBase} {
+ if !config.IsSet(key) {
+ return pluginData, fmt.Errorf("Missing required config value: %s", key)
+ }
+ }
+
+ // Create directories for managing buffering and upload to UAP stages
+ directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
+ err = createDirectories(directories)
+
+ if err != nil {
+ return pluginData, fmt.Errorf("Cannot create required local directories: %v ", err)
+ }
+
+ // Initialize one time crash recovery to be performed by the plugin on start up
initCrashRecovery()
+ // Initialize upload manager to watch the staging directory and upload files to UAP as they are ready
initUploadManager()
+ // Initialize buffer manager to watch the internalBuffer channel for new messages and dump them to files
initBufferingManager()
+ // Initialize APIs and expose them
initAPI(services)
log.Debug("end init for apidAnalytics plugin")
return pluginData, nil
@@ -132,7 +138,7 @@
// set default config for collection interval
config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault)
- // set default config for local caching
+ // set default config for useCaching
config.SetDefault(useCaching, useCachingDefault)
// set default config for upload interval
@@ -152,7 +158,7 @@
if error != nil {
return error
}
- log.Infof("created directory for analytics data collection %s: ", path)
+ log.Infof("created directory for analytics data collection: %s", path)
}
}
return nil
diff --git a/listener.go b/listener.go
index 4dc37df..362dd69 100644
--- a/listener.go
+++ b/listener.go
@@ -1,18 +1,17 @@
package apidAnalytics
+
import (
"github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
)
-type handler struct {
-}
+type handler struct {}
func (h *handler) String() string {
- return "apidAnalytics"
+ return "apigeeAnalytics"
}
func (h *handler) Handle(e apid.Event) {
-
snapData, ok := e.(*common.Snapshot)
if ok {
processSnapshot(snapData)
@@ -47,17 +46,15 @@
if err != nil {
log.Error(err)
} else {
- log.Debug("Created a local cache for developer and app information")
+ log.Debug("Created a local cache for developer information")
}
} else {
- log.Debug("Will not be caching any info and make a DB call for every analytics msg")
+ log.Info("Will not be caching any developer info and make a DB call for every analytics msg")
}
-
return
}
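+// processChange keeps the in-memory caches in sync with ApigeeSync change events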
func processChange(changes *common.ChangeList) {
-
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
var rows []common.Row
@@ -78,22 +75,24 @@
ele.Get("org", &org)
ele.Get("env", &env)
tenantCache[scopeuuid] = tenant{Org: org, Env: env, TenantId: tenantid}
- log.Debugf("refreshed local tenantCache. Added tenant: %s", tenantid)
+ log.Debugf("Refreshed local tenantCache. Added scope: %s", scopeuuid)
}
case common.Delete:
rows = append(rows, payload.NewRow)
+ // Lock before writing to the map as it has multiple readers
tenantCachelock.Lock()
defer tenantCachelock.Unlock()
for _, ele := range rows {
var scopeuuid string
ele.Get("id", &scopeuuid)
delete(tenantCache, scopeuuid)
+ log.Debugf("Refreshed local tenantCache. Deleted scope: %s", scopeuuid)
}
}
case "kms.developer", "kms.app", "kms.api_product", "kms.app_credential_apiproduct_mapper":
// any change in any of the above tables should result in cache refresh
createDeveloperInfoCache()
- log.Debug("refresh local developerInfoCache")
+ log.Debug("Refresh local developerInfoCache")
}
}
}
diff --git a/upload_manager.go b/upload_manager.go
index 5e1f940..1cc3bc2 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -1,13 +1,18 @@
package apidAnalytics
import (
- _ "fmt"
"io/ioutil"
"os"
"path/filepath"
"time"
)
+const (
+ maxRetries = 3
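+ // Number of failed directories moved back to staging for retry in each pass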
+ retryFailedDirBatchSize = 10
+)
+
+// Each file upload is retried maxRetries times before moving it to failed directory
var retriesMap map[string]int
//TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running
@@ -16,6 +21,7 @@
retriesMap = make(map[string]int)
go func() {
+ // Periodically check the staging directory for folders that are ready to be uploaded to S3
ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval))
log.Debugf("Intialized upload manager to check for staging directory")
defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running
@@ -24,7 +30,7 @@
files, err := ioutil.ReadDir(localAnalyticsStagingDir)
if err != nil {
- log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir)
+ log.Errorf("Cannot read directory: %s", localAnalyticsStagingDir)
}
uploadedDirCnt := 0
@@ -48,9 +54,10 @@
func handleUploadDirStatus(dir os.FileInfo, status bool) {
completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
+ // If upload is successful then delete files and remove bucket from retry map
if status {
os.RemoveAll(completePath)
- log.Debugf("deleted directory after successful upload : %s", dir.Name())
+ log.Debugf("deleted directory after successful upload: %s", dir.Name())
// remove key if exists from retry map after a successful upload
delete(retriesMap, dir.Name())
} else {
@@ -60,7 +67,7 @@
failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
err := os.Rename(completePath, failedDirPath)
if err != nil {
- log.Errorf("Cannot move directory :%s to failed folder", dir.Name())
+ log.Errorf("Cannot move directory '%s' from staging to failed folder", dir.Name())
}
// remove key from retry map once it reaches allowed max failed attempts
delete(retriesMap, dir.Name())
@@ -72,7 +79,7 @@
failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir)
if err != nil {
- log.Errorf("Cannot read directory %s: ", localAnalyticsFailedDir)
+ log.Errorf("Cannot read directory: %s", localAnalyticsFailedDir)
}
cnt := 0
@@ -83,7 +90,7 @@
newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name())
err := os.Rename(failedPath, newStagingPath)
if err != nil {
- log.Errorf("Cannot move directory :%s to staging folder", dir.Name())
+ log.Errorf("Cannot move directory '%s' from failed to staging folder", dir.Name())
}
} else {
break
diff --git a/uploader.go b/uploader.go
index 0b84151..d8be63c 100644
--- a/uploader.go
+++ b/uploader.go
@@ -11,16 +11,12 @@
"time"
)
-const (
- maxRetries = 3
- retryFailedDirBatchSize = 10
- timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
-)
+const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
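+// Bearer token used to authorize requests to the UAP collection endpoint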
var token string
var client *http.Client = &http.Client{
- Timeout: time.Duration(60 * time.Second), // default timeout of 60 seconds while connecting to s3/GCS
+ Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS
}
func addHeaders(req *http.Request) {
@@ -29,8 +25,9 @@
}
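+// uploadDir uploads all files in a staging directory to the datastore via signed URLs and returns true only if every file uploads successfully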
func uploadDir(dir os.FileInfo) bool {
+ // Eg. org~env~20160101224500
tenant, timestamp := splitDirName(dir.Name())
- dateTimePartition := getDateFromDirTimestamp(timestamp)
+ dateTimePartition := getDateFromDirTimestamp(timestamp) // Eg. date=2016-01-01/time=22-45
completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
files, _ := ioutil.ReadDir(completePath)
@@ -42,18 +39,18 @@
relativeFilePath := dateTimePartition + "/" + file.Name();
status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
if error != nil {
- log.Errorf("Upload failed due to : %s", error.Error())
+ log.Errorf("Upload failed due to: %v", error)
break
} else {
os.Remove(completeFilePath)
- log.Debugf("Deleted file after successful upload : %s", file.Name())
+ log.Debugf("Deleted file '%s' after successful upload", file.Name())
}
}
return status
}
func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
- signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath)
+ signedUrl, err := getSignedUrl(tenant, relativeFilePath)
if (err != nil) {
return false, err
} else {
@@ -61,7 +58,7 @@
}
}
-func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) {
+func getSignedUrl(tenant, relativeFilePath string) (string, error) {
uapCollectionUrl := config.GetString(uapServerBase) + "/analytics"
req, err := http.NewRequest("GET", uapCollectionUrl, nil)
@@ -71,15 +68,14 @@
q := req.URL.Query()
- // localTesting
- q.Add("repo", "edge")
- q.Add("dataset", "api")
-
+ // eg. edgexfeb1~test
q.Add("tenant", tenant)
+ // eg. date=2017-01-30/time=16-32/1069_20170130163200.20170130163400_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0.txt.gz
q.Add("relative_file_path", relativeFilePath)
q.Add("file_content_type", "application/x-gzip")
req.URL.RawQuery = q.Encode()
+ // Add Bearer Token to each request
addHeaders(req)
resp, err := client.Do(req)
if err != nil {
@@ -94,12 +90,12 @@
signedURL := body["url"]
return signedURL.(string), nil
} else {
- return "", fmt.Errorf("Error while getting signed URL: %s",resp.Status)
+ return "", fmt.Errorf("Error while getting signed URL '%v'",resp.Status)
}
}
func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) {
- // read gzip file that needs to be uploaded
+ // open gzip file that needs to be uploaded
file, err := os.Open(completeFilePath)
if err != nil {
return false, err
@@ -108,7 +104,7 @@
req, err := http.NewRequest("PUT", signedUrl, file)
if err != nil {
- return false, fmt.Errorf("Parsing URL failed due to %v", err)
+ return false, fmt.Errorf("Parsing URL failed '%v'", err)
}
req.Header.Set("Expect", "100-continue")
@@ -116,7 +112,7 @@
fileStats, err := file.Stat()
if err != nil {
- return false, fmt.Errorf("Could not get content length for file: %v", err)
+ return false, fmt.Errorf("Could not get content length for file '%v'", err)
}
req.ContentLength = fileStats.Size()
@@ -129,10 +125,11 @@
if(resp.StatusCode == 200) {
return true, nil
} else {
- return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %v ", resp.Status)
+ return false,fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status)
}
}
+// Extract tenant and timestamp from directory Name
func splitDirName(dirName string) (string, string){
s := strings.Split(dirName, "~")
tenant := s[0]+"~"+s[1]
@@ -140,7 +137,7 @@
return tenant, timestamp
}
-// files are uploaded to S3 under specific partition and that key needs to be generated from the plugin
+// files are uploaded to S3 under specific date time partition and that key needs to be generated from the plugin
// eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
func getDateFromDirTimestamp(timestamp string) (string){
dateTime, _ := time.Parse(timestampLayout, timestamp)