Added comments, improved logging, and refactored some parts
diff --git a/api.go b/api.go index a35bea5..c2edcb5 100644 --- a/api.go +++ b/api.go
@@ -18,12 +18,6 @@ Reason string `json:"reason"` } -type tenant struct { - Org string - Env string - TenantId string -} - func initAPI(services apid.Services) { log.Debug("initialized API's exposed by apidAnalytics plugin") analyticsBasePath = config.GetString(configAnalyticsBasePath) @@ -34,13 +28,7 @@ w.Header().Set("Content-Type", "application/json; charset=UTF-8") - db, _ := data.DB() // When database isnt initialized - if db == nil { - writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") - return - } - - db = getDB() // When snapshot isnt processed + db := getDB() // When database isnt initialized if db == nil { writeError(w, http.StatusInternalServerError,"INTERNAL_SERVER_ERROR","Service is not initialized completely") return
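Note for reviewers: the api.go change collapses the two separate nil checks (uninitialized database vs. unprocessed snapshot) into a single getDB() guard. Put together, the handler prologue now reads roughly as below; getDB and writeError are the helpers referenced in the diff, and the rest of the handler body is elided.

```go
// Sketch of the consolidated guard in the POST /analytics handler; getDB()
// is expected to return nil until both the database and the first snapshot
// are ready, so one check covers both failure modes.
func saveAnalyticsRecord(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	db := getDB()
	if db == nil {
		writeError(w, http.StatusInternalServerError,
			"INTERNAL_SERVER_ERROR", "Service is not initialized completely")
		return
	}
	// ... scope lookup and payload processing continue here
}
```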
diff --git a/api.yaml b/api.yaml index 782de56..9b9ccfa 100644 --- a/api.yaml +++ b/api.yaml
@@ -106,6 +106,12 @@ properties: errrorCode: type: string + enum: + - UNKNOWN_SCOPE + - BAD_DATA + - UNSUPPORTED_CONTENT_TYPE + - UNSUPPORTED_CONTENT_ENCODING + - MISSING_FIELD reason: type: string example: { @@ -120,6 +126,9 @@ properties: errrorCode: type: string + enum: + - INTERNAL_SERVER_ERROR + - INTERNAL_SEARCH_ERROR reason: type: string example: {
diff --git a/api_helper.go b/api_helper.go index 214578d..f12605f 100644 --- a/api_helper.go +++ b/api_helper.go
@@ -9,6 +9,10 @@ ) +/* +Implements all the helper methods needed to process the POST /analytics payload and send it to the internal buffer channel +*/ + type developerInfo struct { ApiProduct string DeveloperApp string @@ -18,14 +22,20 @@ type axRecords struct { Tenant tenant - Records []interface{} + Records []interface{} // Records is an array of multiple analytics records +} + +type tenant struct { + Org string + Env string + TenantId string } func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { var gzipEncoded bool if r.Header.Get("Content-Encoding") != "" { if !strings.EqualFold(r.Header.Get("Content-Encoding"),"gzip") { - return errResponse{"UNSUPPORTED_CONTENT_ENCODING", "Only supported content encoding is gzip"} + return errResponse{ErrorCode:"UNSUPPORTED_CONTENT_ENCODING", Reason:"Only supported content encoding is gzip"} } else { gzipEncoded = true } @@ -36,7 +46,7 @@ if gzipEncoded { reader, err = gzip.NewReader(r.Body) // reader for gzip encoded data if err != nil { - return errResponse{"BAD_DATA", "Gzip data cannot be read"} + return errResponse{ErrorCode:"BAD_DATA", Reason:"Gzip Encoded data cannot be read"} } } else { reader = r.Body @@ -51,14 +61,15 @@ func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse { var raw map[string]interface{} - dec := json.NewDecoder(reader) - dec.UseNumber() + decoder := json.NewDecoder(reader) // Decode payload to JSON data + decoder.UseNumber() - if err := dec.Decode(&raw); err != nil { - return errResponse{"BAD_DATA", "Not a valid JSON payload"} + if err := decoder.Decode(&raw); err != nil { + return errResponse{ErrorCode:"BAD_DATA", Reason:"Not a valid JSON payload"} } if records := raw["records"]; records != nil { + // Iterate through each record to validate and enrich it for _, eachRecord := range records.([]interface{}) { recordMap := eachRecord.(map[string]interface{}) valid, err := validate(recordMap) @@ -68,20 +79,25 @@ return err // Even if there is one bad record, then reject entire batch } } - // publish batch of records to channel (blocking call) axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})} + // publish batch of records to channel (blocking call) internalBuffer <- axRecords } else { - return errResponse{"NO_RECORDS", "No analytics records in the payload"} + return errResponse{ErrorCode:"NO_RECORDS", Reason:"No analytics records in the payload"} } return errResponse{} } +/* +Does basic validation on each analytics message +1. client_received_start_timestamp should exist +2. 
if client_received_end_timestamp exists then it should be > client_received_start_timestamp +*/ func validate(recordMap map[string]interface{}) (bool, errResponse) { elems := []string{"client_received_start_timestamp"} for _, elem := range elems { if recordMap[elem] == nil { - return false, errResponse{"MISSING_FIELD", "Missing field: " + elem} + return false, errResponse{ErrorCode:"MISSING_FIELD", Reason:"Missing Required field: " + elem} } } @@ -89,12 +105,16 @@ cret, exists2 := recordMap["client_received_end_timestamp"] if exists1 && exists2 { if crst.(json.Number) > cret.(json.Number) { - return false, errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"} + return false, errResponse{ErrorCode:"BAD_DATA", Reason:"client_received_start_timestamp > client_received_end_timestamp"} } } return true, errResponse{} } +/* +Enrich each record by adding org and env fields +It also finds add developer related information based on the apiKey +*/ func enrich(recordMap map[string]interface{}, scopeuuid string, tenant tenant) { org, orgExists := recordMap["organization"] if !orgExists || org.(string) == "" {
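For context, this is roughly how the validation described in the new doc comment behaves when called directly; the record map below mirrors what json.Decoder with UseNumber() produces for one entry of the "records" array, and the field values are made up for illustration.

```go
// Hypothetical record as decoded with UseNumber(); timestamps are numeric
// (shown here as epoch milliseconds purely for illustration).
record := map[string]interface{}{
	"client_received_start_timestamp": json.Number("1485965535000"),
	"client_received_end_timestamp":   json.Number("1485965535500"),
}
valid, errResp := validate(record)
if !valid {
	// e.g. {MISSING_FIELD, "Missing Required field: client_received_start_timestamp"}
	// or   {BAD_DATA, "client_received_start_timestamp > client_received_end_timestamp"}
	log.Errorf("record rejected: %s - %s", errResp.ErrorCode, errResp.Reason)
}
```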
diff --git a/api_test.go b/api_test.go index 1c9ac2f..f56a463 100644 --- a/api_test.go +++ b/api_test.go
@@ -11,11 +11,8 @@ // BeforeSuite setup and AfterSuite cleanup is in apidAnalytics_suite_test.go var _ = Describe("testing saveAnalyticsRecord() directly", func() { - Context("valid scopeuuid", func() { - It("should successfully return", func() { - uri, err := url.Parse(testServer.URL) uri.Path = analyticsBasePath @@ -32,7 +29,6 @@ }) Context("invalid scopeuuid", func() { - It("should return bad request", func() { uri, err := url.Parse(testServer.URL) uri.Path = analyticsBasePath
diff --git a/apidAnalytics_suite_test.go b/apidAnalytics_suite_test.go index 347d694..2e98e86 100644 --- a/apidAnalytics_suite_test.go +++ b/apidAnalytics_suite_test.go
@@ -34,7 +34,8 @@ config.Set("data_path", testTempDir) config.Set(uapServerBase, "http://localhost:9000") // dummy value - config.Set(useCaching, false) + config.Set("apigeesync_apid_instance_id","abcdefgh-ijkl-mnop-qrst-uvwxyz123456") // dummy value + config.Set(useCaching, true) db, err := apid.Data().DB() Expect(err).NotTo(HaveOccurred()) @@ -44,6 +45,10 @@ insertTestData(db) apid.InitializePlugins() + // Create cache else its created in listener.go when a snapshot is received + createTenantCache() + createDeveloperInfoCache() + testServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == analyticsBasePathDefault { saveAnalyticsRecord(w, req)
diff --git a/buffering_manager.go b/buffering_manager.go index 243409d..b9fa3d2 100644 --- a/buffering_manager.go +++ b/buffering_manager.go
@@ -11,13 +11,18 @@ "encoding/json" ) +const fileExtension = ".txt.gz"; + +// Channel where analytics records are buffered before being dumped to a file as write to file should not performed in the Http Thread var internalBuffer chan axRecords +// Channel where close bucket event is published i.e. when a bucket is ready to be closed based on collection interval var closeBucketEvent chan bucket +// Map from timestampt to bucket var bucketMap map[int64]bucket type bucket struct { DirName string - // We need file handle, writter pointer to close the file + // We need file handle and writer to close the file FileWriter fileWriter } @@ -39,7 +44,7 @@ records := <-internalBuffer err := save(records) if err != nil { - log.Errorf("Could not save %d messages to file. %v", len(records.Records), err) + log.Errorf("Could not save %d messages to file due to: %v", len(records.Records), err) } } }() @@ -48,21 +53,23 @@ go func() { for { bucket := <- closeBucketEvent - log.Debugf("Closing bucket %s", bucket.DirName) + log.Debugf("Close Event received for bucket: %s", bucket.DirName) // close open file closeGzipFile(bucket.FileWriter) dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) + // close files in tmp folder and move directory to staging to indicate its ready for upload err := os.Rename(dirToBeClosed, stagingPath) if err != nil { - log.Errorf("Cannot move directory :%s to staging folder", bucket.DirName) + log.Errorf("Cannot move directory '%s' from tmp to staging folder", bucket.DirName) } } }() } +// Save records to correct file based on what timestamp data is being collected for func save(records axRecords) (error) { bucket, err := getBucketForTimestamp(time.Now(), records.Tenant) if (err != nil ) { @@ -74,7 +81,7 @@ func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) { - // first based on current timestamp, determine the timestamp bucket + // first based on current timestamp and collection interval, determine the timestamp of the bucket ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * int64(config.GetInt(analyticsCollectionInterval)) _, exists := bucketMap[ts] if exists { @@ -82,6 +89,7 @@ } else { timestamp := time.Unix(ts, 0).Format(timestampLayout) + // endtimestamp of bucket = starttimestamp + collectionInterval endTime := time.Unix(ts + int64(config.GetInt(analyticsCollectionInterval)), 0) endtimestamp := endTime.Format(timestampLayout) @@ -90,11 +98,12 @@ // create dir err := os.Mkdir(newPath, os.ModePerm) if err != nil { - return bucket{}, fmt.Errorf("Cannot create directory : %s to buffer messages due to %v:", dirName, err) + return bucket{}, fmt.Errorf("Cannot create directory '%s' to buffer messages '%v'", dirName, err) } // create file for writing - fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0.txt.gz" + // Format: <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz + fileName := getRandomHex() + "_" + timestamp + "." 
+ endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0" + fileExtension completeFilePath := filepath.Join(newPath, fileName) fw, err := createGzipFile(completeFilePath) if err != nil { @@ -104,7 +113,7 @@ newBucket := bucket{DirName: dirName, FileWriter: fw} bucketMap[ts] = newBucket - //Send event to close directory after endTime + //Send event to close directory after endTime + 5 seconds to make sure all buffers are flushed to file timer := time.After(endTime.Sub(time.Now()) + time.Second * 5) go func() { <- timer @@ -114,6 +123,7 @@ } } +// 4 digit Hex is prefixed to each filename to improve how s3 partitions the files being uploaded func getRandomHex() string { buff := make([]byte, 2) rand.Read(buff) @@ -123,7 +133,7 @@ func createGzipFile(s string) (fileWriter, error) { file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm) if err != nil { - return fileWriter{},fmt.Errorf("Cannot create file : %s to buffer messages due to: %v", s, err) + return fileWriter{},fmt.Errorf("Cannot create file '%s' to buffer messages '%v'", s, err) } gw := gzip.NewWriter(file) bw := bufio.NewWriter(gw) @@ -131,21 +141,22 @@ } func writeGzipFile(fw fileWriter, records []interface{}) { + // write each record as a new line to the bufferedWriter for _, eachRecord := range records { s, _ := json.Marshal(eachRecord) _, err := (fw.bw).WriteString(string(s)) if err != nil { - log.Errorf("Write to file failed due to: %v", err) + log.Errorf("Write to file failed '%v'", err) } (fw.bw).WriteString("\n") } + // Flush entire batch of records to file vs each message fw.bw.Flush() fw.gw.Flush() } func closeGzipFile(fw fileWriter) { fw.bw.Flush() - // Close the gzip first. fw.gw.Close() fw.file.Close() }
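The bucket key is the current epoch second truncated to the collection interval, so all records arriving within one interval land in the same directory and file. A small worked sketch of that computation, assuming the 120 second default interval:

```go
// Bucket start/end computation from getBucketForTimestamp, with a concrete
// example; interval comes from apidanalytics_collection_interval (default 120s).
interval := int64(120)
now := int64(1485965555)           // e.g. time.Now().Unix()
start := now / interval * interval // 1485965520 -> bucket start
end := start + interval            // 1485965640 -> bucket end
_ = end

// The file created inside the bucket directory then follows
// <4DigitRandomHex>_<TSStart>.<TSEnd>_<APIDINSTANCEUUID>_writer_0.txt.gz
```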
diff --git a/cmd/apidAnalytics/main.go b/cmd/apidAnalytics/main.go deleted file mode 100644 index 10689f1..0000000 --- a/cmd/apidAnalytics/main.go +++ /dev/null
@@ -1,29 +0,0 @@ -package main - -import ( - "github.com/30x/apid" - "github.com/30x/apid/factory" - - _ "github.com/30x/apidAnalytics" -) - -func main() { - // initialize apid using default services - apid.Initialize(factory.DefaultServicesFactory()) - - log := apid.Log() - - // this will call all initialization functions on all registered plugins - apid.InitializePlugins() - - // print the base url to the console - config := apid.Config() - basePath := config.GetString("analyticsBasePath") - port := config.GetString("api_port") - log.Printf("Analytics API is at: http://localhost:%s%s", port, basePath) - - // start client API listener - api := apid.API() - err := api.Listen() // doesn't return if no error - log.Fatalf("Error. Is something already running on port %d? %s", port, err) -}
diff --git a/common_helper.go b/common_helper.go index b0fe3a1..89b37bc 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -6,21 +6,25 @@ "sync" ) +// Cache for scope uuid to org, env and tenantId information var tenantCache map[string]tenant -var developerInfoCache map[string]developerInfo +// RW lock for tenant map cache since the cache can be read while its being written to and vice versa var tenantCachelock = sync.RWMutex{} +// Cache for apiKey~tenantId to developer related information +var developerInfoCache map[string]developerInfo +// RW lock for developerInfo map cache since the cache can be read while its being written to and vice versa var developerInfoCacheLock = sync.RWMutex{} - +// Load data scope information into an in-memory cache so that for each record a DB lookup is not required func createTenantCache() error { tenantCache = make(map[string]tenant) var org, env, tenantId, id string - db := getDB() + db := getDB() rows, error := db.Query("SELECT env, org, scope, id FROM DATA_SCOPE") if error != nil { - return fmt.Errorf("Count not get datascope from DB due to : %s", error.Error()) + return fmt.Errorf("Count not get datascope from DB due to: %v", error) } else { defer rows.Close() // Lock before writing to the map as it has multiple readers @@ -31,27 +35,27 @@ tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId} } } + log.Debugf("Count of data scopes in the cache: %d", len(tenantCache)) return nil } +// Load data scope information into an in-memory cache so that for each record a DB lookup is not required func createDeveloperInfoCache() error { developerInfoCache = make(map[string]developerInfo) - var apiProduct, developerApp, developerEmail, developer sql.NullString var tenantId, apiKey string db := getDB() - sSql := "SELECT mp.tenant_id, mp.appcred_id, ap.name, a.name, d.username, d.email " + "FROM APP_CREDENTIAL_APIPRODUCT_MAPPER as mp " + "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " + "INNER JOIN APP AS a ON a.id = mp.app_id " + - "INNER JOIN DEVELOPER as d ON d.id = a.developer_id ;" + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id;" rows, error := db.Query(sSql) if error != nil { - return fmt.Errorf("Count not get developerInfo from DB due to : %s", error.Error()) + return fmt.Errorf("Count not get developerInfo from DB due to: %v", error) } else { defer rows.Close() // Lock before writing to the map as it has multiple readers @@ -69,17 +73,21 @@ developerInfoCache[keyForMap] = developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev} } } + log.Debugf("Count of apiKey~tenantId combinations in the cache: %d", len(developerInfoCache)) return nil } +// Returns Tenant Info given a scope uuid from the cache or by querying the DB directly based on useCachig config func getTenantForScope(scopeuuid string) (tenant, dbError) { if (config.GetBool(useCaching)) { _, exists := tenantCache[scopeuuid] if !exists { reason := "No tenant found for this scopeuuid: " + scopeuuid errorCode := "UNKNOWN_SCOPE" - return tenant{}, dbError{errorCode, reason} + // Incase of unknown scope, try to refresh the cache ansynchronously incase an update was missed or delayed + go createTenantCache() + return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} } else { // acquire a read lock as this cache has 1 writer as well tenantCachelock.RLock() @@ -88,38 +96,33 @@ } } else { var org, env, tenantId string - db, err := data.DB() - if err != nil { - reason := "DB not initialized" - errorCode := "INTERNAL_SEARCH_ERROR" - return tenant{}, dbError{errorCode, reason} - } + db := getDB() error := db.QueryRow("SELECT env, org, scope FROM DATA_SCOPE 
where id = ?", scopeuuid).Scan(&env, &org, &tenantId) switch { case error == sql.ErrNoRows: reason := "No tenant found for this scopeuuid: " + scopeuuid errorCode := "UNKNOWN_SCOPE" - return tenant{}, dbError{errorCode, reason} + return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} case error != nil: reason := error.Error() errorCode := "INTERNAL_SEARCH_ERROR" - return tenant{}, dbError{errorCode, reason} + return tenant{}, dbError{ErrorCode: errorCode, Reason: reason} } - return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{} } - // TODO: localTesting - //return tenant{Org: "testorg", Env:"testenv", TenantId: "tenantid"}, dbError{} } +// Returns Dveloper related info given an apiKey and tenantId from the cache or by querying the DB directly based on useCachig config func getDeveloperInfo(tenantId string, apiKey string) developerInfo { if (config.GetBool(useCaching)) { keyForMap := getKeyForDeveloperInfoCache(tenantId, apiKey) _, exists := developerInfoCache[keyForMap] if !exists { - log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) + log.Warnf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) + // Incase of unknown apiKey~tenantId, try to refresh the cache ansynchronously incase an update was missed or delayed + go createTenantCache() return developerInfo{} } else { // acquire a read lock as this cache has 1 writer as well @@ -137,14 +140,14 @@ "INNER JOIN APP AS a ON a.id = mp.app_id " + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " + "where mp.tenant_id = ? and mp.appcred_id = ?;" - error := db.QueryRow(sSql,tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail) + error := db.QueryRow(sSql, tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail) switch { case error == sql.ErrNoRows: - log.Debug("No info found for tenantId : " + tenantId + " and apikey : " + apiKey) + log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) return developerInfo{} case error != nil: - log.Debug("No info found for tenantId : " + tenantId + " and apikey : " + apiKey + " due to " + error.Error()) + log.Debugf("No data found for for tenantId = %s and apiKey = %s due to: %v", tenantId, apiKey, error) return developerInfo{} } @@ -152,14 +155,11 @@ devApp := getValuesIgnoringNull(developerApp) dev := getValuesIgnoringNull(developer) devEmail := getValuesIgnoringNull(developerEmail) - return developerInfo{ApiProduct: apiPrd, DeveloperApp: devApp, DeveloperEmail: devEmail, Developer: dev} } - // TODO: localTesting - // return developerInfo{ApiProduct: "testproduct", DeveloperApp: "testapp", DeveloperEmail: "testdeveloper@test.com", Developer: "testdeveloper"} - } +// Helper method to handle scanning null values in DB to empty string func getValuesIgnoringNull(sqlValue sql.NullString) string { if sqlValue.Valid { return sqlValue.String @@ -168,6 +168,7 @@ } } +// Build Key as a combination of tenantId and apiKey for the developerInfo Cache func getKeyForDeveloperInfoCache(tenantId string, apiKey string) string { return tenantId + "~" + apiKey }
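Both caches now pair a single writer (the snapshot/change handlers) with many concurrent readers, hence the RWMutex guards. A minimal sketch of the read path and the miss behaviour added in this diff, using the names from common_helper.go:

```go
// Read path for the tenant cache, mirroring getTenantForScope: readers take
// the read lock, and a miss now triggers an asynchronous cache refresh in
// case an update was missed or delayed.
tenantCachelock.RLock()
ten, exists := tenantCache[scopeuuid]
tenantCachelock.RUnlock()

if !exists {
	go createTenantCache() // refresh in the background; caller still gets UNKNOWN_SCOPE
	return tenant{}, dbError{ErrorCode: "UNKNOWN_SCOPE",
		Reason: "No tenant found for this scopeuuid: " + scopeuuid}
}
return ten, dbError{}
```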
diff --git a/crash_recovery.go b/crash_recovery.go index f6793fd..7811d9c 100644 --- a/crash_recovery.go +++ b/crash_recovery.go
@@ -10,16 +10,17 @@ "compress/gzip" ) -const crashRecoveryDelay = 30 // seconds -const recovertTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format -const fileExtension = ".txt.gz"; -const recoveredTS = "~recoveredTS~" - +const ( + crashRecoveryDelay = 30 // seconds + recoveryTSLayout = "20060102150405.000" // same as "yyyyMMddHHmmss.SSS" format (Appended to Recovered folder and file) + recoveredTS = "~recoveredTS~" // Constant to identify recovered files +) func initCrashRecovery() { if crashRecoveryNeeded() { timer := time.After(time.Second * crashRecoveryDelay) + // Actual recovery of files is attempted asynchronously after a delay to not block the apid plugin from starting up go func() { <- timer performRecovery() @@ -27,6 +28,7 @@ } } +// Crash recovery is needed if there are any folders in tmp or recovered directory func crashRecoveryNeeded() (bool) { recoveredDirRecoveryNeeded := recoverFolderInRecoveredDir() tmpDirRecoveryNeeded := recoverFoldersInTmpDir() @@ -37,30 +39,34 @@ return needed } +// If Apid is shutdown or crashes while a file is still open in tmp folder, then the file has partial data. +// This partial data can be recoverd. func recoverFoldersInTmpDir() bool { tmpRecoveryNeeded := false dirs,_ := ioutil.ReadDir(localAnalyticsTempDir) recoveryTS := getRecoveryTS() for _, dir := range dirs { tmpRecoveryNeeded = true - log.Debugf("Moving directory %s from tmp to recovered ", dir.Name()) + log.Debugf("Moving directory '%s' from tmp to recovered folder", dir.Name()) tmpCompletePath := filepath.Join(localAnalyticsTempDir, dir.Name()) - - newDirName := dir.Name() + recoveredTS + recoveryTS; + newDirName := dir.Name() + recoveredTS + recoveryTS; // Eg. org~env~20160101222400~recoveredTS~20160101222612.123 recoveredCompletePath := filepath.Join(localAnalyticsRecoveredDir,newDirName) err := os.Rename(tmpCompletePath, recoveredCompletePath) if err != nil { - log.Errorf("Cannot move directory :%s to recovered folder", dir.Name()) + log.Errorf("Cannot move directory '%s' from tmp to recovered folder", dir.Name()) } } return tmpRecoveryNeeded } +// Get Timestamp for when the recovery is being attempted on the folder. func getRecoveryTS() string { current := time.Now() - return current.Format(recovertTSLayout) + return current.Format(recoveryTSLayout) } +// If APID is restarted twice immediately such that files have been moved to recovered folder but actual recovery has'nt started or is partially done +// Then the files will just stay in the recovered dir and need to be recovered again. func recoverFolderInRecoveredDir() bool { dirs, _ := ioutil.ReadDir(localAnalyticsRecoveredDir) if len(dirs) > 0 { @@ -83,6 +89,7 @@ var bucketRecoveryTS string // Parse bucket name to extract recoveryTS and pass it each file to be recovered + // Eg. 
org~env~20160101222400~recoveredTS~20160101222612.123 -> bucketRecoveryTS = _20160101222612.123 index := strings.Index(dirName, recoveredTS) if index != -1 { bucketRecoveryTS = "_" + dirName[index+len(recoveredTS):] @@ -98,7 +105,7 @@ stagingPath := filepath.Join(localAnalyticsStagingDir, dirName) err := os.Rename(dirBeingRecovered, stagingPath) if err != nil { - log.Errorf("Cannot move directory :%s to staging folder", dirName) + log.Errorf("Cannot move directory '%s' from recovered to staging folder", dirName) } } @@ -106,16 +113,19 @@ log.Debugf("performing crash recovery for file: %s ", fileName) // add recovery timestamp to the file name completeOrigFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, fileName) + recoveredExtension := "_recovered" + bucketRecoveryTS + fileExtension recoveredFileName := strings.TrimSuffix(fileName, fileExtension) + recoveredExtension + // eg. 5be1_20170130155400.20170130155600_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0_recovered_20170130155452.616.txt recoveredFilePath := filepath.Join(localAnalyticsRecoveredDir, dirName, recoveredFileName) + + // Copy complete records to new file and delete original partial file copyPartialFile(completeOrigFilePath, recoveredFilePath); deletePartialFile(completeOrigFilePath); } +// The file is read line by line and all complete records are extracted and copied to a new file which is closed as a correct gzip file. func copyPartialFile(completeOrigFilePath, recoveredFilePath string) { - - // read partial file line by line using buffered gzip reader partialFile, err := os.Open(completeOrigFilePath) if err != nil { log.Errorf("Cannot open file: %s", completeOrigFilePath) @@ -152,7 +162,7 @@ bufWriter.WriteString("\n") } if err := scanner.Err(); err != nil { - log.Errorf("Error while scanning partial file: %v", err) + log.Warnf("Error while scanning partial file: %v", err) return } } @@ -160,6 +170,6 @@ func deletePartialFile(completeOrigFilePath string) { err := os.Remove(completeOrigFilePath) if err != nil { - log.Errorf("Cannot delete partial file :%s", completeOrigFilePath) + log.Errorf("Cannot delete partial file: %s", completeOrigFilePath) } } \ No newline at end of file
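Recovery itself is a copy of all complete, newline-terminated records from the partially written gzip file into a fresh gzip file that can be closed cleanly. A minimal sketch of that copy loop, in the spirit of copyPartialFile (error handling trimmed, names illustrative):

```go
// Read the partial gzip file line by line and write only complete records
// to a new, properly closed gzip file.
src, _ := os.Open(completeOrigFilePath)
defer src.Close()
gzReader, _ := gzip.NewReader(src)
scanner := bufio.NewScanner(gzReader)

dst, _ := os.Create(recoveredFilePath)
gzWriter := gzip.NewWriter(dst)
bufWriter := bufio.NewWriter(gzWriter)

for scanner.Scan() {
	bufWriter.WriteString(scanner.Text())
	bufWriter.WriteString("\n")
}
if err := scanner.Err(); err != nil {
	// expected for a truncated gzip stream; complete lines were already copied
}

bufWriter.Flush()
gzWriter.Close()
dst.Close()
```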
diff --git a/glide.yaml b/glide.yaml index 4f63803..054e3eb 100644 --- a/glide.yaml +++ b/glide.yaml
@@ -5,4 +5,4 @@ version: master testImport: - package: github.com/onsi/ginkgo/ginkgo -- package: github.com/onsi/gomega +- package: github.com/onsi/gomega \ No newline at end of file
diff --git a/init.go b/init.go index f4dbc66..40949bd 100644 --- a/init.go +++ b/init.go
@@ -9,29 +9,36 @@ ) const ( - configAnalyticsBasePath = "apidanalytics_base_path" // config + // Base path of analytics API that will be exposed + configAnalyticsBasePath = "apidanalytics_base_path" analyticsBasePathDefault = "/analytics" - configAnalyticsDataPath = "apidanalytics_data_path" // config + // Root directory for analytics local data buffering + configAnalyticsDataPath = "apidanalytics_data_path" analyticsDataPathDefault = "/ax" - analyticsCollectionInterval = "apidanalytics_collection_interval" // config in seconds + // Data collection and buffering interval in seconds + analyticsCollectionInterval = "apidanalytics_collection_interval" analyticsCollectionIntervalDefault = "120" - analyticsUploadInterval = "apidanalytics_upload_interval" // config in seconds + // Interval in seconds based on which staging directory will be checked for folders ready to be uploaded + analyticsUploadInterval = "apidanalytics_upload_interval" analyticsUploadIntervalDefault = "5" + // Number of slots for internal channel buffering of analytics records before they are dumped to a file analyticsBufferChannelSize = "apidanalytics_buffer_channel_size" - analyticsBufferChannelSizeDefault = 100 // number of slots + analyticsBufferChannelSizeDefault = 100 - uapServerBase = "apidanalytics_uap_server_base" // config + // EdgeX endpoint base path to access Uap Collection Endpoint + uapServerBase = "apidanalytics_uap_server_base" + // If caching is used then data scope and developer info will be maintained in-memory + // cache to avoid DB calls for each analytics message useCaching = "apidanalytics_use_caching" useCachingDefault = true ) // keep track of the services that this plugin will use -// note: services would also be available directly via the package global "apid" (eg. 
`apid.Log()`) var ( log apid.LogService config apid.ConfigService @@ -67,45 +74,44 @@ // initPlugin will be called by apid to initialize func initPlugin(services apid.Services) (apid.PluginData, error) { - // set a logger that is annotated for this plugin - log = services.Log().ForModule("apigeeAnalytics") + log = services.Log().ForModule("apidAnalytics") log.Debug("start init for apidAnalytics plugin") - // set configuration - err := setConfig(services) - if err != nil { - return pluginData, fmt.Errorf("Missing required config value: %s: ", err) - } - - // localTesting - //config.SetDefault(uapServerBase,"http://localhost:9010") - //config.SetDefault("apigeesync_apid_instance_id","fesgG-3525-SFAG") - - for _, key := range []string{uapServerBase} { - if !config.IsSet(key) { - return pluginData, fmt.Errorf("Missing required config value: %s", key) - } - - } - - directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir} - err = createDirectories(directories) - - if err != nil { - return pluginData, fmt.Errorf("Cannot create required local directories %s: ", err) - } - data = services.Data() events = services.Events() events.Listen("ApigeeSync", &handler{}) + // set configuration + err := setConfig(services) + if err != nil { + return pluginData, err + } + + for _, key := range []string{uapServerBase} { + if !config.IsSet(key) { + return pluginData, fmt.Errorf("Missing required config value: %s", key) + } + } + + // Create directories for managing buffering and upload to UAP stages + directories := []string{localAnalyticsBaseDir, localAnalyticsTempDir, localAnalyticsStagingDir, localAnalyticsFailedDir, localAnalyticsRecoveredDir} + err = createDirectories(directories) + + if err != nil { + return pluginData, fmt.Errorf("Cannot create required local directories: %v ", err) + } + + // Initialize one time crash recovery to be performed by the plugin on start up initCrashRecovery() + // Initialize upload manager to watch the staging directory and upload files to UAP as they are ready initUploadManager() + // Initialize buffer manager to watch the internalBuffer channel for new messages and dump them to files initBufferingManager() + // Initialize API's and expose them initAPI(services) log.Debug("end init for apidAnalytics plugin") return pluginData, nil @@ -132,7 +138,7 @@ // set default config for collection interval config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault) - // set default config for local caching + // set default config for useCaching config.SetDefault(useCaching, useCachingDefault) // set default config for upload interval @@ -152,7 +158,7 @@ if error != nil { return error } - log.Infof("created directory for analytics data collection %s: ", path) + log.Infof("created directory for analytics data collection: %s", path) } } return nil
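The reordered initPlugin now registers services and the ApigeeSync listener first, then validates configuration before creating directories and starting the sub-managers. The config handling boils down to the pattern below (taken from the diff; only uapServerBase is treated as required):

```go
// Optional keys get defaults, required keys fail initialization fast.
config.SetDefault(analyticsCollectionInterval, analyticsCollectionIntervalDefault)
config.SetDefault(useCaching, useCachingDefault)
config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault)

for _, key := range []string{uapServerBase} {
	if !config.IsSet(key) {
		return pluginData, fmt.Errorf("Missing required config value: %s", key)
	}
}
```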
diff --git a/listener.go b/listener.go index 4dc37df..362dd69 100644 --- a/listener.go +++ b/listener.go
@@ -1,18 +1,17 @@ package apidAnalytics + import ( "github.com/30x/apid" "github.com/apigee-labs/transicator/common" ) -type handler struct { -} +type handler struct {} func (h *handler) String() string { - return "apidAnalytics" + return "apigeeAnalytics" } func (h *handler) Handle(e apid.Event) { - snapData, ok := e.(*common.Snapshot) if ok { processSnapshot(snapData) @@ -47,17 +46,15 @@ if err != nil { log.Error(err) } else { - log.Debug("Created a local cache for developer and app information") + log.Debug("Created a local cache for developer information") } } else { - log.Debug("Will not be caching any info and make a DB call for every analytics msg") + log.Info("Will not be caching any developer info and make a DB call for every analytics msg") } - return } func processChange(changes *common.ChangeList) { - log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) var rows []common.Row @@ -78,22 +75,24 @@ ele.Get("org", &org) ele.Get("env", &env) tenantCache[scopeuuid] = tenant{Org: org, Env: env, TenantId: tenantid} - log.Debugf("refreshed local tenantCache. Added tenant: %s", tenantid) + log.Debugf("Refreshed local tenantCache. Added scope: %s", scopeuuid) } case common.Delete: rows = append(rows, payload.NewRow) + // Lock before writing to the map as it has multiple readers tenantCachelock.Lock() defer tenantCachelock.Unlock() for _, ele := range rows { var scopeuuid string ele.Get("id", &scopeuuid) delete(tenantCache, scopeuuid) + log.Debugf("Refreshed local tenantCache. Deleted scope: %s", scopeuuid) } } case "kms.developer", "kms.app", "kms.api_product", "kms.app_credential_apiproduct_mapper": // any change in any of the above tables should result in cache refresh createDeveloperInfoCache() - log.Debug("refresh local developerInfoCache") + log.Debug("Refresh local developerInfoCache") } } }
diff --git a/upload_manager.go b/upload_manager.go index 5e1f940..1cc3bc2 100644 --- a/upload_manager.go +++ b/upload_manager.go
@@ -1,13 +1,18 @@ package apidAnalytics import ( - _ "fmt" "io/ioutil" "os" "path/filepath" "time" ) +const ( + maxRetries = 3 + retryFailedDirBatchSize = 10 +) + +// Each file upload is retried maxRetries times before moving it to failed directory var retriesMap map[string]int //TODO: make sure that this instance gets initialized only once since we dont want multiple upload manager tickers running @@ -16,6 +21,7 @@ retriesMap = make(map[string]int) go func() { + // Periodically check the staging directory to check if any folders are ready to be uploaded to S3 ticker := time.NewTicker(time.Second * config.GetDuration(analyticsUploadInterval)) log.Debugf("Intialized upload manager to check for staging directory") defer ticker.Stop() // Ticker will keep running till go routine is running i.e. till application is running @@ -24,7 +30,7 @@ files, err := ioutil.ReadDir(localAnalyticsStagingDir) if err != nil { - log.Errorf("Cannot read directory %s: ", localAnalyticsStagingDir) + log.Errorf("Cannot read directory: %s", localAnalyticsStagingDir) } uploadedDirCnt := 0 @@ -48,9 +54,10 @@ func handleUploadDirStatus(dir os.FileInfo, status bool) { completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) + // If upload is successful then delete files and remove bucket from retry map if status { os.RemoveAll(completePath) - log.Debugf("deleted directory after successful upload : %s", dir.Name()) + log.Debugf("deleted directory after successful upload: %s", dir.Name()) // remove key if exists from retry map after a successful upload delete(retriesMap, dir.Name()) } else { @@ -60,7 +67,7 @@ failedDirPath := filepath.Join(localAnalyticsFailedDir, dir.Name()) err := os.Rename(completePath, failedDirPath) if err != nil { - log.Errorf("Cannot move directory :%s to failed folder", dir.Name()) + log.Errorf("Cannot move directory '%s' from staging to failed folder", dir.Name()) } // remove key from retry map once it reaches allowed max failed attempts delete(retriesMap, dir.Name()) @@ -72,7 +79,7 @@ failedDirs, err := ioutil.ReadDir(localAnalyticsFailedDir) if err != nil { - log.Errorf("Cannot read directory %s: ", localAnalyticsFailedDir) + log.Errorf("Cannot read directory: %s", localAnalyticsFailedDir) } cnt := 0 @@ -83,7 +90,7 @@ newStagingPath := filepath.Join(localAnalyticsStagingDir, dir.Name()) err := os.Rename(failedPath, newStagingPath) if err != nil { - log.Errorf("Cannot move directory :%s to staging folder", dir.Name()) + log.Errorf("Cannot move directory '%s' from failed to staging folder", dir.Name()) } } else { break
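maxRetries and retryFailedDirBatchSize now live with the upload manager that uses them. The retry bookkeeping implied by handleUploadDirStatus works roughly as below; the increment step is not visible in this hunk and is shown here as an assumption:

```go
// Sketch of the per-directory retry accounting around retriesMap.
if status {
	os.RemoveAll(completePath)     // uploaded: remove the data ...
	delete(retriesMap, dir.Name()) // ... and forget its retry count
} else {
	retriesMap[dir.Name()]++ // assumed increment on each failed attempt
	if retriesMap[dir.Name()] >= maxRetries {
		// give up: move from staging to failed and stop tracking it
		os.Rename(completePath, filepath.Join(localAnalyticsFailedDir, dir.Name()))
		delete(retriesMap, dir.Name())
	}
}
```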
diff --git a/uploader.go b/uploader.go index 0b84151..d8be63c 100644 --- a/uploader.go +++ b/uploader.go
@@ -11,16 +11,12 @@ "time" ) -const ( - maxRetries = 3 - retryFailedDirBatchSize = 10 - timestampLayout = "20060102150405" // same as yyyyMMddHHmmss -) +const timestampLayout = "20060102150405" // same as yyyyMMddHHmmss var token string var client *http.Client = &http.Client{ - Timeout: time.Duration(60 * time.Second), // default timeout of 60 seconds while connecting to s3/GCS + Timeout: time.Duration(60 * time.Second), //set default timeout of 60 seconds while connecting to s3/GCS } func addHeaders(req *http.Request) { @@ -29,8 +25,9 @@ } func uploadDir(dir os.FileInfo) bool { + // Eg. org~env~20160101224500 tenant, timestamp := splitDirName(dir.Name()) - dateTimePartition := getDateFromDirTimestamp(timestamp) + dateTimePartition := getDateFromDirTimestamp(timestamp) //date=2016-01-01/time=22-45 completePath := filepath.Join(localAnalyticsStagingDir, dir.Name()) files, _ := ioutil.ReadDir(completePath) @@ -42,18 +39,18 @@ relativeFilePath := dateTimePartition + "/" + file.Name(); status, error = uploadFile(tenant,relativeFilePath, completeFilePath) if error != nil { - log.Errorf("Upload failed due to : %s", error.Error()) + log.Errorf("Upload failed due to: %v", error) break } else { os.Remove(completeFilePath) - log.Debugf("Deleted file after successful upload : %s", file.Name()) + log.Debugf("Deleted file '%s' after successful upload", file.Name()) } } return status } func uploadFile(tenant, relativeFilePath, completeFilePath string) (bool, error) { - signedUrl, err := getSignedUrl(tenant, relativeFilePath, completeFilePath) + signedUrl, err := getSignedUrl(tenant, relativeFilePath) if (err != nil) { return false, err } else { @@ -61,7 +58,7 @@ } } -func getSignedUrl(tenant, relativeFilePath, completeFilePath string) (string, error) { +func getSignedUrl(tenant, relativeFilePath string) (string, error) { uapCollectionUrl := config.GetString(uapServerBase) + "/analytics" req, err := http.NewRequest("GET", uapCollectionUrl, nil) @@ -71,15 +68,14 @@ q := req.URL.Query() - // localTesting - q.Add("repo", "edge") - q.Add("dataset", "api") - + // eg. edgexfeb1~test q.Add("tenant", tenant) + // eg. 
date=2017-01-30/time=16-32/1069_20170130163200.20170130163400_218e3d99-efaf-4a7b-b3f2-5e4b00c023b7_writer_0.txt.gz q.Add("relative_file_path", relativeFilePath) q.Add("file_content_type", "application/x-gzip") req.URL.RawQuery = q.Encode() + // Add Bearer Token to each request addHeaders(req) resp, err := client.Do(req) if err != nil { @@ -94,12 +90,12 @@ signedURL := body["url"] return signedURL.(string), nil } else { - return "", fmt.Errorf("Error while getting signed URL: %s",resp.Status) + return "", fmt.Errorf("Error while getting signed URL '%v'",resp.Status) } } func uploadFileToDatastore(completeFilePath, signedUrl string) (bool, error) { - // read gzip file that needs to be uploaded + // open gzip file that needs to be uploaded file, err := os.Open(completeFilePath) if err != nil { return false, err @@ -108,7 +104,7 @@ req, err := http.NewRequest("PUT", signedUrl, file) if err != nil { - return false, fmt.Errorf("Parsing URL failed due to %v", err) + return false, fmt.Errorf("Parsing URL failed '%v'", err) } req.Header.Set("Expect", "100-continue") @@ -116,7 +112,7 @@ fileStats, err := file.Stat() if err != nil { - return false, fmt.Errorf("Could not get content length for file: %v", err) + return false, fmt.Errorf("Could not get content length for file '%v'", err) } req.ContentLength = fileStats.Size() @@ -129,10 +125,11 @@ if(resp.StatusCode == 200) { return true, nil } else { - return false,fmt.Errorf("Final Datastore (S3/GCS) returned Error: %v ", resp.Status) + return false,fmt.Errorf("Final Datastore (S3/GCS)returned Error '%v'", resp.Status) } } +// Extract tenant and timestamp from directory Name func splitDirName(dirName string) (string, string){ s := strings.Split(dirName, "~") tenant := s[0]+"~"+s[1] @@ -140,7 +137,7 @@ return tenant, timestamp } -// files are uploaded to S3 under specific partition and that key needs to be generated from the plugin +// files are uploaded to S3 under specific date time partition and that key needs to be generated from the plugin // eg. <...prefix generated by uap collection service...>/date=2016-01-02/time=15-45/filename.txt.gz func getDateFromDirTimestamp(timestamp string) (string){ dateTime, _ := time.Parse(timestampLayout, timestamp)
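The relative_file_path sent to the UAP collection endpoint is built from the staging directory's timestamp, as in the comment example above. A small sketch of deriving that date/time partition, matching timestampLayout:

```go
// Directory names look like org~env~20160102154500; the timestamp part is
// parsed with timestampLayout ("20060102150405") and re-formatted into the
// S3/GCS partition key.
dateTime, _ := time.Parse("20060102150405", "20160102154500")
partition := fmt.Sprintf("date=%s/time=%s",
	dateTime.Format("2006-01-02"), dateTime.Format("15-04"))
// partition == "date=2016-01-02/time=15-45"
```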