truncating the code to 80 columns
diff --git a/api.go b/api.go index 25cbbf2..03f5e31 100644 --- a/api.go +++ b/api.go
@@ -21,7 +21,8 @@ func initAPI(services apid.Services) { log.Debug("initialized API's exposed by apidAnalytics plugin") analyticsBasePath = config.GetString(configAnalyticsBasePath) - services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST") + services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", + saveAnalyticsRecord).Methods("POST") } func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) { @@ -30,12 +31,15 @@ db := getDB() // When database isnt initialized if db == nil { - writeError(w, http.StatusInternalServerError, "INTERNAL_SERVER_ERROR", "Service is not initialized completely") + writeError(w, http.StatusInternalServerError, + "INTERNAL_SERVER_ERROR", + "Service is not initialized completely") return } if !strings.EqualFold(r.Header.Get("Content-Type"), "application/json") { - writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE", "Only supported content type is application/json") + writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE", + "Only supported content type is application/json") return } @@ -45,16 +49,19 @@ if dbErr.ErrorCode != "" { switch dbErr.ErrorCode { case "INTERNAL_SEARCH_ERROR": - writeError(w, http.StatusInternalServerError, "INTERNAL_SEARCH_ERROR", dbErr.Reason) + writeError(w, http.StatusInternalServerError, + "INTERNAL_SEARCH_ERROR", dbErr.Reason) case "UNKNOWN_SCOPE": - writeError(w, http.StatusBadRequest, "UNKNOWN_SCOPE", dbErr.Reason) + writeError(w, http.StatusBadRequest, + "UNKNOWN_SCOPE", dbErr.Reason) } } else { err := processPayload(tenant, scopeuuid, r) if err.ErrorCode == "" { w.WriteHeader(http.StatusOK) } else { - writeError(w, http.StatusBadRequest, err.ErrorCode, err.Reason) + writeError(w, http.StatusBadRequest, + err.ErrorCode, err.Reason) } } }
diff --git a/api_helper.go b/api_helper.go index 34ca14e..24dba22 100644 --- a/api_helper.go +++ b/api_helper.go
@@ -9,7 +9,8 @@ ) /* -Implements all the helper methods needed to process the POST /analytics payload and send it to the internal buffer channel +Implements all the helper methods needed to process the POST /analytics payload +and send it to the internal buffer channel */ type developerInfo struct { @@ -20,8 +21,9 @@ } type axRecords struct { - Tenant tenant - Records []interface{} // Records is an array of multiple analytics records + Tenant tenant + // Records is an array of multiple analytics records + Records []interface{} } type tenant struct { @@ -34,7 +36,9 @@ var gzipEncoded bool if r.Header.Get("Content-Encoding") != "" { if !strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") { - return errResponse{ErrorCode: "UNSUPPORTED_CONTENT_ENCODING", Reason: "Only supported content encoding is gzip"} + return errResponse{ + ErrorCode: "UNSUPPORTED_CONTENT_ENCODING", + Reason: "Only supported content encoding is gzip"} } else { gzipEncoded = true } @@ -45,7 +49,9 @@ if gzipEncoded { reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data if err != nil { - return errResponse{ErrorCode: "BAD_DATA", Reason: "Gzip Encoded data cannot be read"} + return errResponse{ + ErrorCode: "BAD_DATA", + Reason: "Gzip Encoded data cannot be read"} } } else { reader = r.Body @@ -64,7 +70,8 @@ decoder.UseNumber() if err := decoder.Decode(&raw); err != nil { - return errResponse{ErrorCode: "BAD_DATA", Reason: "Not a valid JSON payload"} + return errResponse{ErrorCode: "BAD_DATA", + Reason: "Not a valid JSON payload"} } if records := raw["records"]; records != nil { @@ -75,14 +82,19 @@ if valid { enrich(recordMap, scopeuuid, tenant) } else { - return err // Even if there is one bad record, then reject entire batch + // Even if there is one bad record, then reject entire batch + return err } } - axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})} + axRecords := axRecords{ + Tenant: tenant, + Records: records.([]interface{})} // publish batch of records to channel (blocking call) internalBuffer <- axRecords } else { - return errResponse{ErrorCode: "NO_RECORDS", Reason: "No analytics records in the payload"} + return errResponse{ + ErrorCode: "NO_RECORDS", + Reason: "No analytics records in the payload"} } return errResponse{} } @@ -96,7 +108,9 @@ elems := []string{"client_received_start_timestamp"} for _, elem := range elems { if recordMap[elem] == nil { - return false, errResponse{ErrorCode: "MISSING_FIELD", Reason: "Missing Required field: " + elem} + return false, errResponse{ + ErrorCode: "MISSING_FIELD", + Reason: "Missing Required field: " + elem} } } @@ -104,7 +118,10 @@ cret, exists2 := recordMap["client_received_end_timestamp"] if exists1 && exists2 { if crst.(json.Number) > cret.(json.Number) { - return false, errResponse{ErrorCode: "BAD_DATA", Reason: "client_received_start_timestamp > client_received_end_timestamp"} + return false, errResponse{ + ErrorCode: "BAD_DATA", + Reason: "client_received_start_timestamp " + + "> client_received_end_timestamp"} } } return true, errResponse{}
diff --git a/api_test.go b/api_test.go index f56a463..d172a29 100644 --- a/api_test.go +++ b/api_test.go
@@ -20,7 +20,8 @@ v.Add("bundle_scope_uuid", "testid") client := &http.Client{} - req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode())) + req, err := http.NewRequest("POST", uri.String(), + strings.NewReader(v.Encode())) res, err := client.Do(req) defer res.Body.Close() Expect(err).ShouldNot(HaveOccurred()) @@ -37,7 +38,8 @@ v.Add("bundle_scope_uuid", "wrongId") client := &http.Client{} - req, err := http.NewRequest("POST", uri.String(), strings.NewReader(v.Encode())) + req, err := http.NewRequest("POST", uri.String(), + strings.NewReader(v.Encode())) res, err := client.Do(req) defer res.Body.Close() Expect(err).ShouldNot(HaveOccurred())
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go index 247212f..1e5c128 100644 --- a/apidAnalytics_suite_test.go +++ b/apidAnalytics_suite_test.go
@@ -33,8 +33,9 @@ Expect(err).NotTo(HaveOccurred()) config.Set("data_path", testTempDir) - config.Set(uapServerBase, "http://localhost:9000") // dummy value - config.Set("apigeesync_apid_instance_id", "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value + config.Set(uapServerBase, "http://localhost:9000") // dummy value + config.Set("apigeesync_apid_instance_id", + "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value config.Set(useCaching, true) db, err := apid.Data().DB() @@ -49,11 +50,12 @@ createTenantCache() createDeveloperInfoCache() - testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - if req.URL.Path == analyticsBasePathDefault { - saveAnalyticsRecord(w, req) - } - })) + testServer = httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, req *http.Request) { + if req.URL.Path == analyticsBasePathDefault { + saveAnalyticsRecord(w, req) + } + })) }) func createTables(db apid.DB) { @@ -169,7 +171,8 @@ txn, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) - txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id, appcred_id, app_id, apiprdt_id, status, _change_selector) "+ + txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id,"+ + " appcred_id, app_id, apiprdt_id, status, _change_selector) "+ "VALUES"+ "($1,$2,$3,$4,$5,$6)", "tenantid", @@ -206,7 +209,8 @@ "testdeveloper@test.com", ) - txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+ + txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, "+ + "apid_cluster_id, scope, org, env) "+ "VALUES"+ "($1,$2,$3,$4,$5,$6)", "testid",
diff --git a/buffering_manager.go b/buffering_manager.go index 6ec1ec1..7e28d4b 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -13,10 +13,12 @@ const fileExtension = ".txt.gz" -// Channel where analytics records are buffered before being dumped to a file as write to file should not performed in the Http Thread +// Channel where analytics records are buffered before being dumped to a +// file as write to file should not performed in the Http Thread var internalBuffer chan axRecords -// Channel where close bucket event is published i.e. when a bucket is ready to be closed based on collection interval +// Channel where close bucket event is published i.e. when a bucket +// is ready to be closed based on collection interval var closeBucketEvent chan bucket // Map from timestampt to bucket @@ -36,7 +38,8 @@ } func initBufferingManager() { - internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize)) + internalBuffer = make(chan axRecords, + config.GetInt(analyticsBufferChannelSize)) closeBucketEvent = make(chan bucket) bucketMap = make(map[int64]bucket) @@ -46,7 +49,8 @@ records := <-internalBuffer err := save(records) if err != nil { - log.Errorf("Could not save %d messages to file due to: %v", len(records.Records), err) + log.Errorf("Could not save %d messages to file"+ + " due to: %v", len(records.Records), err) } } }() @@ -55,17 +59,20 @@ go func() { for { bucket := <-closeBucketEvent - log.Debugf("Close Event received for bucket: %s", bucket.DirName) + log.Debugf("Close Event received for bucket: %s", + bucket.DirName) // close open file closeGzipFile(bucket.FileWriter) dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) - // close files in tmp folder and move directory to staging to indicate its ready for upload + // close files in tmp folder and move directory to + // staging to indicate its ready for upload err := os.Rename(dirToBeClosed, stagingPath) if err != nil { - log.Errorf("Cannot move directory '%s' from tmp to staging folder", bucket.DirName) + log.Errorf("Cannot move directory '%s' from"+ + " tmp to staging folder", bucket.DirName) } } }() @@ -82,7 +89,8 @@ } func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) { - // first based on current timestamp and collection interval, determine the timestamp of the bucket + // first based on current timestamp and collection interval, + // determine the timestamp of the bucket ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval)) _, exists := bucketMap[ts] if exists { @@ -99,12 +107,16 @@ // create dir err := os.Mkdir(newPath, os.ModePerm) if err != nil { - return bucket{}, fmt.Errorf("Cannot create directory '%s' to buffer messages '%v'", dirName, err) + return bucket{}, fmt.Errorf("Cannot create directory "+ + "'%s' to buffer messages '%v'", dirName, err) } // create file for writing // Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz - fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0" + fileExtension + fileName := getRandomHex() + "_" + timestamp + "." + + endtimestamp + "_" + + config.GetString("apigeesync_apid_instance_id") + + "_writer_0" + fileExtension completeFilePath := filepath.Join(newPath, fileName) fw, err := createGzipFile(completeFilePath) if err != nil { @@ -114,7 +126,8 @@ newBucket := bucket{DirName: dirName, FileWriter: fw} bucketMap[ts] = newBucket - //Send event to close directory after endTime + 5 seconds to make sure all buffers are flushed to file + //Send event to close directory after endTime + 5 + // seconds to make sure all buffers are flushed to file timer := time.After(endTime.Sub(time.Now()) + time.Second*5) go func() { <-timer @@ -124,7 +137,8 @@ } } -// 4 digit Hex is prefixed to each filename to improve how s3 partitions the files being uploaded +// 4 digit Hex is prefixed to each filename to improve +// how s3 partitions the files being uploaded func getRandomHex() string { buff := make([]byte, 2) rand.Read(buff) @@ -134,7 +148,9 @@ func createGzipFile(s string) (fileWriter, error) { file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm) if err != nil { - return fileWriter{}, fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err) + return fileWriter{}, + fmt.Errorf("Cannot create file '%s' "+ + "to buffer messages '%v'", s, err) } gw := gzip.NewWriter(file) bw := bufio.NewWriter(gw)
diff --git a/common_helper.go b/common_helper.go index 31f7d82..9acc033 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -9,16 +9,19 @@ // Cache for scope uuid to org, env and tenantId information var tenantCache map[string]tenant -// RW lock for tenant map cache since the cache can be read while its being written to and vice versa +// RW lock for tenant map cache since the cache can be +// read while its being written to and vice versa var tenantCachelock = sync.RWMutex{} // Cache for apiKey~tenantId to developer related information var developerInfoCache map[string]developerInfo -// RW lock for developerInfo map cache since the cache can be read while its being written to and vice versa +// RW lock for developerInfo map cache since the cache can be +// read while its being written to and vice versa var developerInfoCacheLock = sync.RWMutex{} -// Load data scope information into an in-memory cache so that for each record a DB lookup is not required +// Load data scope information into an in-memory cache so that +// for each record a DB lookup is not required func createTenantCache() error { // Lock before writing to the map as it has multiple readers tenantCachelock.Lock() @@ -30,12 +33,15 @@ rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE") if error != nil { - return fmt.Errorf("Count not get datascope from DB due to: %v", error) + return fmt.Errorf("Count not get datascope from "+ + "DB due to: %v", error) } else { defer rows.Close() for rows.Next() { rows.Scan(&env, &org, &tenantId, &id) - tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId} + tenantCache[id] = tenant{Org: org, + Env: env, + TenantId: tenantId} } } @@ -43,7 +49,8 @@ return nil } -// Load data scope information into an in-memory cache so that for each record a DB lookup is not required +// Load data scope information into an in-memory cache so that +// for each record a DB lookup is not required func createDeveloperInfoCache() error { // Lock before writing to the map as it has multiple readers developerInfoCacheLock.Lock() @@ -53,7 +60,8 @@ var tenantId, apiKey string db := getDB() - sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name, a.name, d.username, d.email " + + sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name," + + " a.name, d.username, d.email " + "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " + "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " + "INNER JOIN APP AS a ON a.id = mp.app_id " + @@ -61,11 +69,13 @@ rows, error := db.Query(sSql) if error != nil { - return fmt.Errorf("Count not get developerInfo from DB due to: %v", error) + return fmt.Errorf("Count not get developerInfo "+ + "from DB due to: %v", error) } else { defer rows.Close() for rows.Next() { - rows.Scan(&tenantId, &apiKey, &apiProduct, &developerApp, &developer, &developerEmail) + rows.Scan(&tenantId, &apiKey, &apiProduct, + &developerApp, &developer, &developerEmail) keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) apiPrd := getValuesIgnoringNull(apiProduct) @@ -73,15 +83,21 @@ dev := getValuesIgnoringNull(developer) devEmail := getValuesIgnoringNull(developerEmail) - developerInfoCache[keyForMap] = developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev} + developerInfoCache[keyForMap] = developerInfo{ + ApiProduct: apiPrd, + DeveloperApp: devApp, + DeveloperEmail: devEmail, + Developer: dev} } } - log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache)) + log.Debugf("Count of apiKey~tenantId combinations "+ + "in the cache: %d", len(developerInfoCache)) return nil } -// Returns Tenant Info given a scope uuid from the cache or by querying the DB directly based on useCachig config +// Returns Tenant Info given a scope uuid from the cache or by querying +// the DB directly based on useCachig config func getTenantForScope(scopeuuid string) (tenant, dbError) { if config.GetBool(useCaching) { // acquire a read lock as this cache has 1 writer as well @@ -91,9 +107,12 @@ if !exists { reason := "No tenant found for this scopeuuid: " + scopeuuid errorCode := "UNKNOWN_SCOPE" - // Incase of unknown scope, try to refresh the cache ansynchronously incase an update was missed or delayed + // Incase of unknown scope, try to refresh the + // cache ansynchronously incase an update was missed or delayed go createTenantCache() - return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} + return tenant{}, dbError{ + ErrorCode: errorCode, + Reason: reason} } else { return ten, dbError{} } @@ -101,23 +120,32 @@ var org, env, tenantId string db := getDB() - error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE where id = ?", scopeuuid).Scan(&env, &org, &tenantId) + error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE"+ + " where id = ?", scopeuuid).Scan(&env, &org, &tenantId) switch { case error == sql.ErrNoRows: reason := "No tenant found for this scopeuuid: " + scopeuuid errorCode := "UNKNOWN_SCOPE" - return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} + return tenant{}, dbError{ + ErrorCode: errorCode, + Reason: reason} case error != nil: reason := error.Error() errorCode := "INTERNAL_SEARCH_ERROR" - return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} + return tenant{}, dbError{ + ErrorCode: errorCode, + Reason: reason} } - return tenant{Org: org, Env: env, TenantId: tenantId}, dbError{} + return tenant{ + Org: org, + Env: env, + TenantId: tenantId}, dbError{} } } -// Returns Dveloper related info given an apiKey and tenantId from the cache or by querying the DB directly based on useCachig config +// Returns Dveloper related info given an apiKey and tenantId +// from the cache or by querying the DB directly based on useCachig config func getDeveloperInfo(tenantId string, apiKey string) developerInfo { if config.GetBool(useCaching) { keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) @@ -126,15 +154,18 @@ defer tenantCachelock.RUnlock() devInfo, exists := developerInfoCache[keyForMap] if !exists { - log.Warnf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) - // Incase of unknown apiKey~tenantId, try to refresh the cache ansynchronously incase an update was missed or delayed + log.Warnf("No data found for for tenantId = %s"+ + " and apiKey = %s", tenantId, apiKey) + // Incase of unknown apiKey~tenantId, + // try to refresh the cache ansynchronously incase an update was missed or delayed go createTenantCache() return developerInfo{} } else { return devInfo } } else { - var apiProduct, developerApp, developerEmail, developer sql.NullString + var apiProduct, developerApp, developerEmail sql.NullString + var developer sql.NullString db := getDB() sSql := "SELECT ap.name, a.name, d.username, d.email " + @@ -143,14 +174,18 @@ "INNER JOIN APP AS a ON a.id = mp.app_id " + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " + "where mp.tenant_id = ? and mp.appcred_id = ?;" - error := db.QueryRow(sSql, tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail) + error := db.QueryRow(sSql, tenantId, apiKey). + Scan(&apiProduct, &developerApp, + &developer, &developerEmail) switch { case error == sql.ErrNoRows: - log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) + log.Debugf("No data found for for tenantId = %s "+ + "and apiKey = %s", tenantId, apiKey) return developerInfo{} case error != nil: - log.Debugf("No data found for for tenantId = %s and apiKey = %s due to: %v", tenantId, apiKey, error) + log.Debugf("No data found for for tenantId = %s and "+ + "apiKey = %s due to: %v", tenantId, apiKey, error) return developerInfo{} } @@ -158,7 +193,10 @@ devApp := getValuesIgnoringNull(developerApp) dev := getValuesIgnoringNull(developer) devEmail := getValuesIgnoringNull(developerEmail) - return developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev} + return developerInfo{ApiProduct: apiPrd, + DeveloperApp: devApp, + DeveloperEmail: devEmail, + Developer: dev} } }
diff --git a/crash_recovery.go b/crash_recovery.go index b34dc9d..ccd3b36 100644 --- a/crash_recovery.go +++ b/crash_recovery.go
@@ -11,15 +11,18 @@ ) const ( - crashRecoveryDelay = 30 // seconds - recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file) - recoveredTS = "~recoveredTS~" // Constant to identify recovered files + crashRecoveryDelay = 30 // seconds + // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file) + recoveryTSLayout = "20060102150405.000" + // Constant to identify recovered files + recoveredTS = "~recoveredTS~" ) func initCrashRecovery() { if crashRecoveryNeeded() { timer := time.After(time.Second * crashRecoveryDelay) - // Actual recovery of files is attempted asynchronously after a delay to not block the apid plugin from starting up + // Actual recovery of files is attempted asynchronously + // after a delay to not block the apid plugin from starting up go func() { <-timer performRecovery() @@ -33,12 +36,14 @@ tmpDirRecoveryNeeded := recoverFoldersInTmpDir() needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded if needed { - log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay) + log.Infof("Crash Recovery is needed and will be "+ + "attempted in %d seconds", crashRecoveryDelay) } return needed } -// If Apid is shutdown or crashes while a file is still open in tmp folder, then the file has partial data. +// If Apid is shutdown or crashes while a file is still open in tmp folder, +// then the file has partial data. // This partial data can be recoverd. func recoverFoldersInTmpDir() bool { tmpRecoveryNeeded := false @@ -46,13 +51,16 @@ recoveryTS := getRecoveryTS() for _, dir := range dirs { tmpRecoveryNeeded = true - log.Debugf("Moving directory '%s' from tmp to recovered folder", dir.Name()) + log.Debugf("Moving directory '%s' from tmp to"+ + " recovered folder", dir.Name()) tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name()) - newDirName := dir.Name() + recoveredTS + recoveryTS // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 + // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 + newDirName := dir.Name() + recoveredTS + recoveryTS recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName) err := os.Rename(tmpCompletePath, recoveredCompletePath) if err != nil { - log.Errorf("Cannot move directory '%s' from tmp to recovered folder", dir.Name()) + log.Errorf("Cannot move directory '%s' "+ + "from tmp to recovered folder", dir.Name()) } } return tmpRecoveryNeeded @@ -64,8 +72,10 @@ return current.Format(recoveryTSLayout) } -// If APID is restarted twice immediately such that files have been moved to recovered folder but actual recovery has'nt started or is partially done -// Then the files will just stay in the recovered dir and need to be recovered again. +// If APID is restarted twice immediately such that files have been moved to +// recovered folder but actual recovery has'nt started or is partially done +// Then the files will just stay in the recovered dir +// and need to be recovered again. func recoverFolderInRecoveredDir() bool { dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir) if len(dirs) > 0 { @@ -88,7 +98,8 @@ var bucketRecoveryTS string // Parse bucket name to extract recoveryTS and pass it each file to be recovered - // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 -> bucketRecoveryTS = _20160101222612.123 + // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 + // -> bucketRecoveryTS = _20160101222612.123 index := strings.Index(dirName, recoveredTS) if index != -1 { bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):] @@ -104,7 +115,8 @@ stagingPath := filepath.Join(localAnalyticsStagingDir, dirName) err := os.Rename(dirBeingRecovered, stagingPath) if err != nil { - log.Errorf("Cannot move directory '%s' from recovered to staging folder", dirName) + log.Errorf("Cannot move directory '%s' from"+ + " recovered to staging folder", dirName) } } @@ -123,7 +135,8 @@ deletePartialFile(completeOrigFilePath) } -// The file is read line by line and all complete records are extracted and copied to a new file which is closed as a correct gzip file. +// The file is read line by line and all complete records are extracted +// and copied to a new file which is closed as a correct gzip file. func copyPartialFile(completeOrigFilePath, recoveredFilePath string) { partialFile, err := os.Open(completeOrigFilePath) if err != nil { @@ -135,7 +148,8 @@ bufReader := bufio.NewReader(partialFile) gzReader, err := gzip.NewReader(bufReader) if err != nil { - log.Errorf("Cannot create reader on gzip file: %s due to %v", completeOrigFilePath, err) + log.Errorf("Cannot create reader on gzip file: %s"+ + " due to %v", completeOrigFilePath, err) return } defer gzReader.Close() @@ -143,7 +157,8 @@ scanner := bufio.NewScanner(gzReader) // Create new file to copy complete records from partial file and upload only a complete file - recoveredFile, err := os.OpenFile(recoveredFilePath, os.O_WRONLY|os.O_CREATE, os.ModePerm) + recoveredFile, err := os.OpenFile(recoveredFilePath, + os.O_WRONLY|os.O_CREATE, os.ModePerm) if err != nil { log.Errorf("Cannot create recovered file: %s", recoveredFilePath) return
diff --git a/init.go b/init.go index e383f16..a3501a4 100644 --- a/init.go +++ b/init.go
@@ -21,18 +21,21 @@ analyticsCollectionInterval = "apidanalytics_collection_interval" analyticsCollectionIntervalDefault = "120" - // Interval in seconds based on which staging directory will be checked for folders ready to be uploaded + // Interval in seconds based on which staging directory + // will be checked for folders ready to be uploaded analyticsUploadInterval = "apidanalytics_upload_interval" analyticsUploadIntervalDefault = "5" - // Number of slots for internal channel buffering of analytics records before they are dumped to a file + // Number of slots for internal channel buffering of + // analytics records before they are dumped to a file analyticsBufferChannelSize = "apidanalytics_buffer_channel_size" analyticsBufferChannelSizeDefault = 100 // EdgeX endpoint base path to access Uap Collection Endpoint uapServerBase = "apidanalytics_uap_server_base" - // If caching is used then data scope and developer info will be maintained in-memory + // If caching is used then data scope and developer + // info will be maintained in-memory // cache to avoid DB calls for each analytics message useCaching = "apidanalytics_use_caching" useCachingDefault = true @@ -90,25 +93,33 @@ for _, key := range []string{uapServerBase} { if !config.IsSet(key) { - return pluginData, fmt.Errorf("Missing required config value: %s", key) + return pluginData, + fmt.Errorf("Missing required config value: %s", key) } } // Create directories for managing buffering and upload to UAP stages - directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir} + directories := []string{localAnalyticsBaseDir, + localAnalyticsTempDir, + localAnalyticsStagingDir, + localAnalyticsFailedDir, + localAnalyticsRecoveredDir} err = createDirectories(directories) if err != nil { - return pluginData, fmt.Errorf("Cannot create required local directories: %v ", err) + return pluginData, fmt.Errorf("Cannot create "+ + "required local directories: %v ", err) } // Initialize one time crash recovery to be performed by the plugin on start up initCrashRecovery() - // Initialize upload manager to watch the staging directory and upload files to UAP as they are ready + // Initialize upload manager to watch the staging directory and + // upload files to UAP as they are ready initUploadManager() - // Initialize buffer manager to watch the internalBuffer channel for new messages and dump them to files + // Initialize buffer manager to watch the internalBuffer channel + // for new messages and dump them to files initBufferingManager() // Initialize API's and expose them @@ -129,7 +140,8 @@ } // set local directory paths that will be used to buffer analytics data on disk - localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"), config.GetString(configAnalyticsDataPath)) + localAnalyticsBaseDir = filepath.Join(config.GetString("local_storage_path"), + config.GetString(configAnalyticsDataPath)) localAnalyticsTempDir = filepath.Join(localAnalyticsBaseDir, "tmp") localAnalyticsStagingDir = filepath.Join(localAnalyticsBaseDir, "staging") localAnalyticsFailedDir = filepath.Join(localAnalyticsBaseDir, "failed") @@ -158,7 +170,8 @@ if error != nil { return error } - log.Infof("created directory for analytics data collection: %s", path) + log.Infof("created directory for analytics "+ + "data collection: %s", path) } } return nil
diff --git a/listener.go b/listener.go index f86b2ad..e320fed 100644 --- a/listener.go +++ b/listener.go
@@ -27,7 +27,8 @@ } func processSnapshot(snapshot *common.Snapshot) { - log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) + log.Debugf("Snapshot received. Switching to"+ + " DB version: %s", snapshot.SnapshotInfo) db, err := data.DBVersion(snapshot.SnapshotInfo) if err != nil { @@ -40,7 +41,8 @@ if err != nil { log.Error(err) } else { - log.Debug("Created a local cache for datasope information") + log.Debug("Created a local cache" + + " for datasope information") } err = createDeveloperInfoCache() if err != nil { @@ -49,7 +51,8 @@ log.Debug("Created a local cache for developer information") } } else { - log.Info("Will not be caching any developer info and make a DB call for every analytics msg") + log.Info("Will not be caching any developer info " + + "and make a DB call for every analytics msg") } return } @@ -66,32 +69,44 @@ switch payload.Operation { case common.Insert, common.Update: rows = append(rows, payload.NewRow) - // Lock before writing to the map as it has multiple readers + // Lock before writing to the + // map as it has multiple readers tenantCachelock.Lock() defer tenantCachelock.Unlock() for _, ele := range rows { - var scopeuuid, tenantid, org, env string + var scopeuuid, tenantid string + var org, env string ele.Get("id", &scopeuuid) ele.Get("scope", &tenantid) ele.Get("org", &org) ele.Get("env", &env) - tenantCache[scopeuuid] = tenant{Org: org, Env: env, TenantId: tenantid} - log.Debugf("Refreshed local tenantCache. Added scope: %s", scopeuuid) + tenantCache[scopeuuid] = tenant{ + Org: org, + Env: env, + TenantId: tenantid} + log.Debugf("Refreshed local "+ + "tenantCache. Added "+ + "scope: "+"%s", scopeuuid) } case common.Delete: rows = append(rows, payload.OldRow) - // Lock before writing to the map as it has multiple readers + // Lock before writing to the map + // as it has multiple readers tenantCachelock.Lock() defer tenantCachelock.Unlock() for _, ele := range rows { var scopeuuid string ele.Get("id", &scopeuuid) delete(tenantCache, scopeuuid) - log.Debugf("Refreshed local tenantCache. Deleted scope: %s", scopeuuid) + log.Debugf("Refreshed local"+ + " tenantCache. Deleted"+ + " scope: %s", scopeuuid) } } - case "kms.developer", "kms.app", "kms.api_product", "kms.app_credential_apiproduct_mapper": - // any change in any of the above tables should result in cache refresh + case "kms.developer", "kms.app", "kms.api_product", + "kms.app_credential_apiproduct_mapper": + // any change in any of the above tables + // should result in cache refresh createDeveloperInfoCache() log.Debug("Refresh local developerInfoCache") }
diff --git a/upload_manager.go b/upload_manager.go index 1ae94a5..a95ce87 100644 --- a/upload_manager.go +++ b/upload_manager.go
@@ -12,25 +12,32 @@ retryFailedDirBatchSize = 10 ) -// Each file upload is retried maxRetries times before moving it to failed directory +// Each file upload is retried maxRetries times before +// moving it to failed directory var retriesMap map[string]int -//TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running +//TODO: make sure that this instance gets initialized only once +// since we dont want multiple upload manager tickers running func initUploadManager() { retriesMap = make(map[string]int) go func() { - // Periodically check the staging directory to check if any folders are ready to be uploaded to S3 - ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval)) + // Periodically check the staging directory to check + // if any folders are ready to be uploaded to S3 + ticker := time.NewTicker(time.Second * + config.GetDuration(analyticsUploadInterval)) log.Debugf("Intialized upload manager to check for staging directory") - defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running + // Ticker will keep running till go routine is running + // i.e. till application is running + defer ticker.Stop() for range ticker.C { files, err := ioutil.ReadDir(localAnalyticsStagingDir) if err != nil { - log.Errorf("Cannot read directory: %s", localAnalyticsStagingDir) + log.Errorf("Cannot read directory: "+ + "%s", localAnalyticsStagingDir) } uploadedDirCnt := 0 @@ -44,7 +51,8 @@ } } if uploadedDirCnt > 0 { - // After a successful upload, retry the folders in failed directory as they might have + // After a successful upload, retry the + // folders in failed directory as they might have // failed due to intermitent S3/GCS issue retryFailedUploads() } @@ -54,10 +62,12 @@ func handleUploadDirStatus(dir os.FileInfo, status bool) { completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) - // If upload is successful then delete files and remove bucket from retry map + // If upload is successful then delete files + // and remove bucket from retry map if status { os.RemoveAll(completePath) - log.Debugf("deleted directory after successful upload: %s", dir.Name()) + log.Debugf("deleted directory after "+ + "successful upload: %s", dir.Name()) // remove key if exists from retry map after a successful upload delete(retriesMap, dir.Name()) } else { @@ -67,7 +77,8 @@ failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name()) err := os.Rename(completePath, failedDirPath) if err != nil { - log.Errorf("Cannot move directory '%s' from staging to failed folder", dir.Name()) + log.Errorf("Cannot move directory '%s'"+ + " from staging to failed folder", dir.Name()) } // remove key from retry map once it reaches allowed max failed attempts delete(retriesMap, dir.Name()) @@ -90,7 +101,8 @@ newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name()) err := os.Rename(failedPath, newStagingPath) if err != nil { - log.Errorf("Cannot move directory '%s' from failed to staging folder", dir.Name()) + log.Errorf("Cannot move directory '%s'"+ + " from failed to staging folder", dir.Name()) } } else { break
diff --git a/uploader.go b/uploader.go index a8292a3..fb8a267 100644 --- a/uploader.go +++ b/uploader.go
@@ -16,7 +16,8 @@ var token string var client *http.Client = &http.Client{ - Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS + //set default timeout of 60 seconds while connecting to s3/GCS + Timeout: time.Duration(60 * time.Second), } func addHeaders(req *http.Request) { @@ -27,7 +28,8 @@ func uploadDir(dir os.FileInfo) bool { // Eg. org~env~20160101224500 tenant, timestamp := splitDirName(dir.Name()) - dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45 + //date=2016-01-01/time=22-45 + dateTimePartition := getDateFromDirTimestamp(timestamp) completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) files, _ := ioutil.ReadDir(completePath) @@ -43,7 +45,8 @@ break } else { os.Remove(completeFilePath) - log.Debugf("Deleted file '%s' after successful upload", file.Name()) + log.Debugf("Deleted file '%s' after "+ + "successful upload", file.Name()) } } return status @@ -90,7 +93,8 @@ signedURL := body["url"] return signedURL.(string), nil } else { - return "", fmt.Errorf("Error while getting signed URL '%v'", resp.Status) + return "", fmt.Errorf("Error while getting "+ + "signed URL '%v'", resp.Status) } } @@ -112,7 +116,8 @@ fileStats, err := file.Stat() if err != nil { - return false, fmt.Errorf("Could not get content length for file '%v'", err) + return false, fmt.Errorf("Could not get content length for "+ + "file '%v'", err) } req.ContentLength = fileStats.Size() @@ -125,7 +130,8 @@ if resp.StatusCode == 200 { return true, nil } else { - return false, fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status) + return false, fmt.Errorf("Final Datastore (S3/GCS)returned "+ + "Error '%v'", resp.Status) } } @@ -137,7 +143,8 @@ return tenant, timestamp } -// files are uploaded to S3 under specific date time partition and that key needs to be generated from the plugin +// files are uploaded to S3 under specific date time partition and that +// key needs to be generated from the plugin // eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz func getDateFromDirTimestamp(timestamp string) string { dateTime, _ := time.Parse(timestampLayout, timestamp)