Run go fmt to fix formatting
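
The diff below is formatting cleanup only: import sorting, comma and operator spacing, struct field alignment, and removal of redundant semicolons and parentheses; no behavioral change is intended. The formatting was presumably produced with something like "go fmt ./..." (or "gofmt -w ."). A minimal, hypothetical sketch of the kind of normalization gofmt applies (illustrative only, not code from this repo):

    // before
    writeError(w, http.StatusBadRequest,"BAD_DATA","bad payload")
    // after gofmt: a single space follows each comma
    writeError(w, http.StatusBadRequest, "BAD_DATA", "bad payload")
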
diff --git a/api.go b/api.go
index c2edcb5..25cbbf2 100644
--- a/api.go
+++ b/api.go
@@ -21,21 +21,21 @@
func initAPI(services apid.Services) {
log.Debug("initialized APIs exposed by apidAnalytics plugin")
analyticsBasePath = config.GetString(configAnalyticsBasePath)
- services.API().HandleFunc(analyticsBasePath + "/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST")
+ services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST")
}
func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
- db := getDB() // When database isnt initialized
+ db := getDB() // When database isn't initialized
if db == nil {
- writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely")
+ writeError(w, http.StatusInternalServerError, "INTERNAL_SERVER_ERROR", "Service is not initialized completely")
return
}
- if !strings.EqualFold(r.Header.Get("Content-Type"),"application/json") {
- writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE","Only supported content type is application/json")
+ if !strings.EqualFold(r.Header.Get("Content-Type"), "application/json") {
+ writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE", "Only supported content type is application/json")
return
}
@@ -50,7 +50,7 @@
writeError(w, http.StatusBadRequest, "UNKNOWN_SCOPE", dbErr.Reason)
}
} else {
- err := processPayload(tenant, scopeuuid, r);
+ err := processPayload(tenant, scopeuuid, r)
if err.ErrorCode == "" {
w.WriteHeader(http.StatusOK)
} else {
diff --git a/api_helper.go b/api_helper.go
index f12605f..34ca14e 100644
--- a/api_helper.go
+++ b/api_helper.go
@@ -1,42 +1,41 @@
package apidAnalytics
import (
- "encoding/json"
- "net/http"
- "io"
- "strings"
"compress/gzip"
+ "encoding/json"
+ "io"
+ "net/http"
+ "strings"
)
-
/*
Implements all the helper methods needed to process the POST /analytics payload and send it to the internal buffer channel
*/
type developerInfo struct {
- ApiProduct string
- DeveloperApp string
- DeveloperEmail string
- Developer string
+ ApiProduct string
+ DeveloperApp string
+ DeveloperEmail string
+ Developer string
}
type axRecords struct {
- Tenant tenant
- Records []interface{} // Records is an array of multiple analytics records
+ Tenant tenant
+ Records []interface{} // Records is an array of multiple analytics records
}
type tenant struct {
- Org string
- Env string
+ Org string
+ Env string
TenantId string
}
-func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
+func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse {
var gzipEncoded bool
if r.Header.Get("Content-Encoding") != "" {
- if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") {
- return errResponse{ErrorCode:"UNSUPPORTED_CONTENT_ENCODING", Reason:"Only supported content encoding is gzip"}
- } else {
+ if !strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") {
+ return errResponse{ErrorCode: "UNSUPPORTED_CONTENT_ENCODING", Reason: "Only supported content encoding is gzip"}
+ } else {
gzipEncoded = true
}
}
@@ -44,9 +43,9 @@
var reader io.ReadCloser
var err error
if gzipEncoded {
- reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
+ reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data
if err != nil {
- return errResponse{ErrorCode:"BAD_DATA", Reason:"Gzip Encoded data cannot be read"}
+ return errResponse{ErrorCode: "BAD_DATA", Reason: "Gzip Encoded data cannot be read"}
}
} else {
reader = r.Body
@@ -61,11 +60,11 @@
func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse {
var raw map[string]interface{}
- decoder := json.NewDecoder(reader) // Decode payload to JSON data
+ decoder := json.NewDecoder(reader) // Decode payload to JSON data
decoder.UseNumber()
if err := decoder.Decode(&raw); err != nil {
- return errResponse{ErrorCode:"BAD_DATA", Reason:"Not a valid JSON payload"}
+ return errResponse{ErrorCode: "BAD_DATA", Reason: "Not a valid JSON payload"}
}
if records := raw["records"]; records != nil {
@@ -76,14 +75,14 @@
if valid {
enrich(recordMap, scopeuuid, tenant)
} else {
- return err // Even if there is one bad record, then reject entire batch
+ return err // If even one record is bad, reject the entire batch
}
}
axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})}
// publish batch of records to channel (blocking call)
internalBuffer <- axRecords
} else {
- return errResponse{ErrorCode:"NO_RECORDS", Reason:"No analytics records in the payload"}
+ return errResponse{ErrorCode: "NO_RECORDS", Reason: "No analytics records in the payload"}
}
return errResponse{}
}
@@ -97,7 +96,7 @@
elems := []string{"client_received_start_timestamp"}
for _, elem := range elems {
if recordMap[elem] == nil {
- return false, errResponse{ErrorCode:"MISSING_FIELD", Reason:"Missing Required field: " + elem}
+ return false, errResponse{ErrorCode: "MISSING_FIELD", Reason: "Missing Required field: " + elem}
}
}
@@ -105,7 +104,7 @@
cret, exists2 := recordMap["client_received_end_timestamp"]
if exists1 && exists2 {
if crst.(json.Number) > cret.(json.Number) {
- return false, errResponse{ErrorCode:"BAD_DATA", Reason:"client_received_start_timestamp > client_received_end_timestamp"}
+ return false, errResponse{ErrorCode: "BAD_DATA", Reason: "client_received_start_timestamp > client_received_end_timestamp"}
}
}
return true, errResponse{}
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go
index 2e98e86..247212f 100644
--- a/apidAnalytics_suite_test.go
+++ b/apidAnalytics_suite_test.go
@@ -33,8 +33,8 @@
Expect(err).NotTo(HaveOccurred())
config.Set("data_path", testTempDir)
- config.Set(uapServerBase, "http://localhost:9000") // dummy value
- config.Set("apigeesync_apid_instance_id","abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
+ config.Set(uapServerBase, "http://localhost:9000") // dummy value
+ config.Set("apigeesync_apid_instance_id", "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value
config.Set(useCaching, true)
db, err := apid.Data().DB()
@@ -125,7 +125,7 @@
);
`)
if err != nil {
- panic("Unable to initialize DB " + err.Error())
+ panic("Unable to initialize DB " + err.Error())
}
}
@@ -161,7 +161,7 @@
);
`)
if err != nil {
- panic("Unable to initialize DB " + err.Error())
+ panic("Unable to initialize DB " + err.Error())
}
}
@@ -170,7 +170,7 @@
txn, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id, appcred_id, app_id, apiprdt_id, status, _change_selector) "+
- "VALUES" +
+ "VALUES"+
"($1,$2,$3,$4,$5,$6)",
"tenantid",
"testapikey",
@@ -178,33 +178,33 @@
"testproductid",
"APPROVED",
"12345",
- );
+ )
txn.Exec("INSERT INTO APP (id, tenant_id, name, developer_id) "+
- "VALUES" +
+ "VALUES"+
"($1,$2,$3,$4)",
"testappid",
"tenantid",
"testapp",
"testdeveloperid",
- );
+ )
txn.Exec("INSERT INTO API_PRODUCT (id, tenant_id, name) "+
- "VALUES" +
+ "VALUES"+
"($1,$2,$3)",
"testproductid",
"tenantid",
"testproduct",
- );
+ )
txn.Exec("INSERT INTO DEVELOPER (id, tenant_id, username, email) "+
- "VALUES" +
+ "VALUES"+
"($1,$2,$3,$4)",
"testdeveloperid",
"tenantid",
"testdeveloper",
"testdeveloper@test.com",
- );
+ )
txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+
"VALUES"+
@@ -215,11 +215,10 @@
"tenantid",
"testorg",
"testenv",
- );
+ )
txn.Commit()
}
-
var _ = AfterSuite(func() {
apid.Events().Close()
if testServer != nil {
diff --git a/buffering_manager.go b/buffering_manager.go
index b9fa3d2..6ec1ec1 100644
--- a/buffering_manager.go
+++ b/buffering_manager.go
@@ -1,22 +1,24 @@
package apidAnalytics
import (
- "time"
- "os"
"bufio"
"compress/gzip"
- "path/filepath"
- "fmt"
"crypto/rand"
"encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
)
-const fileExtension = ".txt.gz";
+const fileExtension = ".txt.gz"
// Channel where analytics records are buffered before being dumped to a file, as writes to the file should not be performed in the HTTP thread
var internalBuffer chan axRecords
+
// Channel where close bucket event is published i.e. when a bucket is ready to be closed based on collection interval
var closeBucketEvent chan bucket
+
// Map from timestamp to bucket
var bucketMap map[int64]bucket
@@ -28,19 +30,19 @@
// This struct will store open file handle and writer to close the file
type fileWriter struct {
- file *os.File
- gw *gzip.Writer
- bw *bufio.Writer
+ file *os.File
+ gw *gzip.Writer
+ bw *bufio.Writer
}
func initBufferingManager() {
internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize))
- closeBucketEvent = make(chan bucket)
+ closeBucketEvent = make(chan bucket)
bucketMap = make(map[int64]bucket)
// Keep polling the internal buffer for new messages
go func() {
- for {
+ for {
records := <-internalBuffer
err := save(records)
if err != nil {
@@ -51,8 +53,8 @@
// Keep polling the closeEvent channel to see if bucket is ready to be closed
go func() {
- for {
- bucket := <- closeBucketEvent
+ for {
+ bucket := <-closeBucketEvent
log.Debugf("Close Event received for bucket: %s", bucket.DirName)
// close open file
@@ -70,19 +72,18 @@
}
// Save records to correct file based on what timestamp data is being collected for
-func save(records axRecords) (error) {
+func save(records axRecords) error {
bucket, err := getBucketForTimestamp(time.Now(), records.Tenant)
- if (err != nil ) {
+ if err != nil {
return err
}
writeGzipFile(bucket.FileWriter, records.Records)
return nil
}
-
func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) {
// first based on current timestamp and collection interval, determine the timestamp of the bucket
- ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
+ ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval))
_, exists := bucketMap[ts]
if exists {
return bucketMap[ts], nil
@@ -90,7 +91,7 @@
timestamp := time.Unix(ts, 0).Format(timestampLayout)
// endtimestamp of bucket = starttimestamp + collectionInterval
- endTime := time.Unix(ts + int64(config.GetInt(analyticsCollectionInterval)), 0)
+ endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0)
endtimestamp := endTime.Format(timestampLayout)
dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp
@@ -114,9 +115,9 @@
bucketMap[ts] = newBucket
//Send event to close directory after endTime + 5 seconds to make sure all buffers are flushed to file
- timer := time.After(endTime.Sub(time.Now()) + time.Second * 5)
+ timer := time.After(endTime.Sub(time.Now()) + time.Second*5)
go func() {
- <- timer
+ <-timer
closeBucketEvent <- newBucket
}()
return newBucket, nil
@@ -133,7 +134,7 @@
func createGzipFile(s string) (fileWriter, error) {
file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm)
if err != nil {
- return fileWriter{},fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err)
+ return fileWriter{}, fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err)
}
gw := gzip.NewWriter(file)
bw := bufio.NewWriter(gw)
@@ -160,4 +161,3 @@
fw.gw.Close()
fw.file.Close()
}
-
diff --git a/common_helper.go b/common_helper.go
index 89b37bc..d4e26d5 100644
--- a/common_helper.go
+++ b/common_helper.go
@@ -8,10 +8,13 @@
// Cache for scope uuid to org, env and tenantId information
var tenantCache map[string]tenant
+
// RW lock for tenant map cache since the cache can be read while it's being written to and vice versa
var tenantCachelock = sync.RWMutex{}
+
// Cache for apiKey~tenantId to developer related information
var developerInfoCache map[string]developerInfo
+
// RW lock for developerInfo map cache since the cache can be read while it's being written to and vice versa
var developerInfoCacheLock = sync.RWMutex{}
@@ -25,13 +28,13 @@
if error != nil {
return fmt.Errorf("Could not get datascope from DB due to: %v", error)
- } else {
+ } else {
defer rows.Close()
// Lock before writing to the map as it has multiple readers
tenantCachelock.Lock()
defer tenantCachelock.Unlock()
for rows.Next() {
- rows.Scan(&env, &org, &tenantId, &id);
+ rows.Scan(&env, &org, &tenantId, &id)
tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId}
}
}
@@ -43,7 +46,7 @@
// Load developer info into an in-memory cache so that a DB lookup is not required for each record
func createDeveloperInfoCache() error {
developerInfoCache = make(map[string]developerInfo)
- var apiProduct, developerApp, developerEmail, developer sql.NullString
+ var apiProduct, developerApp, developerEmail, developer sql.NullString
var tenantId, apiKey string
db := getDB()
@@ -62,7 +65,7 @@
developerInfoCacheLock.Lock()
defer developerInfoCacheLock.Unlock()
for rows.Next() {
- rows.Scan(&tenantId,&apiKey,&apiProduct, &developerApp, &developer, &developerEmail)
+ rows.Scan(&tenantId, &apiKey, &apiProduct, &developerApp, &developer, &developerEmail)
keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
apiPrd := getValuesIgnoringNull(apiProduct)
@@ -80,7 +83,7 @@
// Returns tenant info for a given scope uuid, from the cache or by querying the DB directly, based on the useCaching config
func getTenantForScope(scopeuuid string) (tenant, dbError) {
- if (config.GetBool(useCaching)) {
+ if config.GetBool(useCaching) {
_, exists := tenantCache[scopeuuid]
if !exists {
reason := "No tenant found for this scopeuuid: " + scopeuuid
@@ -110,13 +113,13 @@
errorCode := "INTERNAL_SEARCH_ERROR"
return tenant{}, dbError{ErrorCode: errorCode, Reason: reason}
}
- return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{}
+ return tenant{Org: org, Env: env, TenantId: tenantId}, dbError{}
}
}
// Returns developer-related info for a given apiKey and tenantId, from the cache or by querying the DB directly, based on the useCaching config
func getDeveloperInfo(tenantId string, apiKey string) developerInfo {
- if (config.GetBool(useCaching)) {
+ if config.GetBool(useCaching) {
keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey)
_, exists := developerInfoCache[keyForMap]
if !exists {
@@ -131,7 +134,7 @@
return developerInfoCache[keyForMap]
}
} else {
- var apiProduct, developerApp, developerEmail, developer sql.NullString
+ var apiProduct, developerApp, developerEmail, developer sql.NullString
db := getDB()
sSql := "SELECT ap.name, a.name, d.username, d.email " +
diff --git a/common_helper_test.go b/common_helper_test.go
index 126602d..c691d52 100644
--- a/common_helper_test.go
+++ b/common_helper_test.go
@@ -1,6 +1,5 @@
package apidAnalytics
-
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -29,7 +28,7 @@
var _ = Describe("test getDeveloperInfo()", func() {
Context("get developerInfo for valid tenantId and apikey", func() {
It("should return all right data", func() {
- developerInfo := getDeveloperInfo("tenantid","testapikey")
+ developerInfo := getDeveloperInfo("tenantid", "testapikey")
Expect(developerInfo.ApiProduct).To(Equal("testproduct"))
Expect(developerInfo.Developer).To(Equal("testdeveloper"))
Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com"))
@@ -39,11 +38,11 @@
Context("get developerInfo for invalid tenantId and apikey", func() {
It("should return empty data", func() {
- developerInfo := getDeveloperInfo("wrongid","wrongapikey")
+ developerInfo := getDeveloperInfo("wrongid", "wrongapikey")
Expect(developerInfo.ApiProduct).To(Equal(""))
Expect(developerInfo.Developer).To(Equal(""))
Expect(developerInfo.DeveloperEmail).To(Equal(""))
Expect(developerInfo.DeveloperApp).To(Equal(""))
})
})
-})
\ No newline at end of file
+})
diff --git a/crash_recovery.go b/crash_recovery.go
index 7811d9c..b34dc9d 100644
--- a/crash_recovery.go
+++ b/crash_recovery.go
@@ -1,20 +1,19 @@
package apidAnalytics
import (
- "time"
- "io/ioutil"
- "path/filepath"
"bufio"
- "os"
- "strings"
"compress/gzip"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
)
-
const (
- crashRecoveryDelay = 30 // seconds
- recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
- recoveredTS = "~recoveredTS~" // Constant to identify recovered files
+ crashRecoveryDelay = 30 // seconds
+ recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file)
+ recoveredTS = "~recoveredTS~" // Constant to identify recovered files
)
func initCrashRecovery() {
@@ -22,16 +21,16 @@
timer := time.After(time.Second * crashRecoveryDelay)
// Actual recovery of files is attempted asynchronously after a delay to not block the apid plugin from starting up
go func() {
- <- timer
+ <-timer
performRecovery()
}()
}
}
// Crash recovery is needed if there are any folders in tmp or recovered directory
-func crashRecoveryNeeded() (bool) {
+func crashRecoveryNeeded() bool {
recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir()
- tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
+ tmpDirRecoveryNeeded := recoverFoldersInTmpDir()
needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded
if needed {
log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay)
@@ -43,14 +42,14 @@
// This partial data can be recovered.
func recoverFoldersInTmpDir() bool {
tmpRecoveryNeeded := false
- dirs,_ := ioutil.ReadDir(localAnalyticsTempDir)
+ dirs, _ := ioutil.ReadDir(localAnalyticsTempDir)
recoveryTS := getRecoveryTS()
for _, dir := range dirs {
tmpRecoveryNeeded = true
log.Debugf("Moving directory '%s' from tmp to recovered folder", dir.Name())
tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name())
- newDirName := dir.Name() + recoveredTS + recoveryTS; // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
- recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir,newDirName)
+ newDirName := dir.Name() + recoveredTS + recoveryTS // Eg. org~env~20160101222400~recoveredTS~20160101222612.123
+ recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName)
err := os.Rename(tmpCompletePath, recoveredCompletePath)
if err != nil {
log.Errorf("Cannot move directory '%s' from tmp to recovered folder", dir.Name())
@@ -75,17 +74,17 @@
return false
}
-func performRecovery() {
- log.Info("Crash recovery is starting...");
+func performRecovery() {
+ log.Info("Crash recovery is starting...")
recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir)
for _, dir := range recoveryDirs {
- recoverDirectory(dir.Name());
+ recoverDirectory(dir.Name())
}
- log.Info("Crash recovery complete...");
+ log.Info("Crash recovery complete...")
}
func recoverDirectory(dirName string) {
- log.Infof("performing crash recovery for directory: %s", dirName);
+ log.Infof("performing crash recovery for directory: %s", dirName)
var bucketRecoveryTS string
// Parse bucket name to extract recoveryTS and pass it to each file to be recovered
@@ -99,7 +98,7 @@
files, _ := ioutil.ReadDir(dirBeingRecovered)
for _, file := range files {
// recovering each file sequentially for now
- recoverFile(bucketRecoveryTS, dirName, file.Name());
+ recoverFile(bucketRecoveryTS, dirName, file.Name())
}
stagingPath := filepath.Join(localAnalyticsStagingDir, dirName)
@@ -120,8 +119,8 @@
recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName)
// Copy complete records to new file and delete original partial file
- copyPartialFile(completeOrigFilePath, recoveredFilePath);
- deletePartialFile(completeOrigFilePath);
+ copyPartialFile(completeOrigFilePath, recoveredFilePath)
+ deletePartialFile(completeOrigFilePath)
}
// The file is read line by line and all complete records are extracted and copied to a new file which is closed as a correct gzip file.
@@ -172,4 +171,4 @@
if err != nil {
log.Errorf("Cannot delete partial file: %s", completeOrigFilePath)
}
-}
\ No newline at end of file
+}
diff --git a/init.go b/init.go
index 40949bd..e383f16 100644
--- a/init.go
+++ b/init.go
@@ -4,8 +4,8 @@
"fmt"
"github.com/30x/apid"
"os"
- "sync"
"path/filepath"
+ "sync"
)
const (
@@ -26,7 +26,7 @@
analyticsUploadIntervalDefault = "5"
// Number of slots for internal channel buffering of analytics records before they are dumped to a file
- analyticsBufferChannelSize = "apidanalytics_buffer_channel_size"
+ analyticsBufferChannelSize = "apidanalytics_buffer_channel_size"
analyticsBufferChannelSizeDefault = 100
// EdgeX endpoint base path to access Uap Collection Endpoint
@@ -34,16 +34,16 @@
// If caching is used then data scope and developer info will be maintained in-memory
// cache to avoid DB calls for each analytics message
- useCaching = "apidanalytics_use_caching"
+ useCaching = "apidanalytics_use_caching"
useCachingDefault = true
)
// keep track of the services that this plugin will use
var (
- log apid.LogService
- config apid.ConfigService
- data apid.DataService
- events apid.EventsService
+ log apid.LogService
+ config apid.ConfigService
+ data apid.DataService
+ events apid.EventsService
unsafeDB apid.DB
dbMux sync.RWMutex
diff --git a/listener.go b/listener.go
index 846a05e..53d4bdd 100644
--- a/listener.go
+++ b/listener.go
@@ -5,7 +5,7 @@
"github.com/apigee-labs/transicator/common"
)
-type handler struct {}
+type handler struct{}
func (h *handler) String() string {
return "apigeeAnalytics"
@@ -35,7 +35,7 @@
}
setDB(db)
- if (config.GetBool(useCaching)) {
+ if config.GetBool(useCaching) {
err = createTenantCache()
if err != nil {
log.Error(err)
@@ -55,7 +55,7 @@
}
func processChange(changes *common.ChangeList) {
- if (config.GetBool(useCaching)) {
+ if config.GetBool(useCaching) {
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
var rows []common.Row
diff --git a/upload_manager.go b/upload_manager.go
index 1cc3bc2..1ae94a5 100644
--- a/upload_manager.go
+++ b/upload_manager.go
@@ -8,7 +8,7 @@
)
const (
- maxRetries = 3
+ maxRetries = 3
retryFailedDirBatchSize = 10
)
@@ -96,4 +96,4 @@
break
}
}
-}
\ No newline at end of file
+}
diff --git a/uploader.go b/uploader.go
index d8be63c..a8292a3 100644
--- a/uploader.go
+++ b/uploader.go
@@ -1,33 +1,33 @@
package apidAnalytics
import (
- "os"
"encoding/json"
- "strings"
- "path/filepath"
+ "fmt"
"io/ioutil"
"net/http"
- "fmt"
+ "os"
+ "path/filepath"
+ "strings"
"time"
)
-const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
+const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss
var token string
var client *http.Client = &http.Client{
- Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS
- }
+ Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS
+}
func addHeaders(req *http.Request) {
token = config.GetString("apigeesync_bearer_token")
- req.Header.Add("Authorization", "Bearer " + token)
+ req.Header.Add("Authorization", "Bearer "+token)
}
func uploadDir(dir os.FileInfo) bool {
// Eg. org~env~20160101224500
tenant, timestamp := splitDirName(dir.Name())
- dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45
+ dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45
completePath := filepath.Join(localAnalyticsStagingDir, dir.Name())
files, _ := ioutil.ReadDir(completePath)
@@ -36,8 +36,8 @@
var error error
for _, file := range files {
completeFilePath := filepath.Join(completePath, file.Name())
- relativeFilePath := dateTimePartition + "/" + file.Name();
- status, error = uploadFile(tenant,relativeFilePath, completeFilePath)
+ relativeFilePath := dateTimePartition + "/" + file.Name()
+ status, error = uploadFile(tenant, relativeFilePath, completeFilePath)
if error != nil {
log.Errorf("Upload failed due to: %v", error)
break
@@ -51,7 +51,7 @@
func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) {
signedUrl, err := getSignedUrl(tenant, relativeFilePath)
- if (err != nil) {
+ if err != nil {
return false, err
} else {
return uploadFileToDatastore(completeFilePath, signedUrl)
@@ -84,13 +84,13 @@
defer resp.Body.Close()
respBody, _ := ioutil.ReadAll(resp.Body)
- if(resp.StatusCode == 200) {
+ if resp.StatusCode == 200 {
var body map[string]interface{}
json.Unmarshal(respBody, &body)
- signedURL := body["url"]
+ signedURL := body["url"]
return signedURL.(string), nil
} else {
- return "", fmt.Errorf("Error while getting signed URL '%v'",resp.Status)
+ return "", fmt.Errorf("Error while getting signed URL '%v'", resp.Status)
}
}
@@ -122,27 +122,27 @@
}
defer resp.Body.Close()
- if(resp.StatusCode == 200) {
+ if resp.StatusCode == 200 {
return true, nil
} else {
- return false,fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status)
+ return false, fmt.Errorf("Final Datastore (S3/GCS) returned Error '%v'", resp.Status)
}
}
// Extract tenant and timestamp from directory Name
-func splitDirName(dirName string) (string, string){
+func splitDirName(dirName string) (string, string) {
s := strings.Split(dirName, "~")
- tenant := s[0]+"~"+s[1]
+ tenant := s[0] + "~" + s[1]
timestamp := s[2]
- return tenant, timestamp
+ return tenant, timestamp
}
// Files are uploaded to S3 under a specific date-time partition, and that key needs to be generated by the plugin
// eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz
-func getDateFromDirTimestamp(timestamp string) (string){
+func getDateFromDirTimestamp(timestamp string) string {
dateTime, _ := time.Parse(timestampLayout, timestamp)
- date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
- time := dateTime.Format("15-04") // same as HH-mm
- dateHourTS := "date=" + date + "/time=" + time
+ date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd
+ time := dateTime.Format("15-04") // same as HH-mm
+ dateHourTS := "date=" + date + "/time=" + time
return dateHourTS
-}
\ No newline at end of file
+}