go fmt to fix formatting
diff --git a/api.go b/api.go index c2edcb5..25cbbf2 100644 --- a/api.go +++ b/api.go
@@ -21,21 +21,21 @@ func initAPI(services apid.Services) { log.Debug("initialized API's exposed by apidAnalytics plugin") analyticsBasePath = config.GetString(configAnalyticsBasePath) - services.API().HandleFunc(analyticsBasePath + "/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST") + services.API().HandleFunc(analyticsBasePath+"/{bundle_scope_uuid}", saveAnalyticsRecord).Methods("POST") } func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=UTF-8") - db := getDB() // When database isnt initialized + db := getDB() // When database isnt initialized if db == nil { - writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") + writeError(w, http.StatusInternalServerError, "INTERNAL_SERVER_ERROR", "Service is not initialized completely") return } - if !strings.EqualFold(r.Header.Get("Content-Type"),"application/json") { - writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE","Only supported content type is application/json") + if !strings.EqualFold(r.Header.Get("Content-Type"), "application/json") { + writeError(w, http.StatusBadRequest, "UNSUPPORTED_CONTENT_TYPE", "Only supported content type is application/json") return } @@ -50,7 +50,7 @@ writeError(w, http.StatusBadRequest, "UNKNOWN_SCOPE", dbErr.Reason) } } else { - err := processPayload(tenant, scopeuuid, r); + err := processPayload(tenant, scopeuuid, r) if err.ErrorCode == "" { w.WriteHeader(http.StatusOK) } else {
diff --git a/api_helper.go b/api_helper.go index f12605f..34ca14e 100644 --- a/api_helper.go +++ b/api_helper.go
@@ -1,42 +1,41 @@ package apidAnalytics import ( - "encoding/json" - "net/http" - "io" - "strings" "compress/gzip" + "encoding/json" + "io" + "net/http" + "strings" ) - /* Implements all the helper methods needed to process the POST /analytics payload and send it to the internal buffer channel */ type developerInfo struct { - ApiProduct string - DeveloperApp string - DeveloperEmail string - Developer string + ApiProduct string + DeveloperApp string + DeveloperEmail string + Developer string } type axRecords struct { - Tenant tenant - Records []interface{} // Records is an array of multiple analytics records + Tenant tenant + Records []interface{} // Records is an array of multiple analytics records } type tenant struct { - Org string - Env string + Org string + Env string TenantId string } -func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { +func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { var gzipEncoded bool if r.Header.Get("Content-Encoding") != "" { - if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") { - return errResponse{ErrorCode:"UNSUPPORTED_CONTENT_ENCODING", Reason:"Only supported content encoding is gzip"} - } else { + if !strings.EqualFold(r.Header.Get("Content-Encoding"), "gzip") { + return errResponse{ErrorCode: "UNSUPPORTED_CONTENT_ENCODING", Reason: "Only supported content encoding is gzip"} + } else { gzipEncoded = true } } @@ -44,9 +43,9 @@ var reader io.ReadCloser var err error if gzipEncoded { - reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data + reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data if err != nil { - return errResponse{ErrorCode:"BAD_DATA", Reason:"Gzip Encoded data cannot be read"} + return errResponse{ErrorCode: "BAD_DATA", Reason: "Gzip Encoded data cannot be read"} } } else { reader = r.Body @@ -61,11 +60,11 @@ func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse { var raw map[string]interface{} - decoder := json.NewDecoder(reader) // Decode payload to JSON data + decoder := json.NewDecoder(reader) // Decode payload to JSON data decoder.UseNumber() if err := decoder.Decode(&raw); err != nil { - return errResponse{ErrorCode:"BAD_DATA", Reason:"Not a valid JSON payload"} + return errResponse{ErrorCode: "BAD_DATA", Reason: "Not a valid JSON payload"} } if records := raw["records"]; records != nil { @@ -76,14 +75,14 @@ if valid { enrich(recordMap, scopeuuid, tenant) } else { - return err // Even if there is one bad record, then reject entire batch + return err // Even if there is one bad record, then reject entire batch } } axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})} // publish batch of records to channel (blocking call) internalBuffer <- axRecords } else { - return errResponse{ErrorCode:"NO_RECORDS", Reason:"No analytics records in the payload"} + return errResponse{ErrorCode: "NO_RECORDS", Reason: "No analytics records in the payload"} } return errResponse{} } @@ -97,7 +96,7 @@ elems := []string{"client_received_start_timestamp"} for _, elem := range elems { if recordMap[elem] == nil { - return false, errResponse{ErrorCode:"MISSING_FIELD", Reason:"Missing Required field: " + elem} + return false, errResponse{ErrorCode: "MISSING_FIELD", Reason: "Missing Required field: " + elem} } } @@ -105,7 +104,7 @@ cret, exists2 := recordMap["client_received_end_timestamp"] if exists1 && exists2 { if crst.(json.Number) > cret.(json.Number) { - return false, errResponse{ErrorCode:"BAD_DATA", Reason:"client_received_start_timestamp > client_received_end_timestamp"} + return false, errResponse{ErrorCode: "BAD_DATA", Reason: "client_received_start_timestamp > client_received_end_timestamp"} } } return true, errResponse{}
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go index 2e98e86..247212f 100644 --- a/apidAnalytics_suite_test.go +++ b/apidAnalytics_suite_test.go
@@ -33,8 +33,8 @@ Expect(err).NotTo(HaveOccurred()) config.Set("data_path", testTempDir) - config.Set(uapServerBase, "http://localhost:9000") // dummy value - config.Set("apigeesync_apid_instance_id","abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value + config.Set(uapServerBase, "http://localhost:9000") // dummy value + config.Set("apigeesync_apid_instance_id", "abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value config.Set(useCaching, true) db, err := apid.Data().DB() @@ -125,7 +125,7 @@ ); `) if err != nil { - panic("Unable to initialize DB " + err.Error()) + panic("Unable to initialize DB " + err.Error()) } } @@ -161,7 +161,7 @@ ); `) if err != nil { - panic("Unable to initialize DB " + err.Error()) + panic("Unable to initialize DB " + err.Error()) } } @@ -170,7 +170,7 @@ txn, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) txn.Exec("INSERT INTO APP_CREDENTIAL_APIPRODUCT_MAPPER (tenant_id, appcred_id, app_id, apiprdt_id, status, _change_selector) "+ - "VALUES" + + "VALUES"+ "($1,$2,$3,$4,$5,$6)", "tenantid", "testapikey", @@ -178,33 +178,33 @@ "testproductid", "APPROVED", "12345", - ); + ) txn.Exec("INSERT INTO APP (id, tenant_id, name, developer_id) "+ - "VALUES" + + "VALUES"+ "($1,$2,$3,$4)", "testappid", "tenantid", "testapp", "testdeveloperid", - ); + ) txn.Exec("INSERT INTO API_PRODUCT (id, tenant_id, name) "+ - "VALUES" + + "VALUES"+ "($1,$2,$3)", "testproductid", "tenantid", "testproduct", - ); + ) txn.Exec("INSERT INTO DEVELOPER (id, tenant_id, username, email) "+ - "VALUES" + + "VALUES"+ "($1,$2,$3,$4)", "testdeveloperid", "tenantid", "testdeveloper", "testdeveloper@test.com", - ); + ) txn.Exec("INSERT INTO DATA_SCOPE (id, _change_selector, apid_cluster_id, scope, org, env) "+ "VALUES"+ @@ -215,11 +215,10 @@ "tenantid", "testorg", "testenv", - ); + ) txn.Commit() } - var _ = AfterSuite(func() { apid.Events().Close() if testServer != nil {
diff --git a/buffering_manager.go b/buffering_manager.go index b9fa3d2..6ec1ec1 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -1,22 +1,24 @@ package apidAnalytics import ( - "time" - "os" "bufio" "compress/gzip" - "path/filepath" - "fmt" "crypto/rand" "encoding/json" + "fmt" + "os" + "path/filepath" + "time" ) -const fileExtension = ".txt.gz"; +const fileExtension = ".txt.gz" // Channel where analytics records are buffered before being dumped to a file as write to file should not performed in the Http Thread var internalBuffer chan axRecords + // Channel where close bucket event is published i.e. when a bucket is ready to be closed based on collection interval var closeBucketEvent chan bucket + // Map from timestampt to bucket var bucketMap map[int64]bucket @@ -28,19 +30,19 @@ // This struct will store open file handle and writer to close the file type fileWriter struct { - file *os.File - gw *gzip.Writer - bw *bufio.Writer + file *os.File + gw *gzip.Writer + bw *bufio.Writer } func initBufferingManager() { internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize)) - closeBucketEvent = make(chan bucket) + closeBucketEvent = make(chan bucket) bucketMap = make(map[int64]bucket) // Keep polling the internal buffer for new messages go func() { - for { + for { records := <-internalBuffer err := save(records) if err != nil { @@ -51,8 +53,8 @@ // Keep polling the closeEvent channel to see if bucket is ready to be closed go func() { - for { - bucket := <- closeBucketEvent + for { + bucket := <-closeBucketEvent log.Debugf("Close Event received for bucket: %s", bucket.DirName) // close open file @@ -70,19 +72,18 @@ } // Save records to correct file based on what timestamp data is being collected for -func save(records axRecords) (error) { +func save(records axRecords) error { bucket, err := getBucketForTimestamp(time.Now(), records.Tenant) - if (err != nil ) { + if err != nil { return err } writeGzipFile(bucket.FileWriter, records.Records) return nil } - func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) { // first based on current timestamp and collection interval, determine the timestamp of the bucket - ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval)) + ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval)) _, exists := bucketMap[ts] if exists { return bucketMap[ts], nil @@ -90,7 +91,7 @@ timestamp := time.Unix(ts, 0).Format(timestampLayout) // endtimestamp of bucket = starttimestamp + collectionInterval - endTime := time.Unix(ts + int64(config.GetInt(analyticsCollectionInterval)), 0) + endTime := time.Unix(ts+int64(config.GetInt(analyticsCollectionInterval)), 0) endtimestamp := endTime.Format(timestampLayout) dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp @@ -114,9 +115,9 @@ bucketMap[ts] = newBucket //Send event to close directory after endTime + 5 seconds to make sure all buffers are flushed to file - timer := time.After(endTime.Sub(time.Now()) + time.Second * 5) + timer := time.After(endTime.Sub(time.Now()) + time.Second*5) go func() { - <- timer + <-timer closeBucketEvent <- newBucket }() return newBucket, nil @@ -133,7 +134,7 @@ func createGzipFile(s string) (fileWriter, error) { file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm) if err != nil { - return fileWriter{},fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err) + return fileWriter{}, fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err) } gw := gzip.NewWriter(file) bw := bufio.NewWriter(gw) @@ -160,4 +161,3 @@ fw.gw.Close() fw.file.Close() } -
diff --git a/common_helper.go b/common_helper.go index 89b37bc..d4e26d5 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -8,10 +8,13 @@ // Cache for scope uuid to org, env and tenantId information var tenantCache map[string]tenant + // RW lock for tenant map cache since the cache can be read while its being written to and vice versa var tenantCachelock = sync.RWMutex{} + // Cache for apiKey~tenantId to developer related information var developerInfoCache map[string]developerInfo + // RW lock for developerInfo map cache since the cache can be read while its being written to and vice versa var developerInfoCacheLock = sync.RWMutex{} @@ -25,13 +28,13 @@ if error != nil { return fmt.Errorf("Count not get datascope from DB due to: %v", error) - } else { + } else { defer rows.Close() // Lock before writing to the map as it has multiple readers tenantCachelock.Lock() defer tenantCachelock.Unlock() for rows.Next() { - rows.Scan(&env, &org, &tenantId, &id); + rows.Scan(&env, &org, &tenantId, &id) tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId} } } @@ -43,7 +46,7 @@ // Load data scope information into an in-memory cache so that for each record a DB lookup is not required func createDeveloperInfoCache() error { developerInfoCache = make(map[string]developerInfo) - var apiProduct, developerApp, developerEmail, developer sql.NullString + var apiProduct, developerApp, developerEmail, developer sql.NullString var tenantId, apiKey string db := getDB() @@ -62,7 +65,7 @@ developerInfoCacheLock.Lock() defer developerInfoCacheLock.Unlock() for rows.Next() { - rows.Scan(&tenantId,&apiKey,&apiProduct, &developerApp, &developer, &developerEmail) + rows.Scan(&tenantId, &apiKey, &apiProduct, &developerApp, &developer, &developerEmail) keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) apiPrd := getValuesIgnoringNull(apiProduct) @@ -80,7 +83,7 @@ // Returns Tenant Info given a scope uuid from the cache or by querying the DB directly based on useCachig config func getTenantForScope(scopeuuid string) (tenant, dbError) { - if (config.GetBool(useCaching)) { + if config.GetBool(useCaching) { _, exists := tenantCache[scopeuuid] if !exists { reason := "No tenant found for this scopeuuid: " + scopeuuid @@ -110,13 +113,13 @@ errorCode := "INTERNAL_SEARCH_ERROR" return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} } - return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{} + return tenant{Org: org, Env: env, TenantId: tenantId}, dbError{} } } // Returns Dveloper related info given an apiKey and tenantId from the cache or by querying the DB directly based on useCachig config func getDeveloperInfo(tenantId string, apiKey string) developerInfo { - if (config.GetBool(useCaching)) { + if config.GetBool(useCaching) { keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) _, exists := developerInfoCache[keyForMap] if !exists { @@ -131,7 +134,7 @@ return developerInfoCache[keyForMap] } } else { - var apiProduct, developerApp, developerEmail, developer sql.NullString + var apiProduct, developerApp, developerEmail, developer sql.NullString db := getDB() sSql := "SELECT ap.name, a.name, d.username, d.email " +
diff --git a/common_helper_test.go b/common_helper_test.go index 126602d..c691d52 100644 --- a/common_helper_test.go +++ b/common_helper_test.go
@@ -1,6 +1,5 @@ package apidAnalytics - import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -29,7 +28,7 @@ var _ = Describe("test getDeveloperInfo()", func() { Context("get developerInfo for valid tenantId and apikey", func() { It("should return all right data", func() { - developerInfo := getDeveloperInfo("tenantid","testapikey") + developerInfo := getDeveloperInfo("tenantid", "testapikey") Expect(developerInfo.ApiProduct).To(Equal("testproduct")) Expect(developerInfo.Developer).To(Equal("testdeveloper")) Expect(developerInfo.DeveloperEmail).To(Equal("testdeveloper@test.com")) @@ -39,11 +38,11 @@ Context("get developerInfo for invalid tenantId and apikey", func() { It("should return all right data", func() { - developerInfo := getDeveloperInfo("wrongid","wrongapikey") + developerInfo := getDeveloperInfo("wrongid", "wrongapikey") Expect(developerInfo.ApiProduct).To(Equal("")) Expect(developerInfo.Developer).To(Equal("")) Expect(developerInfo.DeveloperEmail).To(Equal("")) Expect(developerInfo.DeveloperApp).To(Equal("")) }) }) -}) \ No newline at end of file +})
diff --git a/crash_recovery.go b/crash_recovery.go index 7811d9c..b34dc9d 100644 --- a/crash_recovery.go +++ b/crash_recovery.go
@@ -1,20 +1,19 @@ package apidAnalytics import ( - "time" - "io/ioutil" - "path/filepath" "bufio" - "os" - "strings" "compress/gzip" + "io/ioutil" + "os" + "path/filepath" + "strings" + "time" ) - const ( - crashRecoveryDelay = 30 // seconds - recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file) - recoveredTS = "~recoveredTS~" // Constant to identify recovered files + crashRecoveryDelay = 30 // seconds + recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file) + recoveredTS = "~recoveredTS~" // Constant to identify recovered files ) func initCrashRecovery() { @@ -22,16 +21,16 @@ timer := time.After(time.Second * crashRecoveryDelay) // Actual recovery of files is attempted asynchronously after a delay to not block the apid plugin from starting up go func() { - <- timer + <-timer performRecovery() }() } } // Crash recovery is needed if there are any folders in tmp or recovered directory -func crashRecoveryNeeded() (bool) { +func crashRecoveryNeeded() bool { recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir() - tmpDirRecoveryNeeded := recoverFoldersInTmpDir() + tmpDirRecoveryNeeded := recoverFoldersInTmpDir() needed := tmpDirRecoveryNeeded || recoveredDirRecoveryNeeded if needed { log.Infof("Crash Recovery is needed and will be attempted in %d seconds", crashRecoveryDelay) @@ -43,14 +42,14 @@ // This partial data can be recoverd. func recoverFoldersInTmpDir() bool { tmpRecoveryNeeded := false - dirs,_ := ioutil.ReadDir(localAnalyticsTempDir) + dirs, _ := ioutil.ReadDir(localAnalyticsTempDir) recoveryTS := getRecoveryTS() for _, dir := range dirs { tmpRecoveryNeeded = true log.Debugf("Moving directory '%s' from tmp to recovered folder", dir.Name()) tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name()) - newDirName := dir.Name() + recoveredTS + recoveryTS; // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 - recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir,newDirName) + newDirName := dir.Name() + recoveredTS + recoveryTS // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 + recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir, newDirName) err := os.Rename(tmpCompletePath, recoveredCompletePath) if err != nil { log.Errorf("Cannot move directory '%s' from tmp to recovered folder", dir.Name()) @@ -75,17 +74,17 @@ return false } -func performRecovery() { - log.Info("Crash recovery is starting..."); +func performRecovery() { + log.Info("Crash recovery is starting...") recoveryDirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir) for _, dir := range recoveryDirs { - recoverDirectory(dir.Name()); + recoverDirectory(dir.Name()) } - log.Info("Crash recovery complete..."); + log.Info("Crash recovery complete...") } func recoverDirectory(dirName string) { - log.Infof("performing crash recovery for directory: %s", dirName); + log.Infof("performing crash recovery for directory: %s", dirName) var bucketRecoveryTS string // Parse bucket name to extract recoveryTS and pass it each file to be recovered @@ -99,7 +98,7 @@ files, _ := ioutil.ReadDir(dirBeingRecovered) for _, file := range files { // recovering each file sequentially for now - recoverFile(bucketRecoveryTS, dirName, file.Name()); + recoverFile(bucketRecoveryTS, dirName, file.Name()) } stagingPath := filepath.Join(localAnalyticsStagingDir, dirName) @@ -120,8 +119,8 @@ recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName) // Copy complete records to new file and delete original partial file - copyPartialFile(completeOrigFilePath, recoveredFilePath); - deletePartialFile(completeOrigFilePath); + copyPartialFile(completeOrigFilePath, recoveredFilePath) + deletePartialFile(completeOrigFilePath) } // The file is read line by line and all complete records are extracted and copied to a new file which is closed as a correct gzip file. @@ -172,4 +171,4 @@ if err != nil { log.Errorf("Cannot delete partial file: %s", completeOrigFilePath) } -} \ No newline at end of file +}
diff --git a/init.go b/init.go index 40949bd..e383f16 100644 --- a/init.go +++ b/init.go
@@ -4,8 +4,8 @@ "fmt" "github.com/30x/apid" "os" - "sync" "path/filepath" + "sync" ) const ( @@ -26,7 +26,7 @@ analyticsUploadIntervalDefault = "5" // Number of slots for internal channel buffering of analytics records before they are dumped to a file - analyticsBufferChannelSize = "apidanalytics_buffer_channel_size" + analyticsBufferChannelSize = "apidanalytics_buffer_channel_size" analyticsBufferChannelSizeDefault = 100 // EdgeX endpoint base path to access Uap Collection Endpoint @@ -34,16 +34,16 @@ // If caching is used then data scope and developer info will be maintained in-memory // cache to avoid DB calls for each analytics message - useCaching = "apidanalytics_use_caching" + useCaching = "apidanalytics_use_caching" useCachingDefault = true ) // keep track of the services that this plugin will use var ( - log apid.LogService - config apid.ConfigService - data apid.DataService - events apid.EventsService + log apid.LogService + config apid.ConfigService + data apid.DataService + events apid.EventsService unsafeDB apid.DB dbMux sync.RWMutex
diff --git a/listener.go b/listener.go index 846a05e..53d4bdd 100644 --- a/listener.go +++ b/listener.go
@@ -5,7 +5,7 @@ "github.com/apigee-labs/transicator/common" ) -type handler struct {} +type handler struct{} func (h *handler) String() string { return "apigeeAnalytics" @@ -35,7 +35,7 @@ } setDB(db) - if (config.GetBool(useCaching)) { + if config.GetBool(useCaching) { err = createTenantCache() if err != nil { log.Error(err) @@ -55,7 +55,7 @@ } func processChange(changes *common.ChangeList) { - if (config.GetBool(useCaching)) { + if config.GetBool(useCaching) { log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) var rows []common.Row
diff --git a/upload_manager.go b/upload_manager.go index 1cc3bc2..1ae94a5 100644 --- a/upload_manager.go +++ b/upload_manager.go
@@ -8,7 +8,7 @@ ) const ( - maxRetries = 3 + maxRetries = 3 retryFailedDirBatchSize = 10 ) @@ -96,4 +96,4 @@ break } } -} \ No newline at end of file +}
diff --git a/uploader.go b/uploader.go index d8be63c..a8292a3 100644 --- a/uploader.go +++ b/uploader.go
@@ -1,33 +1,33 @@ package apidAnalytics import ( - "os" "encoding/json" - "strings" - "path/filepath" + "fmt" "io/ioutil" "net/http" - "fmt" + "os" + "path/filepath" + "strings" "time" ) -const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss +const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss var token string var client *http.Client = &http.Client{ - Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS - } + Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS +} func addHeaders(req *http.Request) { token = config.GetString("apigeesync_bearer_token") - req.Header.Add("Authorization", "Bearer " + token) + req.Header.Add("Authorization", "Bearer "+token) } func uploadDir(dir os.FileInfo) bool { // Eg. org~env~20160101224500 tenant, timestamp := splitDirName(dir.Name()) - dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45 + dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45 completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) files, _ := ioutil.ReadDir(completePath) @@ -36,8 +36,8 @@ var error error for _, file := range files { completeFilePath := filepath.Join(completePath, file.Name()) - relativeFilePath := dateTimePartition + "/" + file.Name(); - status, error = uploadFile(tenant,relativeFilePath, completeFilePath) + relativeFilePath := dateTimePartition + "/" + file.Name() + status, error = uploadFile(tenant, relativeFilePath, completeFilePath) if error != nil { log.Errorf("Upload failed due to: %v", error) break @@ -51,7 +51,7 @@ func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) { signedUrl, err := getSignedUrl(tenant, relativeFilePath) - if (err != nil) { + if err != nil { return false, err } else { return uploadFileToDatastore(completeFilePath, signedUrl) @@ -84,13 +84,13 @@ defer resp.Body.Close() respBody, _ := ioutil.ReadAll(resp.Body) - if(resp.StatusCode == 200) { + if resp.StatusCode == 200 { var body map[string]interface{} json.Unmarshal(respBody, &body) - signedURL := body["url"] + signedURL := body["url"] return signedURL.(string), nil } else { - return "", fmt.Errorf("Error while getting signed URL '%v'",resp.Status) + return "", fmt.Errorf("Error while getting signed URL '%v'", resp.Status) } } @@ -122,27 +122,27 @@ } defer resp.Body.Close() - if(resp.StatusCode == 200) { + if resp.StatusCode == 200 { return true, nil } else { - return false,fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status) + return false, fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status) } } // Extract tenant and timestamp from directory Name -func splitDirName(dirName string) (string, string){ +func splitDirName(dirName string) (string, string) { s := strings.Split(dirName, "~") - tenant := s[0]+"~"+s[1] + tenant := s[0] + "~" + s[1] timestamp := s[2] - return tenant, timestamp + return tenant, timestamp } // files are uploaded to S3 under specific date time partition and that key needs to be generated from the plugin // eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz -func getDateFromDirTimestamp(timestamp string) (string){ +func getDateFromDirTimestamp(timestamp string) string { dateTime, _ := time.Parse(timestampLayout, timestamp) - date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd - time := dateTime.Format("15-04") // same as HH-mm - dateHourTS := "date=" + date + "/time=" + time + date := dateTime.Format("2006-01-02") // same as YYYY-MM-dd + time := dateTime.Format("15-04") // same as HH-mm + dateHourTS := "date=" + date + "/time=" + time return dateHourTS -} \ No newline at end of file +}