Merge branch 'master' of github.com:30x/apidQuota
diff --git a/api.go b/api.go index b71bc82..b1fa4be 100644 --- a/api.go +++ b/api.go
@@ -2,7 +2,6 @@ import ( "encoding/json" - "fmt" "github.com/30x/apid-core" "github.com/30x/apidQuota/constants" "github.com/30x/apidQuota/globalVariables" @@ -44,7 +43,6 @@ return } - fmt.Println("test1") results, err := qBucket.IncrementQuotaLimit() if err != nil { @@ -52,7 +50,6 @@ return } - fmt.Println("test2 : ", results) respMap := results.ToAPIResponse() respbytes, err := json.Marshal(respMap)
diff --git a/constants/constants.go b/constants/constants.go index 459efb2..1d5449c 100644 --- a/constants/constants.go +++ b/constants/constants.go
@@ -1,5 +1,7 @@ package constants +import "time" + const ( //add to acceptedTimeUnitList in init() if case any other new timeUnit is added @@ -19,7 +21,8 @@ QuotaTypeCalendar = "calendar" // after start time QuotaTypeRollingWindow = "rollingwindow" // in the past "window" time - cacheKeyDelimiter = "|" + CacheKeyDelimiter = "|" + CacheTTL = time.Minute * 1 UnableToParseBody = "unable_to_parse_body" UnMarshalJSONError = "unmarshal_json_error" ErrorConvertReqBodyToEntity = "error_convert_reqBody_to_entity"
diff --git a/globalVariables/globalVariables.go b/globalVariables/globalVariables.go index 50a4df0..2891ca9 100644 --- a/globalVariables/globalVariables.go +++ b/globalVariables/globalVariables.go
@@ -1,6 +1,8 @@ package globalVariables -import "github.com/30x/apid-core" +import ( + "github.com/30x/apid-core" +) var ( Log apid.LogService
diff --git a/quotaBucket/apiUtil.go b/quotaBucket/apiUtil.go index 27d964d..b739cad 100644 --- a/quotaBucket/apiUtil.go +++ b/quotaBucket/apiUtil.go
@@ -4,6 +4,7 @@ "errors" "reflect" "time" + "github.com/30x/apidQuota/constants" ) const ( @@ -22,7 +23,8 @@ expiresAt int64 } -func (qBucket *QuotaBucket) FromAPIRequest(quotaBucketMap map[string]interface{}) error { +func (qBucketRequest *QuotaBucket) FromAPIRequest(quotaBucketMap map[string]interface{}) error { + var cacheKey string var edgeOrgID, id, timeUnit, quotaType string var interval int var startTime, maxCount, weight int64 @@ -48,6 +50,7 @@ } id = value.(string) + cacheKey = edgeOrgID + constants.CacheKeyDelimiter + id value, ok = quotaBucketMap["interval"] if !ok { return errors.New(`missing field: 'interval' is required`) @@ -159,18 +162,29 @@ if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 { return errors.New(`invalid type : 'syncTimeInSec' should be a number`) } - syncTimeFloat := value.(float64) + syncTimeFloat := syncTimeValue.(float64) syncTimeInt := int64(syncTimeFloat) - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucket.quotaBucketData = newQBucket.quotaBucketData - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey) + + if !ok { + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) + return nil } + 
qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil @@ -180,48 +194,75 @@ } syncMsgCountFloat := value.(float64) syncMsgCountInt := int64(syncMsgCountFloat) - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucket.quotaBucketData = newQBucket.quotaBucketData + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey) - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + if !ok { + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) + return nil + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil } } - //for synchronous quotaBucket - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, -1, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucket.quotaBucketData = newQBucket.quotaBucketData + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey) - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + if !ok { + //for synchronous quotaBucket + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, 
maxCount, weight, distributed, synchronous, -1, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + addToCache(qBucketRequest) + return nil } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil } - //for non distributed quotaBucket - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, false, -1, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } + //retrieveFromCache. + newQBucket, ok = getFromCache(cacheKey) + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData - qBucket.quotaBucketData = newQBucket.quotaBucketData + if !ok { + //for non distributed quotaBucket + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, false, -1, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) } return nil
diff --git a/quotaBucket/quotaBucket.go b/quotaBucket/quotaBucket.go index ca22202..0a06b21 100644 --- a/quotaBucket/quotaBucket.go +++ b/quotaBucket/quotaBucket.go
@@ -3,6 +3,7 @@ import ( "errors" "github.com/30x/apidQuota/constants" + "github.com/30x/apidQuota/globalVariables" "strings" "time" ) @@ -86,6 +87,9 @@ Synchronous bool SyncTimeInSec int64 SyncMessageCount int64 + AsyncMessageCounter int64 + QTicker *time.Ticker + } type QuotaBucket struct { @@ -113,6 +117,8 @@ Synchronous: synchronous, SyncTimeInSec: syncTimeInSec, SyncMessageCount: syncMessageCount, + AsyncMessageCounter: int64(-1), + QTicker: &time.Ticker{}, } quotaBucket := &QuotaBucket{ @@ -123,6 +129,23 @@ if err != nil { return nil, err } + + //for async SetAsyncMessageCounter to 0 and also start the scheduler + if distributed && !synchronous{ + quotaBucket.SetAsyncMessageCounter(0) + quotaBucket.quotaBucketData.QTicker = time.NewTicker(time.Second) + go func() { + count := 0 + for t := range quotaBucket.quotaBucketData.QTicker.C { + globalVariables.Log.Debug("t: : ", t.String()) + if count > 10 { + quotaBucket.getTicker().Stop() + } + count += 1 + } + }() + } + return quotaBucket, nil } @@ -194,6 +217,13 @@ return q.quotaBucketData.Synchronous } +func (qbucket *QuotaBucket) SetAsyncMessageCounter(count int64) { + qbucket.quotaBucketData.AsyncMessageCounter = count +} + +func (q *QuotaBucket) getTicker() *time.Ticker { + return q.quotaBucketData.QTicker +} //Calls setCurrentPeriod if DescriptorType is 'rollingWindow' or period.endTime is before now(). // It is required to setPeriod while incrementing the count. func (q *QuotaBucket) GetPeriod() (*QuotaPeriod, error) {
diff --git a/quotaBucket/quotaBucketType.go b/quotaBucket/quotaBucketType.go index b905bd7..3c73d46 100644 --- a/quotaBucket/quotaBucketType.go +++ b/quotaBucket/quotaBucketType.go
@@ -2,7 +2,6 @@ import ( "errors" - "fmt" "github.com/30x/apidQuota/services" ) @@ -43,7 +42,6 @@ func (sQuotaBucket SynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) { - fmt.Println("increment count for sync") maxCount := q.GetMaxCount() exceededCount := false allowedCount := int64(0) @@ -104,15 +102,13 @@ syncTimeInSec int64 } -func (quotaBucketType AsynchronousQuotaBucketType) resetCount(qBucket *QuotaBucket) error { +func (quotaBucketType AsynchronousQuotaBucketType) resetCount(q *QuotaBucket) error { //yet to implement return nil } -func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) { +func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) { //getCount() - fmt.Println("increment count for async") - return nil, nil } @@ -127,7 +123,6 @@ return nil } func (sQuotaBucket NonDistributedQuotaBucketType) incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) { - fmt.Println("increment count for nondistributed.") return nil, nil }
diff --git a/quotaBucket/quotaBucket_test.go b/quotaBucket/quotaBucket_test.go index 3a8fcfb..08b580c 100644 --- a/quotaBucket/quotaBucket_test.go +++ b/quotaBucket/quotaBucket_test.go
@@ -407,23 +407,6 @@ Fail("Exprected true, returned: false") } - //end Time in period is now // cant set end time to now and tes.. by the time it evaluates isCurrentPeriod the period.endTime will be before time.now() - //fmt.Println("entTIme is now : ") - //startTime = time.Now().Unix() - //period = NewQuotaPeriod(time.Now().AddDate(0,-1,-1).Unix(), - // time.Now().AddDate(0,0,-1).Unix(), - // time.Now().Unix()) - //quotaBucket = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, period, startTime, maxCount, bucketType) - //err = quotaBucket.Validate() - //if err != nil { - // Fail("no error expected but got error: " + err.Error()) - //} - // - //period.IsCurrentPeriod(quotaBucket) - //if ok := period.IsCurrentPeriod(quotaBucket); !ok{ - // Fail("Exprected true, returned: false") - //} - //end Time in period is after now startTime = time.Now().UTC().Unix() quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
diff --git a/quotaBucket/quotaCache.go b/quotaBucket/quotaCache.go new file mode 100644 index 0000000..4c8338e --- /dev/null +++ b/quotaBucket/quotaCache.go
@@ -0,0 +1,85 @@
+package quotaBucket
+
+import (
+	"sync"
+	"time"
+
+	"github.com/30x/apidQuota/constants"
+)
+
+// quotaCachelock guards every read and write of quotaCache.
+var quotaCachelock = sync.RWMutex{}
+
+// quotaBucketCache is a single cache entry: the cached bucket and the Unix
+// time, in seconds, after which the entry counts as expired.
+type quotaBucketCache struct {
+	qBucket    *QuotaBucket
+	expiryTime int64
+}
+
+// quotaCache holds the cached buckets, keyed by
+// edgeOrgID + constants.CacheKeyDelimiter + id.
+var quotaCache map[string]quotaBucketCache
+
+func init() {
+	quotaCache = make(map[string]quotaBucketCache)
+}
+
+// getFromCache returns the bucket stored under cacheKey and true, or nil and
+// false when there is no entry or the entry has expired. An expired entry is
+// evicted; a live entry has its expiry pushed out by constants.CacheTTL.
+func getFromCache(cacheKey string) (*QuotaBucket, bool) {
+	quotaCachelock.Lock()
+	defer quotaCachelock.Unlock()
+
+	qBucketCache, ok := quotaCache[cacheKey]
+	if !ok {
+		return nil, false
+	}
+
+	isExpired := time.Unix(qBucketCache.expiryTime, 0).Before(time.Now().UTC())
+	if isExpired {
+		// quotaCachelock is already held and sync.RWMutex is not reentrant,
+		// so evict through the helper that does not take the lock again.
+		removeFromCacheLocked(cacheKey, qBucketCache)
+		return nil, false
+	}
+
+	// update expiry time every time you access.
+	qBucketCache.expiryTime = time.Now().UTC().Add(constants.CacheTTL).Unix()
+	quotaCache[cacheKey] = qBucketCache
+
+	return qBucketCache.qBucket, true
+}
+
+// removeFromCache evicts the entry for cacheKey, taking quotaCachelock itself.
+// Callers must not already hold the lock.
+func removeFromCache(cacheKey string, qBucketCache quotaBucketCache) {
+	quotaCachelock.Lock()
+	defer quotaCachelock.Unlock()
+	removeFromCacheLocked(cacheKey, qBucketCache)
+}
+
+// removeFromCacheLocked stops the ticker of a distributed, non-synchronous
+// bucket and deletes the entry. The caller must hold quotaCachelock.
+func removeFromCacheLocked(cacheKey string, qBucketCache quotaBucketCache) {
+	//for async Stop the scheduler.
+	if qBucketCache.qBucket.Distributed && !qBucketCache.qBucket.IsSynchronous() {
+		qBucketCache.qBucket.getTicker().Stop()
+	}
+	delete(quotaCache, cacheKey)
+}
+
+// addToCache stores qBucketToAdd under its edgeOrgID/id key with an expiry of
+// now + constants.CacheTTL, replacing any existing entry for that key.
+func addToCache(qBucketToAdd *QuotaBucket) {
+	cacheKey := qBucketToAdd.GetEdgeOrgID() + constants.CacheKeyDelimiter + qBucketToAdd.GetID()
+	ttl := time.Now().UTC().Add(constants.CacheTTL).Unix()
+
+	quotaCachelock.Lock()
+	defer quotaCachelock.Unlock()
+	quotaCache[cacheKey] = quotaBucketCache{
+		qBucket:    qBucketToAdd,
+		expiryTime: ttl,
+	}
+}
diff --git a/services/counterServiceHelper.go b/services/counterServiceHelper.go index a2c95aa..21fea87 100644 --- a/services/counterServiceHelper.go +++ b/services/counterServiceHelper.go
@@ -4,7 +4,6 @@ "bytes" "encoding/json" "errors" - "fmt" "github.com/30x/apidQuota/constants" "github.com/30x/apidQuota/globalVariables" "io/ioutil" @@ -32,7 +31,6 @@ } func IncrementAndGetCount(orgID string, quotaKey string, count int64, startTimeInt int64, endTimeInt int64) (int64, error) { - fmt.Println("calling counter service") headers := http.Header{} headers.Set("Accept", "application/json") headers.Set("Content-Type", "application/json") @@ -56,9 +54,6 @@ reqBody[startTime] = startTimeInt * int64(1000) reqBody[endTime] = endTimeInt * int64(1000) - fmt.Println("startTime: ", startTimeInt) - fmt.Println("endTime: ", endTimeInt) - reqBodyBytes, err := json.Marshal(reqBody) if err != nil { return 0, errors.New(constants.MarshalJSONError) @@ -73,7 +68,6 @@ ContentLength: int64(contentLength), } - fmt.Println("req: ", request) resp, err := client.Do(request) if err != nil { @@ -104,7 +98,6 @@ if !ok { return 0, errors.New(`invalid response from counter service. field 'count' not sent in the response`) } - fmt.Println("respcount: ", respCount) globalVariables.Log.Debug("responseCount: ", respCount)