[XAPID-377] Added logic to buffer messages to local files and then move to staging
diff --git a/api.yaml b/api.yaml index 333bbc2..782de56 100644 --- a/api.yaml +++ b/api.yaml
@@ -73,7 +73,7 @@ "client_received_end_timestamp": 1462850097580, "client_sent_start_timestamp": 1462850097894, "request_path" : "/oauth/oauthv2/auth_code/", - "request_uri": "/oauth/oauthv2/auth_code/?response_type=code&redirect_url=http%3A%2F%2Fexample.com&client_id=A1h6yYAVeADnEKji8M37zCSn6olcmQDB", + "request_uri": "/oauth/oauthv2/auth_code/?response_type=code&redirect_url=http%3A%2F%2Fexample.com&client_id=A1h6yYAVeADnEKji8M37zCSn6olcmQDB", "useragent" : "Chrome", "target" : "target_name", "target_received_end_timestamp": 1462850097800,
diff --git a/api_helper.go b/api_helper.go index e89d1d9..214578d 100644 --- a/api_helper.go +++ b/api_helper.go
@@ -4,7 +4,6 @@ "encoding/json" "net/http" "io" - "io/ioutil" "strings" "compress/gzip" ) @@ -17,6 +16,11 @@ Developer string } +type axRecords struct { + Tenant tenant + Records []interface{} +} + func processPayload(tenant tenant, scopeuuid string, r *http.Request) errResponse { var gzipEncoded bool if r.Header.Get("Content-Encoding") != "" { @@ -38,33 +42,35 @@ reader = r.Body } - body, _ := ioutil.ReadAll(reader) - errMessage := validateEnrichPublish(tenant, scopeuuid, body) + errMessage := validateEnrichPublish(tenant, scopeuuid, reader) if errMessage.ErrorCode != "" { return errMessage } return errResponse{} } -func validateEnrichPublish(tenant tenant, scopeuuid string, body []byte) errResponse { +func validateEnrichPublish(tenant tenant, scopeuuid string, reader io.Reader) errResponse { var raw map[string]interface{} - err := json.Unmarshal(body, &raw) - if err != nil { + dec := json.NewDecoder(reader) + dec.UseNumber() + + if err := dec.Decode(&raw); err != nil { return errResponse{"BAD_DATA", "Not a valid JSON payload"} } + if records := raw["records"]; records != nil { for _, eachRecord := range records.([]interface{}) { recordMap := eachRecord.(map[string]interface{}) valid, err := validate(recordMap) if valid { enrich(recordMap, scopeuuid, tenant) - // TODO: Remove log - log.Debugf("Raw records : %v ", recordMap) } else { return err // Even if there is one bad record, then reject entire batch } } - publishToChannel(records.([]interface{})) + // publish batch of records to channel (blocking call) + axRecords := axRecords{Tenant: tenant, Records: records.([]interface{})} + internalBuffer <- axRecords } else { return errResponse{"NO_RECORDS", "No analytics records in the payload"} } @@ -82,7 +88,7 @@ crst, exists1 := recordMap["client_received_start_timestamp"] cret, exists2 := recordMap["client_received_end_timestamp"] if exists1 && exists2 { - if crst.(int64) > cret.(int64) { + crstVal, _ := crst.(json.Number).Float64(); cretVal, _ := cret.(json.Number).Float64(); if crstVal > cretVal { + return false, 
errResponse{"BAD_DATA", "client_received_start_timestamp > client_received_end_timestamp"} } } @@ -105,7 +111,6 @@ if exists { apiKey := apiKey.(string) devInfo := getDeveloperInfo(tenant.TenantId, apiKey) - // TODO: Remove log _, exists := recordMap["api_product"] if !exists { recordMap["api_product"] = devInfo.ApiProduct @@ -125,12 +130,6 @@ } } -func publishToChannel(records []interface{}) { - // TODO: add the batch of records to a channel for consumption - log.Debugf("records on channel : %v", records) - return -} - func writeError(w http.ResponseWriter, status int, code string, reason string) { w.WriteHeader(status) e := errResponse{
diff --git a/buffering_manager.go b/buffering_manager.go new file mode 100644 index 0000000..3acb4c8 --- /dev/null +++ b/buffering_manager.go
@@ -0,0 +1,153 @@ +package apidAnalytics + +import ( + "time" + "os" + "bufio" + "compress/gzip" + "path/filepath" + "fmt" + "crypto/rand" + "encoding/base64" + "encoding/json" +) + +var internalBuffer chan axRecords +var closeBucketEvent chan bucket +var bucketMap map[int64]bucket + +type bucket struct { + DirName string + // We need file handle, writter pointer to close the file + FileWriter fileWriter +} + +// This struct will store open file handle and writer to close the file +type fileWriter struct { + file *os.File + gw *gzip.Writer + bw *bufio.Writer +} + +func initBufferingManager() { + internalBuffer = make(chan axRecords, config.GetInt(analyticsBufferChannelSize)) + closeBucketEvent = make(chan bucket) + bucketMap = make(map[int64]bucket) + + // Keep polling the internal buffer for new messages + go func() { + for { + records := <-internalBuffer + err := save(records) + if err != nil { + log.Errorf("Could not save %d messages to file. %v", len(records.Records), err) + } + } + }() + + // Keep polling the closeEvent channel to see if bucket is ready to be closed + go func() { + for { + bucket := <- closeBucketEvent + log.Debugf("Closing bucket %s", bucket.DirName) + + // close open file + closeGzipFile(bucket.FileWriter) + + dirToBeClosed := filepath.Join(localAnalyticsTempDir, bucket.DirName) + stagingPath := filepath.Join(localAnalyticsStagingDir, bucket.DirName) + err := os.Rename(dirToBeClosed, stagingPath) + if err != nil { + log.Errorf("Cannot move directory :%s to staging folder", bucket.DirName) + } + } + }() +} + +func save(records axRecords) (error) { + bucket, err := getBucketForTimestamp(time.Now(), records.Tenant) + if (err != nil ) { + return err + } + writeGzipFile(bucket.FileWriter, records.Records) + return nil +} + + +func getBucketForTimestamp(now time.Time, tenant tenant) (bucket, error) { + // first based on current timestamp, determine the timestamp bucket + ts := now.Unix() / int64(config.GetInt(analyticsCollectionInterval)) * 
int64(config.GetInt(analyticsCollectionInterval)) + _, exists := bucketMap[ts] + if exists { + return bucketMap[ts], nil + } else { + timestamp := time.Unix(ts, 0).Format(timestampLayout) + + endTime := time.Unix(ts + int64(config.GetInt(analyticsCollectionInterval)), 0) + endtimestamp := endTime.Format(timestampLayout) + + dirName := tenant.Org + "~" + tenant.Env + "~" + timestamp + newPath := filepath.Join(localAnalyticsTempDir, dirName) + // create dir + err := os.Mkdir(newPath, os.ModePerm) + if err != nil { + return bucket{}, fmt.Errorf("Cannot create directory : %s to buffer messages due to %v:", dirName, err) + } + + // create file for writing + fileName := getRandomHex() + "_" + timestamp + "." + endtimestamp + "_" + config.GetString("apigeesync_apid_instance_id") + "_writer_0.txt.gz" + completeFilePath := filepath.Join(newPath, fileName) + fw, err := createGzipFile(completeFilePath) + if err != nil { + return bucket{}, err + } + + newBucket := bucket{DirName: dirName, FileWriter: fw} + bucketMap[ts] = newBucket + + //Send event to close directory after endTime + timer := time.After(endTime.Sub(time.Now()) + time.Second * 5) + go func() { + <- timer + closeBucketEvent <- newBucket + }() + return newBucket, nil + } +} + +//TODO: implement 4 digit hext method +func getRandomHex() string { + buff := make([]byte, 2) + rand.Read(buff) + return base64.URLEncoding.EncodeToString(buff) +} + +func createGzipFile(s string) (fileWriter, error) { + file, err := os.OpenFile(s, os.O_WRONLY|os.O_CREATE, os.ModePerm) + if err != nil { + return fileWriter{},fmt.Errorf("Cannot create file : %s to buffer messages due to: %v", s, err) + } + gw := gzip.NewWriter(file) + bw := bufio.NewWriter(gw) + return fileWriter{file, gw, bw}, nil +} + +func writeGzipFile(fw fileWriter, records []interface{}) { + for _, eachRecord := range records { + s, _ := json.Marshal(eachRecord) + _, err := (fw.bw).WriteString(string(s)) + if err != nil { + log.Errorf("Write to file failed due to: %v", 
err) + } + (fw.bw).WriteString("\n") + } + fw.bw.Flush() +} + +func closeGzipFile(fw fileWriter) { + fw.bw.Flush() + // Close the gzip first. + fw.gw.Close() + fw.file.Close() +} +
diff --git a/common_helper.go b/common_helper.go index e64252a..85754ba 100644 --- a/common_helper.go +++ b/common_helper.go
@@ -3,10 +3,14 @@ import ( "database/sql" "fmt" + "sync" ) var tenantCache map[string]tenant var developerInfoCache map[string]developerInfo +var tenantCachelock = sync.RWMutex{} +var developerInfoCacheLock = sync.RWMutex{} + func createTenantCache() error { tenantCache = make(map[string]tenant) @@ -22,12 +26,15 @@ return fmt.Errorf("Count not get datascope from DB due to : %s", error.Error()) } else { defer rows.Close() + // Lock before writing to the map as it has multiple readers + tenantCachelock.Lock() + defer tenantCachelock.Unlock() for rows.Next() { rows.Scan(&env, &org, &tenantId, &id); tenantCache[id] = tenant{Org: org, Env: env, TenantId: tenantId} } } - log.Debugf("Count of datadscopes in the cache: %d", len(tenantCache)) + log.Debugf("Count of data scopes in the cache: %d", len(tenantCache)) return nil } @@ -50,6 +57,9 @@ return fmt.Errorf("Count not get developerInfo from DB due to : %s", error.Error()) } else { defer rows.Close() + // Lock before writing to the map as it has multiple readers + developerInfoCacheLock.Lock() + defer developerInfoCacheLock.Unlock() for rows.Next() { rows.Scan(&tenantId,&apiKey,&apiProduct, &developerApp, &developer, &developerEmail) @@ -74,6 +84,9 @@ errorCode := "UNKNOWN_SCOPE" return tenant{}, dbError{errorCode, reason} } else { + // acquire a read lock as this cache has 1 writer as well + tenantCachelock.RLock() + defer tenantCachelock.RUnlock() return tenantCache[scopeuuid], dbError{} } } else { @@ -100,7 +113,7 @@ return tenant{Org: org, Env:env, TenantId: tenantId}, dbError{} } - // TODO: local testing //// TODO: local testing //return tenant{Org: "testorg", Env:"testenv", TenantId: "tenantid"}, dbError{} } @@ -112,6 +125,9 @@ log.Debugf("No data found for for tenantId = %s and apiKey = %s", tenantId, apiKey) return developerInfo{} } else { + // acquire a read lock as this cache has 1 writer as well + developerInfoCacheLock.RLock() + defer developerInfoCacheLock.RUnlock() return developerInfoCache[keyForMap] } } else { 
@@ -123,8 +139,8 @@ "INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id " + "INNER JOIN APP AS a ON a.id = mp.app_id " + "INNER JOIN DEVELOPER as d ON d.id = a.developer_id " + - "where mp.tenant_id = \"" + tenantId + "\" and mp.appcred_id = \"" + apiKey + "\";" - error := db.QueryRow(sSql).Scan(&apiProduct, &developerApp, &developer, &developerEmail) + "where mp.tenant_id = ? and mp.appcred_id = ?;" + error := db.QueryRow(sSql,tenantId, apiKey).Scan(&apiProduct, &developerApp, &developer, &developerEmail) switch { case error == sql.ErrNoRows:
diff --git a/crashRecovery.go b/crash_recovery.go similarity index 98% rename from crashRecovery.go rename to crash_recovery.go index 33a0711..971270d 100644 --- a/crashRecovery.go +++ b/crash_recovery.go
@@ -19,9 +19,9 @@ func initCrashRecovery() { if crashRecoveryNeeded() { - timer := time.NewTimer(time.Second * crashRecoveryDelay) + timer := time.After(time.Second * crashRecoveryDelay) go func() { - <- timer.C + <- timer performRecovery() }() }
diff --git a/init.go b/init.go index b8eaf81..f0161f3 100644 --- a/init.go +++ b/init.go
@@ -25,6 +25,9 @@ analyticsUploadInterval = "apidanalytics_upload_interval" // config in seconds analyticsUploadIntervalDefault = "5" + analyticsBufferChannelSize = "apidanalytics_buffer_channel_size" + analyticsBufferChannelSizeDefault = 10 // number of slots + uapServerBase = "apidanalytics_uap_server_base" // config useCaching = "apidanalytics_use_caching" @@ -80,7 +83,8 @@ } // localTesting - config.SetDefault(uapServerBase,"http://localhost:9010") + //config.SetDefault(uapServerBase,"http://localhost:9010") + //config.SetDefault("apigeesync_apid_instance_id","fesgG-3525-SFAG") for _, key := range []string{uapServerBase} { if !config.IsSet(key) { @@ -104,6 +108,8 @@ initUploadManager() + initBufferingManager() + initAPI(services) log.Debug("end init for apidAnalytics plugin") return pluginData, nil @@ -137,6 +143,9 @@ // set default config for upload interval config.SetDefault(analyticsUploadInterval, analyticsUploadIntervalDefault) + // set default config for internal buffer size + config.SetDefault(analyticsBufferChannelSize, analyticsBufferChannelSizeDefault) + return nil }
diff --git a/listener.go b/listener.go index d7a8509..b20fb9b 100644 --- a/listener.go +++ b/listener.go
@@ -68,6 +68,9 @@ switch payload.Operation { case common.Insert, common.Update: rows = append(rows, payload.NewRow) + // Lock before writing to the map as it has multiple readers + tenantCachelock.Lock() + defer tenantCachelock.Unlock() for _, ele := range rows { var scopeuuid, tenantid, org, env string ele.Get("id", &scopeuuid) @@ -78,6 +81,8 @@ } case common.Delete: rows = append(rows, payload.NewRow) + tenantCachelock.Lock() + defer tenantCachelock.Unlock() for _, ele := range rows { var scopeuuid string ele.Get("id", &scopeuuid)
diff --git a/uploadManager.go b/upload_manager.go similarity index 100% rename from uploadManager.go rename to upload_manager.go