Wrap the code to 80 columns
diff --git a/api.go b/api.go
index 25cbbf2..03f5e31 100644
--- a/api.go
+++ b/api.go
@@ -21,7 +21,8 @@
func initAPI(services apid.Services) {
log.Debug("initialized API's exposed by apidAnalytics plugin")
analyticsBasePath = config.GetString(configAnalyticsBasePath)
- services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST")
+ services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}",
+ saveAnalyticsRecord).Methods("POST")
}
func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
@@ -30,12 +31,15 @@
db := getDB() // When database isnt initialized
if db == nil {
- writeError(w, http.StatusInternalServerError, "INTERNAL_SERVER_ERROR", "Service is not initialized completely")
+ writeError(w, http.StatusInternalServerError,
+ "INTERNAL_SERVER_ERROR",
+ "Service is not initialized completely")
return
}
if !strings.EqualFold(r.Header.Get("Content-Type"), "application/json") {
- writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE", "Only supported content type is application/json")
+ writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE",
+ "Only supported content type is application/json")
return
}
@@ -45,16 +49,19 @@
if dbErr.ErrorCode != "" {
switch dbErr.ErrorCode {
case "INTERNAL_SEARCH_ERROR":
- writeError(w, http.StatusInternalServerError, "INTERNAL_SEARCH_ERROR", dbErr.Reason)
+ writeError(w, http.StatusInternalServerError,
+ "INTERNAL_SEARCH_ERROR", dbErr.Reason)
case "UNKNOWN_SCOPE":
- writeError(w, http.StatusBadRequest, "UNKNOWN_SCOPE", dbErr.Reason)
+ writeError(w, http.StatusBadRequest,
+ "UNKNOWN_SCOPE", dbErr.Reason)
}
} else {
err := processPayload(tenant, scopeuuid, r)
if err.ErrorCode == "" {
w.WriteHeader(http.StatusOK)
} else {
- writeError(w, http.StatusBadRequest, err.ErrorCode, err.Reason)
+ writeError(w, http.StatusBadRequest,
+ err.ErrorCode, err.Reason)
}
}
}
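Note: writeError itself is not touched by this patch. For readers following the handler, a minimal sketch of what such a helper might look like — the JSON field names here are assumptions, not taken from the plugin:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// errorDetail mirrors the (errorCode, reason) pair passed to writeError.
type errorDetail struct {
	ErrorCode string `json:"errorCode"`
	Reason    string `json:"reason"`
}

// writeError is a hypothetical stand-in for the helper used by saveAnalyticsRecord.
func writeError(w http.ResponseWriter, status int, code, reason string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	json.NewEncoder(w).Encode(errorDetail{ErrorCode: code, Reason: reason})
}

func main() {
	rec := httptest.NewRecorder()
	writeError(rec, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE",
		"Only supported content type is application/json")
	fmt.Println(rec.Code, rec.Body.String())
}
```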
diff --git a/api_helper.go b/api_helper.go
index 34ca14e..24dba22 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -9,7 +9,8 @@
)
/*
-Implements all the helper methods needed to process the POST /analytics payload and send it to the internal buffer channel
+Implements all the helper methods needed to process the POST /analytics payload
+and send it to the internal buffer channel
*/
type developerInfo struct {
@@ -20,8 +21,9 @@
}
type axRecords struct {
- Tenant tenant
- Records []interface{} // Records is an array of multiple analytics records
+ Tenant tenant
+ // Records is an array of multiple analytics records
+ Records []interface{}
}
type tenant struct {
@@ -34,7 +36,9 @@
var gzipEncoded bool
if r.Header.Get("Content-Encoding") != "" {
if !strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") {
- return errResponse{ErrorCode: "UNSUPPORTED_CONTENT_ENCODING", Reason: "Only supported content encoding is gzip"}
+ return errResponse{
+ ErrorCode: "UNSUPPORTED_CONTENT_ENCODING",
+ Reason: "Only supported content encoding is gzip"}
} else {
gzipEncoded = true
}
@@ -45,7 +49,9 @@
if gzipEncoded {
reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
if err != nil {
- return errResponse{ErrorCode: "BAD_DATA", Reason: "Gzip Encoded data cannot be read"}
+ return errResponse{
+ ErrorCode: "BAD_DATA",
+ Reason: "Gzip Encoded data cannot be read"}
}
} else {
reader = r.Body
@@ -64,7 +70,8 @@
decoder.UseNumber()
if err := decoder.Decode(&raw); err != nil {
- return errResponse{ErrorCode: "BAD_DATA", Reason: "Not a valid JSON payload"}
+ return errResponse{ErrorCode: "BAD_DATA",
+ Reason: "Not a valid JSON payload"}
}
if records := raw["records"]; records != nil {
@@ -75,14 +82,19 @@
if valid {
enrich(recordMap, scopeuuid, tenant)
} else {
- return err // Even if there is one bad record, then reject entire batch
+ // If there is even one bad record, reject the entire batch
+ return err
}
}
- axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})}
+ axRecords := axRecords{
+ Tenant: tenant,
+ Records: records.([]interface{})}
// publish batch of records to channel (blocking call)
internalBuffer <- axRecords
} else {
- return errResponse{ErrorCode: "NO_RECORDS", Reason: "No analytics records in the payload"}
+ return errResponse{
+ ErrorCode: "NO_RECORDS",
+ Reason: "No analytics records in the payload"}
}
return errResponse{}
}
@@ -96,7 +108,9 @@
elems := []string{"client_received_start_timestamp"}
for _, elem := range elems {
if recordMap[elem] == nil {
- return false, errResponse{ErrorCode: "MISSING_FIELD", Reason: "Missing Required field: " + elem}
+ return false, errResponse{
+ ErrorCode: "MISSING_FIELD",
+ Reason: "Missing Required field: " + elem}
}
}
@@ -104,7 +118,10 @@
cret, exists2 := recordMap["client_received_end_timestamp"]
if exists1 && exists2 {
if crst.(json.Number) > cret.(json.Number) {
- return false, errResponse{ErrorCode: "BAD_DATA", Reason: "client_received_start_timestamp > client_received_end_timestamp"}
+ return false, errResponse{
+ ErrorCode: "BAD_DATA",
+ Reason: "client_received_start_timestamp " +
+ "> client_received_end_timestamp"}
}
}
return true, errResponse{}
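For context, processPayload accepts a JSON body (optionally gzip-encoded) whose "records" array entries must include client_received_start_timestamp, and that value must not exceed client_received_end_timestamp when both are present. A minimal client sketch under those rules — the host, port, base path, scope uuid, and timestamp values are illustrative assumptions:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"log"
	"net/http"
)

func main() {
	// One record with a start timestamp no later than the end timestamp.
	payload := []byte(`{"records":[{
		"client_received_start_timestamp": 1486406248277,
		"client_received_end_timestamp": 1486406248290
	}]}`)

	// Gzip-encode the body, matching the optional Content-Encoding support.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write(payload)
	zw.Close()

	req, err := http.NewRequest("POST",
		"http://localhost:9000/analytics/testid", &buf)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Content-Encoding", "gzip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	log.Println(resp.Status)
}
```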
diff --git a/api_test.go b/api_test.go
index f56a463..d172a29 100644
--- a/api_test.go
+++ b/api_test.go
@@ -20,7 +20,8 @@
v.Add("bundle_scope_uuid", "testid")
client := &http.Client{}
- req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode()))
+ req, err := http.NewRequest("POST", uri.String(),
+ strings.NewReader(v.Encode()))
res, err := client.Do(req)
defer res.Body.Close()
Expect(err).ShouldNot(HaveOccurred())
@@ -37,7 +38,8 @@
v.Add("bundle_scope_uuid", "wrongId")
client := &http.Client{}
- req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode()))
+ req, err := http.NewRequest("POST", uri.String(),
+ strings.NewReader(v.Encode()))
res, err := client.Do(req)
defer res.Body.Close()
Expect(err).ShouldNot(HaveOccurred())
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
index 247212f..1e5c128 100644
--- a/apidAnalytics_suite_test.go
+++ b/apidAnalytics_suite_test.go
@@ -33,8 +33,9 @@
Expect(err).NotTo(HaveOccurred())
config.Set("data_path", testTempDir)
- config.Set(uapServerBase, "http://localhost:9000") // dummy value
- config.Set("apigeesync_apid_instance_id", "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
+ config.Set(uapServerBase, "http://localhost:9000") // dummy value
+ config.Set("apigeesync_apid_instance_id",
+ "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
config.Set(useCaching, true)
db, err := apid.Data().DB()
@@ -49,11 +50,12 @@
createTenantCache()
createDeveloperInfoCache()
- testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
- if req.URL.Path == analyticsBasePathDefault {
- saveAnalyticsRecord(w, req)
- }
- }))
+ testServer = httptest.NewServer(http.HandlerFunc(
+ func(w http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == analyticsBasePathDefault {
+ saveAnalyticsRecord(w, req)
+ }
+ }))
})
func createTables(db apid.DB) {
@@ -169,7 +171,8 @@
txn, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
- txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id, appcred_id, app_id, apiprdt_id, status, _change_selector) "+
+ txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id,"+
+ " appcred_id, app_id, apiprdt_id, status, _change_selector) "+
"VALUES"+
"($1,$2,$3,$4,$5,$6)",
"tenantid",
@@ -206,7 +209,8 @@
"testdeveloper@test.com",
)
- txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+
+ txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, "+
+ "apid_cluster_id, scope, org, env) "+
"VALUES"+
"($1,$2,$3,$4,$5,$6)",
"testid",
diff --git a/buffering_manager.go b/buffering_manager.go
index 6ec1ec1..7e28d4b 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -13,10 +13,12 @@
const fileExtension = ".txt.gz"
-// Channel where analytics records are buffered before being dumped to a file as write to file should not performed in the Http Thread
+// Channel where analytics records are buffered before being dumped to a
+// file, since writing to the file should not be done in the HTTP thread
var internalBuffer chan axRecords
-// Channel where close bucket event is published i.e. when a bucket is ready to be closed based on collection interval
+// Channel where close bucket event is published i.e. when a bucket
+// is ready to be closed based on collection interval
var closeBucketEvent chan bucket
// Map from timestampt to bucket
@@ -36,7 +38,8 @@
}
func initBufferingManager() {
- internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize))
+ internalBuffer = make(chan axRecords,
+ config.GetInt(analyticsBufferChannelSize))
closeBucketEvent = make(chan bucket)
bucketMap = make(map[int64]bucket)
@@ -46,7 +49,8 @@
records := <-internalBuffer
err := save(records)
if err != nil {
- log.Errorf("Could not save %d messages to file due to: %v", len(records.Records), err)
+ log.Errorf("Could not save %d messages to file"+
+ " due to: %v", len(records.Records), err)
}
}
}()
@@ -55,17 +59,20 @@
go func() {
for {
bucket := <-closeBucketEvent
- log.Debugf("Close Event received for bucket: %s", bucket.DirName)
+ log.Debugf("Close Event received for bucket: %s",
+ bucket.DirName)
// close open file
closeGzipFile(bucket.FileWriter)
dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName)
stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName)
- // close files in tmp folder and move directory to staging to indicate its ready for upload
+ // close files in tmp folder and move directory to
+ // staging to indicate it's ready for upload
err := os.Rename(dirToBeClosed, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from tmp to staging folder", bucket.DirName)
+ log.Errorf("Cannot move directory '%s' from"+
+ " tmp to staging folder", bucket.DirName)
}
}
}()
@@ -82,7 +89,8 @@
}
func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
- // first based on current timestamp and collection interval, determine the timestamp of the bucket
+ // first, based on the current timestamp and collection interval,
+ // determine the timestamp of the bucket
ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
_, exists := bucketMap[ts]
if exists {
@@ -99,12 +107,16 @@
// create dir
err := os.Mkdir(newPath, os.ModePerm)
if err != nil {
- return bucket{}, fmt.Errorf("Cannot create directory '%s' to buffer messages '%v'", dirName, err)
+ return bucket{}, fmt.Errorf("Cannot create directory "+
+ "'%s' to buffer messages '%v'", dirName, err)
}
// create file for writing
// Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
- fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0" + fileExtension
+ fileName := getRandomHex() + "_" + timestamp + "." +
+ endtimestamp + "_" +
+ config.GetString("apigeesync_apid_instance_id") +
+ "_writer_0" + fileExtension
completeFilePath := filepath.Join(newPath, fileName)
fw, err := createGzipFile(completeFilePath)
if err != nil {
@@ -114,7 +126,8 @@
newBucket := bucket{DirName: dirName, FileWriter: fw}
bucketMap[ts] = newBucket
- //Send event to close directory after endTime + 5 seconds to make sure all buffers are flushed to file
+ // Send event to close directory after endTime + 5
+ // seconds to make sure all buffers are flushed to file
timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
go func() {
<-timer
@@ -124,7 +137,8 @@
}
}
-// 4 digit Hex is prefixed to each filename to improve how s3 partitions the files being uploaded
+// A 4-digit hex prefix is added to each filename to improve
+// how S3 partitions the files being uploaded
func getRandomHex() string {
buff := make([]byte, 2)
rand.Read(buff)
@@ -134,7 +148,9 @@
func createGzipFile(s string) (fileWriter, error) {
file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
if err != nil {
- return fileWriter{}, fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err)
+ return fileWriter{},
+ fmt.Errorf("Cannot create file '%s' "+
+ "to buffer messages '%v'", s, err)
}
gw := gzip.NewWriter(file)
bw := bufio.NewWriter(gw)
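The bucketing logic above floors the current Unix time to the collection interval and builds the file name from the bucket window. A standalone sketch of that arithmetic — the interval, instance id, and timestamp layout are assumptions for illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// apidanalytics_collection_interval, in seconds (default 120 per init.go).
	interval := int64(120)
	now := time.Now().UTC()

	// Floor the current Unix time to the start of its collection interval.
	ts := now.Unix() / interval * interval
	start := time.Unix(ts, 0).UTC()
	end := time.Unix(ts+interval, 0).UTC()

	// Assumed layout matching directory names like 20160101224500.
	const layout = "20060102150405"

	// Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
	fileName := "a1b2" + "_" + start.Format(layout) + "." + end.Format(layout) +
		"_" + "abcdefgh-ijkl-mnop-qrst-uvwxyz123456" + "_writer_0" + ".txt.gz"
	fmt.Println(ts, fileName)
}
```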
diff --git a/common_helper.go b/common_helper.go
index 31f7d82..9acc033 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -9,16 +9,19 @@
// Cache for scope uuid to org, env and tenantId information
var tenantCache map[string]tenant
-// RW lock for tenant map cache since the cache can be read while its being written to and vice versa
+// RW lock for tenant map cache since the cache can be
+// read while it's being written to and vice versa
var tenantCachelock = sync.RWMutex{}
// Cache for apiKey~tenantId to developer related information
var developerInfoCache map[string]developerInfo
-// RW lock for developerInfo map cache since the cache can be read while its being written to and vice versa
+// RW lock for developerInfo map cache since the cache can be
+// read while it's being written to and vice versa
var developerInfoCacheLock = sync.RWMutex{}
-// Load data scope information into an in-memory cache so that for each record a DB lookup is not required
+// Load data scope information into an in-memory cache so that
+// for each record a DB lookup is not required
func createTenantCache() error {
// Lock before writing to the map as it has multiple readers
tenantCachelock.Lock()
@@ -30,12 +33,15 @@
rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE")
if error != nil {
- return fmt.Errorf("Count not get datascope from DB due to: %v", error)
+ return fmt.Errorf("Count not get datascope from "+
+ "DB due to: %v", error)
} else {
defer rows.Close()
for rows.Next() {
rows.Scan(&env, &org, &tenantId, &id)
- tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId}
+ tenantCache[id] = tenant{Org: org,
+ Env: env,
+ TenantId: tenantId}
}
}
@@ -43,7 +49,8 @@
return nil
}
-// Load data scope information into an in-memory cache so that for each record a DB lookup is not required
+// Load developer info into an in-memory cache so that
+// for each record a DB lookup is not required
func createDeveloperInfoCache() error {
// Lock before writing to the map as it has multiple readers
developerInfoCacheLock.Lock()
@@ -53,7 +60,8 @@
var tenantId, apiKey string
db := getDB()
- sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name, a.name, d.username, d.email " +
+ sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name," +
+ " a.name, d.username, d.email " +
"FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " +
"INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " +
"INNER JOIN APP AS a ON a.id = mp.app_id " +
@@ -61,11 +69,13 @@
rows, error := db.Query(sSql)
if error != nil {
- return fmt.Errorf("Count not get developerInfo from DB due to: %v", error)
+ return fmt.Errorf("Count not get developerInfo "+
+ "from DB due to: %v", error)
} else {
defer rows.Close()
for rows.Next() {
- rows.Scan(&tenantId, &apiKey, &apiProduct, &developerApp, &developer, &developerEmail)
+ rows.Scan(&tenantId, &apiKey, &apiProduct,
+ &developerApp, &developer, &developerEmail)
keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
apiPrd := getValuesIgnoringNull(apiProduct)
@@ -73,15 +83,21 @@
dev := getValuesIgnoringNull(developer)
devEmail := getValuesIgnoringNull(developerEmail)
- developerInfoCache[keyForMap] = developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev}
+ developerInfoCache[keyForMap] = developerInfo{
+ ApiProduct: apiPrd,
+ DeveloperApp: devApp,
+ DeveloperEmail: devEmail,
+ Developer: dev}
}
}
- log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache))
+ log.Debugf("Count of apiKey~tenantId combinations "+
+ "in the cache: %d", len(developerInfoCache))
return nil
}
-// Returns Tenant Info given a scope uuid from the cache or by querying the DB directly based on useCachig config
+// Returns Tenant Info given a scope uuid from the cache or by querying
+// the DB directly based on useCaching config
func getTenantForScope(scopeuuid string) (tenant, dbError) {
if config.GetBool(useCaching) {
// acquire a read lock as this cache has 1 writer as well
@@ -91,9 +107,12 @@
if !exists {
reason := "No tenant found for this scopeuuid: " + scopeuuid
errorCode := "UNKNOWN_SCOPE"
- // Incase of unknown scope, try to refresh the cache ansynchronously incase an update was missed or delayed
+ // In case of an unknown scope, try to refresh the
+ // cache asynchronously in case an update was missed or delayed
go createTenantCache()
- return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
} else {
return ten, dbError{}
}
@@ -101,23 +120,32 @@
var org, env, tenantId string
db := getDB()
- error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
+ error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE"+
+ " where id = ?", scopeuuid).Scan(&env, &org, &tenantId)
switch {
case error == sql.ErrNoRows:
reason := "No tenant found for this scopeuuid: " + scopeuuid
errorCode := "UNKNOWN_SCOPE"
- return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
case error != nil:
reason := error.Error()
errorCode := "INTERNAL_SEARCH_ERROR"
- return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
+ return tenant{}, dbError{
+ ErrorCode: errorCode,
+ Reason: reason}
}
- return tenant{Org: org, Env: env, TenantId: tenantId}, dbError{}
+ return tenant{
+ Org: org,
+ Env: env,
+ TenantId: tenantId}, dbError{}
}
}
-// Returns Dveloper related info given an apiKey and tenantId from the cache or by querying the DB directly based on useCachig config
+// Returns developer-related info given an apiKey and tenantId
+// from the cache or by querying the DB directly based on useCaching config
func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
if config.GetBool(useCaching) {
keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
@@ -126,15 +154,18 @@
defer tenantCachelock.RUnlock()
devInfo, exists := developerInfoCache[keyForMap]
if !exists {
- log.Warnf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey)
- // Incase of unknown apiKey~tenantId, try to refresh the cache ansynchronously incase an update was missed or delayed
+ log.Warnf("No data found for for tenantId = %s"+
+ " and apiKey = %s", tenantId, apiKey)
+ // In case of an unknown apiKey~tenantId, try to refresh the
+ // cache asynchronously in case an update was missed or delayed
go createTenantCache()
return developerInfo{}
} else {
return devInfo
}
} else {
- var apiProduct, developerApp, developerEmail, developer sql.NullString
+ var apiProduct, developerApp, developerEmail sql.NullString
+ var developer sql.NullString
db := getDB()
sSql := "SELECT ap.name, a.name, d.username, d.email " +
@@ -143,14 +174,18 @@
"INNER JOIN APP AS a ON a.id = mp.app_id " +
"INNER JOIN DEVELOPER as d ON d.id = a.developer_id " +
"where mp.tenant_id = ? and mp.appcred_id = ?;"
- error := db.QueryRow(sSql, tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail)
+ error := db.QueryRow(sSql, tenantId, apiKey).
+ Scan(&apiProduct, &developerApp,
+ &developer, &developerEmail)
switch {
case error == sql.ErrNoRows:
- log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey)
+ log.Debugf("No data found for for tenantId = %s "+
+ "and apiKey = %s", tenantId, apiKey)
return developerInfo{}
case error != nil:
- log.Debugf("No data found for for tenantId = %s and apiKey = %s due to: %v", tenantId, apiKey, error)
+ log.Debugf("No data found for for tenantId = %s and "+
+ "apiKey = %s due to: %v", tenantId, apiKey, error)
return developerInfo{}
}
@@ -158,7 +193,10 @@
devApp := getValuesIgnoringNull(developerApp)
dev := getValuesIgnoringNull(developer)
devEmail := getValuesIgnoringNull(developerEmail)
- return developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev}
+ return developerInfo{ApiProduct: apiPrd,
+ DeveloperApp: devApp,
+ DeveloperEmail: devEmail,
+ Developer: dev}
}
}
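Both caches follow the same pattern: one writer refreshing the map under the exclusive lock, many readers under the shared lock. A simplified, self-contained sketch of that pattern (types and names trimmed):

```go
package main

import (
	"fmt"
	"sync"
)

type tenant struct{ Org, Env, TenantId string }

var (
	tenantCache     = make(map[string]tenant)
	tenantCachelock = sync.RWMutex{}
)

// Writers take the exclusive lock while refreshing the cache.
func refresh(scopeuuid string, t tenant) {
	tenantCachelock.Lock()
	defer tenantCachelock.Unlock()
	tenantCache[scopeuuid] = t
}

// Readers take the shared lock, so many lookups can proceed concurrently.
func lookup(scopeuuid string) (tenant, bool) {
	tenantCachelock.RLock()
	defer tenantCachelock.RUnlock()
	t, ok := tenantCache[scopeuuid]
	return t, ok
}

func main() {
	refresh("testid", tenant{Org: "org", Env: "env", TenantId: "tenantid"})
	fmt.Println(lookup("testid"))
}
```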
diff --git a/crash_recovery.go b/crash_recovery.go
index b34dc9d..ccd3b36 100644
--- a/crash_recovery.go
+++ b/crash_recovery.go
@@ -11,15 +11,18 @@
)
const (
- crashRecoveryDelay = 30 // seconds
- recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
- recoveredTS = "~recoveredTS~" // Constant to identify recovered files
+ crashRecoveryDelay = 30 // seconds
+ // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
+ recoveryTSLayout = "20060102150405.000"
+ // Constant to identify recovered files
+ recoveredTS = "~recoveredTS~"
)
func initCrashRecovery() {
if crashRecoveryNeeded() {
timer := time.After(time.Second * crashRecoveryDelay)
- // Actual recovery of files is attempted asynchronously after a delay to not block the apid plugin from starting up
+ // Actual recovery of files is attempted asynchronously
+ // after a delay to not block the apid plugin from starting up
go func() {
<-timer
performRecovery()
@@ -33,12 +36,14 @@
tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
if needed {
- log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay)
+ log.Infof("Crash Recovery is needed and will be "+
+ "attempted in %d seconds", crashRecoveryDelay)
}
return needed
}
-// If Apid is shutdown or crashes while a file is still open in tmp folder, then the file has partial data.
+// If Apid is shut down or crashes while a file is still open in tmp folder,
+// then the file has partial data.
// This partial data can be recoverd.
func recoverFoldersInTmpDir() bool {
tmpRecoveryNeeded := false
@@ -46,13 +51,16 @@
recoveryTS := getRecoveryTS()
for _, dir := range dirs {
tmpRecoveryNeeded = true
- log.Debugf("Moving directory '%s' from tmp to recovered folder", dir.Name())
+ log.Debugf("Moving directory '%s' from tmp to"+
+ " recovered folder", dir.Name())
tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
- newDirName := dir.Name() + recoveredTS + recoveryTS // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+ // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+ newDirName := dir.Name() + recoveredTS + recoveryTS
recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
err := os.Rename(tmpCompletePath, recoveredCompletePath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from tmp to recovered folder", dir.Name())
+ log.Errorf("Cannot move directory '%s' "+
+ "from tmp to recovered folder", dir.Name())
}
}
return tmpRecoveryNeeded
@@ -64,8 +72,10 @@
return current.Format(recoveryTSLayout)
}
-// If APID is restarted twice immediately such that files have been moved to recovered folder but actual recovery has'nt started or is partially done
-// Then the files will just stay in the recovered dir and need to be recovered again.
+// If APID is restarted twice in quick succession, files may have been moved
+// to the recovered folder but actual recovery hasn't started or is only
+// partially done. In that case the files will just stay in the recovered dir
+// and need to be recovered again.
func recoverFolderInRecoveredDir() bool {
dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
if len(dirs) > 0 {
@@ -88,7 +98,8 @@
var bucketRecoveryTS string
// Parse bucket name to extract recoveryTS and pass it each file to be recovered
- // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 -> bucketRecoveryTS = _20160101222612.123
+ // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+ // -> bucketRecoveryTS = _20160101222612.123
index := strings.Index(dirName, recoveredTS)
if index != -1 {
bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):]
@@ -104,7 +115,8 @@
stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
err := os.Rename(dirBeingRecovered, stagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from recovered to staging folder", dirName)
+ log.Errorf("Cannot move directory '%s' from"+
+ " recovered to staging folder", dirName)
}
}
@@ -123,7 +135,8 @@
deletePartialFile(completeOrigFilePath)
}
-// The file is read line by line and all complete records are extracted and copied to a new file which is closed as a correct gzip file.
+// The file is read line by line and all complete records are extracted
+// and copied to a new file which is closed as a correct gzip file.
func copyPartialFile(completeOrigFilePath, recoveredFilePath string) {
partialFile, err := os.Open(completeOrigFilePath)
if err != nil {
@@ -135,7 +148,8 @@
bufReader := bufio.NewReader(partialFile)
gzReader, err := gzip.NewReader(bufReader)
if err != nil {
- log.Errorf("Cannot create reader on gzip file: %s due to %v", completeOrigFilePath, err)
+ log.Errorf("Cannot create reader on gzip file: %s"+
+ " due to %v", completeOrigFilePath, err)
return
}
defer gzReader.Close()
@@ -143,7 +157,8 @@
scanner := bufio.NewScanner(gzReader)
// Create new file to copy complete records from partial file and upload only a complete file
- recoveredFile, err := os.OpenFile(recoveredFilePath, os.O_WRONLY|os.O_CREATE, os.ModePerm)
+ recoveredFile, err := os.OpenFile(recoveredFilePath,
+ os.O_WRONLY|os.O_CREATE, os.ModePerm)
if err != nil {
log.Errorf("Cannot create recovered file: %s", recoveredFilePath)
return
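The copyPartialFile approach — scan the truncated gzip line by line and keep only complete records in a fresh gzip file — can be sketched in isolation as follows (error handling trimmed, paths are placeholders):

```go
package main

import (
	"bufio"
	"compress/gzip"
	"log"
	"os"
)

func recoverComplete(partialPath, recoveredPath string) error {
	in, err := os.Open(partialPath)
	if err != nil {
		return err
	}
	defer in.Close()

	gzIn, err := gzip.NewReader(bufio.NewReader(in))
	if err != nil {
		return err
	}
	defer gzIn.Close()

	out, err := os.OpenFile(recoveredPath, os.O_WRONLY|os.O_CREATE, os.ModePerm)
	if err != nil {
		return err
	}
	defer out.Close()

	gzOut := gzip.NewWriter(out)
	defer gzOut.Close()

	// A truncated trailing record surfaces as a scan error and is simply not
	// copied; every complete line read before that point is preserved.
	scanner := bufio.NewScanner(gzIn)
	for scanner.Scan() {
		gzOut.Write(append(scanner.Bytes(), '\n'))
	}
	return scanner.Err()
}

func main() {
	if err := recoverComplete("partial.txt.gz", "recovered.txt.gz"); err != nil {
		log.Println("recovery stopped early:", err)
	}
}
```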
diff --git a/init.go b/init.go
index e383f16..a3501a4 100644
--- a/init.go
+++ b/init.go
@@ -21,18 +21,21 @@
analyticsCollectionInterval = "apidanalytics_collection_interval"
analyticsCollectionIntervalDefault = "120"
- // Interval in seconds based on which staging directory will be checked for folders ready to be uploaded
+ // Interval in seconds based on which staging directory
+ // will be checked for folders ready to be uploaded
analyticsUploadInterval = "apidanalytics_upload_interval"
analyticsUploadIntervalDefault = "5"
- // Number of slots for internal channel buffering of analytics records before they are dumped to a file
+ // Number of slots for internal channel buffering of
+ // analytics records before they are dumped to a file
analyticsBufferChannelSize = "apidanalytics_buffer_channel_size"
analyticsBufferChannelSizeDefault = 100
// EdgeX endpoint base path to access Uap Collection Endpoint
uapServerBase = "apidanalytics_uap_server_base"
- // If caching is used then data scope and developer info will be maintained in-memory
+ // If caching is used then data scope and developer
+ // info will be maintained in-memory
// cache to avoid DB calls for each analytics message
useCaching = "apidanalytics_use_caching"
useCachingDefault = true
@@ -90,25 +93,33 @@
for _, key := range []string{uapServerBase} {
if !config.IsSet(key) {
- return pluginData, fmt.Errorf("Missing required config value: %s", key)
+ return pluginData,
+ fmt.Errorf("Missing required config value: %s", key)
}
}
// Create directories for managing buffering and upload to UAP stages
- directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir}
+ directories := []string{localAnalyticsBaseDir,
+ localAnalyticsTempDir,
+ localAnalyticsStagingDir,
+ localAnalyticsFailedDir,
+ localAnalyticsRecoveredDir}
err = createDirectories(directories)
if err != nil {
- return pluginData, fmt.Errorf("Cannot create required local directories: %v ", err)
+ return pluginData, fmt.Errorf("Cannot create "+
+ "required local directories: %v ", err)
}
// Initialize one time crash recovery to be performed by the plugin on start up
initCrashRecovery()
- // Initialize upload manager to watch the staging directory and upload files to UAP as they are ready
+ // Initialize upload manager to watch the staging directory and
+ // upload files to UAP as they are ready
initUploadManager()
- // Initialize buffer manager to watch the internalBuffer channel for new messages and dump them to files
+ // Initialize buffer manager to watch the internalBuffer channel
+ // for new messages and dump them to files
initBufferingManager()
// Initialize API's and expose them
@@ -129,7 +140,8 @@
}
// set local directory paths that will be used to buffer analytics data on disk
- localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"), config.GetString(configAnalyticsDataPath))
+ localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"),
+ config.GetString(configAnalyticsDataPath))
localAnalyticsTempDir = filepath.Join(localAnalyticsBaseDir, "tmp")
localAnalyticsStagingDir = filepath.Join(localAnalyticsBaseDir, "staging")
localAnalyticsFailedDir = filepath.Join(localAnalyticsBaseDir, "failed")
@@ -158,7 +170,8 @@
if error != nil {
return error
}
- log.Infof("created directory for analytics data collection: %s", path)
+ log.Infof("created directory for analytics "+
+ "data collection: %s", path)
}
}
return nil
diff --git a/listener.go b/listener.go
index f86b2ad..e320fed 100644
--- a/listener.go
+++ b/listener.go
@@ -27,7 +27,8 @@
}
func processSnapshot(snapshot *common.Snapshot) {
- log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
+ log.Debugf("Snapshot received. Switching to"+
+ " DB version: %s", snapshot.SnapshotInfo)
db, err := data.DBVersion(snapshot.SnapshotInfo)
if err != nil {
@@ -40,7 +41,8 @@
if err != nil {
log.Error(err)
} else {
- log.Debug("Created a local cache for datasope information")
+ log.Debug("Created a local cache" +
+ " for datasope information")
}
err = createDeveloperInfoCache()
if err != nil {
@@ -49,7 +51,8 @@
log.Debug("Created a local cache for developer information")
}
} else {
- log.Info("Will not be caching any developer info and make a DB call for every analytics msg")
+ log.Info("Will not be caching any developer info " +
+ "and make a DB call for every analytics msg")
}
return
}
@@ -66,32 +69,44 @@
switch payload.Operation {
case common.Insert, common.Update:
rows = append(rows, payload.NewRow)
- // Lock before writing to the map as it has multiple readers
+ // Lock before writing to the
+ // map as it has multiple readers
tenantCachelock.Lock()
defer tenantCachelock.Unlock()
for _, ele := range rows {
- var scopeuuid, tenantid, org, env string
+ var scopeuuid, tenantid string
+ var org, env string
ele.Get("id", &scopeuuid)
ele.Get("scope", &tenantid)
ele.Get("org", &org)
ele.Get("env", &env)
- tenantCache[scopeuuid] = tenant{Org: org, Env: env, TenantId: tenantid}
- log.Debugf("Refreshed local tenantCache. Added scope: %s", scopeuuid)
+ tenantCache[scopeuuid] = tenant{
+ Org: org,
+ Env: env,
+ TenantId: tenantid}
+ log.Debugf("Refreshed local "+
+ "tenantCache. Added "+
+ "scope: "+"%s", scopeuuid)
}
case common.Delete:
rows = append(rows, payload.OldRow)
- // Lock before writing to the map as it has multiple readers
+ // Lock before writing to the map
+ // as it has multiple readers
tenantCachelock.Lock()
defer tenantCachelock.Unlock()
for _, ele := range rows {
var scopeuuid string
ele.Get("id", &scopeuuid)
delete(tenantCache, scopeuuid)
- log.Debugf("Refreshed local tenantCache. Deleted scope: %s", scopeuuid)
+ log.Debugf("Refreshed local"+
+ " tenantCache. Deleted"+
+ " scope: %s", scopeuuid)
}
}
- case "kms.developer", "kms.app", "kms.api_product", "kms.app_credential_apiproduct_mapper":
- // any change in any of the above tables should result in cache refresh
+ case "kms.developer", "kms.app", "kms.api_product",
+ "kms.app_credential_apiproduct_mapper":
+ // any change in any of the above tables
+ // should result in cache refresh
createDeveloperInfoCache()
log.Debug("Refresh local developerInfoCache")
}
diff --git a/upload_manager.go b/upload_manager.go
index 1ae94a5..a95ce87 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -12,25 +12,32 @@
retryFailedDirBatchSize = 10
)
-// Each file upload is retried maxRetries times before moving it to failed directory
+// Each file upload is retried maxRetries times before
+// moving it to failed directory
var retriesMap map[string]int
-//TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running
+// TODO: make sure that this instance gets initialized only once
+// since we don't want multiple upload manager tickers running
func initUploadManager() {
retriesMap = make(map[string]int)
go func() {
- // Periodically check the staging directory to check if any folders are ready to be uploaded to S3
- ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval))
+ // Periodically check the staging directory to see
+ // if any folders are ready to be uploaded to S3
+ ticker := time.NewTicker(time.Second *
+ config.GetDuration(analyticsUploadInterval))
log.Debugf("Intialized upload manager to check for staging directory")
- defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running
+ // Ticker will keep running as long as the goroutine runs,
+ // i.e. till the application is running
+ defer ticker.Stop()
for range ticker.C {
files, err := ioutil.ReadDir(localAnalyticsStagingDir)
if err != nil {
- log.Errorf("Cannot read directory: %s", localAnalyticsStagingDir)
+ log.Errorf("Cannot read directory: "+
+ "%s", localAnalyticsStagingDir)
}
uploadedDirCnt := 0
@@ -44,7 +51,8 @@
}
}
if uploadedDirCnt > 0 {
- // After a successful upload, retry the folders in failed directory as they might have
+ // After a successful upload, retry the
+ // folders in failed directory as they might have
// failed due to intermitent S3/GCS issue
retryFailedUploads()
}
@@ -54,10 +62,12 @@
func handleUploadDirStatus(dir os.FileInfo, status bool) {
completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
- // If upload is successful then delete files and remove bucket from retry map
+ // If upload is successful then delete files
+ // and remove bucket from retry map
if status {
os.RemoveAll(completePath)
- log.Debugf("deleted directory after successful upload: %s", dir.Name())
+ log.Debugf("deleted directory after "+
+ "successful upload: %s", dir.Name())
// remove key if exists from retry map after a successful upload
delete(retriesMap, dir.Name())
} else {
@@ -67,7 +77,8 @@
failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name())
err := os.Rename(completePath, failedDirPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from staging to failed folder", dir.Name())
+ log.Errorf("Cannot move directory '%s'"+
+ " from staging to failed folder", dir.Name())
}
// remove key from retry map once it reaches allowed max failed attempts
delete(retriesMap, dir.Name())
@@ -90,7 +101,8 @@
newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name())
err := os.Rename(failedPath, newStagingPath)
if err != nil {
- log.Errorf("Cannot move directory '%s' from failed to staging folder", dir.Name())
+ log.Errorf("Cannot move directory '%s'"+
+ " from failed to staging folder", dir.Name())
}
} else {
break
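The retry bookkeeping above keeps a per-directory counter and gives up after maxRetries attempts. A tiny sketch of that bookkeeping — the maxRetries value here is an assumption; the real constant is defined elsewhere in the plugin:

```go
package main

import "fmt"

const maxRetries = 3 // assumed value for illustration

var retriesMap = make(map[string]int)

// recordFailure bumps the retry count for a staging directory and reports
// whether the caller should move it to the failed folder and stop retrying.
func recordFailure(dirName string) bool {
	retriesMap[dirName]++
	if retriesMap[dirName] >= maxRetries {
		delete(retriesMap, dirName) // drop the key once we give up
		return true
	}
	return false
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(recordFailure("org~env~20160101224500"))
	}
}
```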
diff --git a/uploader.go b/uploader.go
index a8292a3..fb8a267 100644
--- a/uploader.go
+++ b/uploader.go
@@ -16,7 +16,8 @@
var token string
var client *http.Client = &http.Client{
- Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS
+ // Set default timeout of 60 seconds while connecting to S3/GCS
+ Timeout: time.Duration(60 * time.Second),
}
func addHeaders(req *http.Request) {
@@ -27,7 +28,8 @@
func uploadDir(dir os.FileInfo) bool {
// Eg. org~env~20160101224500
tenant, timestamp := splitDirName(dir.Name())
- dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45
+ // date=2016-01-01/time=22-45
+ dateTimePartition := getDateFromDirTimestamp(timestamp)
completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
files, _ := ioutil.ReadDir(completePath)
@@ -43,7 +45,8 @@
break
} else {
os.Remove(completeFilePath)
- log.Debugf("Deleted file '%s' after successful upload", file.Name())
+ log.Debugf("Deleted file '%s' after "+
+ "successful upload", file.Name())
}
}
return status
@@ -90,7 +93,8 @@
signedURL := body["url"]
return signedURL.(string), nil
} else {
- return "", fmt.Errorf("Error while getting signed URL '%v'", resp.Status)
+ return "", fmt.Errorf("Error while getting "+
+ "signed URL '%v'", resp.Status)
}
}
@@ -112,7 +116,8 @@
fileStats, err := file.Stat()
if err != nil {
- return false, fmt.Errorf("Could not get content length for file '%v'", err)
+ return false, fmt.Errorf("Could not get content length for "+
+ "file '%v'", err)
}
req.ContentLength = fileStats.Size()
@@ -125,7 +130,8 @@
if resp.StatusCode == 200 {
return true, nil
} else {
- return false, fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status)
+ return false, fmt.Errorf("Final Datastore (S3/GCS)returned "+
+ "Error '%v'", resp.Status)
}
}
@@ -137,7 +143,8 @@
return tenant, timestamp
}
-// files are uploaded to S3 under specific date time partition and that key needs to be generated from the plugin
+// files are uploaded to S3 under specific date time partition and that
+// key needs to be generated from the plugin
// eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
func getDateFromDirTimestamp(timestamp string) string {
dateTime, _ := time.Parse(timestampLayout, timestamp)
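The date/time partition key is derived from the directory timestamp; assuming a yyyyMMddHHmmss layout (matching names like org~env~20160101224500), a standalone sketch of that derivation:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed layout for directory timestamps such as 20160101224500.
const timestampLayout = "20060102150405"

func getDateFromDirTimestamp(timestamp string) string {
	dateTime, _ := time.Parse(timestampLayout, timestamp)
	return fmt.Sprintf("date=%s/time=%s",
		dateTime.Format("2006-01-02"), dateTime.Format("15-04"))
}

func main() {
	// Prints: date=2016-01-01/time=22-45
	fmt.Println(getDateFromDirTimestamp("20160101224500"))
}
```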