intial commit for quotaCache and scheduler
diff --git a/api.go b/api.go index b71bc82..75e87fa 100644 --- a/api.go +++ b/api.go
@@ -44,7 +44,6 @@ return } - fmt.Println("test1") results, err := qBucket.IncrementQuotaLimit() if err != nil {
diff --git a/constants/constants.go b/constants/constants.go index 459efb2..1d5449c 100644 --- a/constants/constants.go +++ b/constants/constants.go
@@ -1,5 +1,7 @@ package constants +import "time" + const ( //add to acceptedTimeUnitList in init() if case any other new timeUnit is added @@ -19,7 +21,8 @@ QuotaTypeCalendar = "calendar" // after start time QuotaTypeRollingWindow = "rollingwindow" // in the past "window" time - cacheKeyDelimiter = "|" + CacheKeyDelimiter = "|" + CacheTTL = time.Minute * 1 UnableToParseBody = "unable_to_parse_body" UnMarshalJSONError = "unmarshal_json_error" ErrorConvertReqBodyToEntity = "error_convert_reqBody_to_entity"
diff --git a/globalVariables/globalVariables.go b/globalVariables/globalVariables.go index 50a4df0..2891ca9 100644 --- a/globalVariables/globalVariables.go +++ b/globalVariables/globalVariables.go
@@ -1,6 +1,8 @@ package globalVariables -import "github.com/30x/apid-core" +import ( + "github.com/30x/apid-core" +) var ( Log apid.LogService
diff --git a/quotaBucket/apiUtil.go b/quotaBucket/apiUtil.go index 27d964d..707ab62 100644 --- a/quotaBucket/apiUtil.go +++ b/quotaBucket/apiUtil.go
@@ -4,6 +4,8 @@ "errors" "reflect" "time" + "github.com/30x/apidQuota/constants" + "fmt" ) const ( @@ -22,7 +24,9 @@ expiresAt int64 } -func (qBucket *QuotaBucket) FromAPIRequest(quotaBucketMap map[string]interface{}) error { +func (qBucketRequest *QuotaBucket) FromAPIRequest(quotaBucketMap map[string]interface{}) error { + fmt.Println("qBucketRequest: ", qBucketRequest.quotaBucketData ) + var cacheKey string var edgeOrgID, id, timeUnit, quotaType string var interval int var startTime, maxCount, weight int64 @@ -48,6 +52,7 @@ } id = value.(string) + cacheKey = edgeOrgID + constants.CacheKeyDelimiter + id value, ok = quotaBucketMap["interval"] if !ok { return errors.New(`missing field: 'interval' is required`) @@ -159,18 +164,31 @@ if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 { return errors.New(`invalid type : 'syncTimeInSec' should be a number`) } - syncTimeFloat := value.(float64) + syncTimeFloat := syncTimeValue.(float64) syncTimeInt := int64(syncTimeFloat) - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucket.quotaBucketData = newQBucket.quotaBucketData - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey) + + if !ok { + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + fmt.Println("qbucket: ", qBucketRequest) + fmt.Println("newqbucket: ", newQBucket) + + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) + return nil } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil @@ -180,48 +198,75 @@ } syncMsgCountFloat := value.(float64) syncMsgCountInt := int64(syncMsgCountFloat) - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucket.quotaBucketData = newQBucket.quotaBucketData + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey) - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + if !ok { + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) + return nil + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil } } - //for synchronous quotaBucket - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, -1, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucket.quotaBucketData = newQBucket.quotaBucketData + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey) - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + if !ok { + //for synchronous quotaBucket + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, synchronous, -1, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + addToCache(qBucketRequest) + return nil } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil } - //for non distributed quotaBucket - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, false, -1, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } + //retrieveFromCache. + newQBucket, ok = getFromCache(cacheKey) + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData - qBucket.quotaBucketData = newQBucket.quotaBucketData + if !ok { + //for non distributed quotaBucket + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, + startTime, maxCount, weight, distributed, false, -1, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) - if err := qBucket.Validate(); err != nil { - return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("failed in Validating the quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) } return nil
diff --git a/quotaBucket/quotaBucket.go b/quotaBucket/quotaBucket.go index ca22202..5b81b1d 100644 --- a/quotaBucket/quotaBucket.go +++ b/quotaBucket/quotaBucket.go
@@ -5,6 +5,7 @@ "github.com/30x/apidQuota/constants" "strings" "time" + "fmt" ) var ( @@ -86,6 +87,8 @@ Synchronous bool SyncTimeInSec int64 SyncMessageCount int64 + QTicker *time.Ticker + } type QuotaBucket struct { @@ -113,6 +116,7 @@ Synchronous: synchronous, SyncTimeInSec: syncTimeInSec, SyncMessageCount: syncMessageCount, + QTicker: &time.Ticker{}, } quotaBucket := &QuotaBucket{ @@ -123,6 +127,24 @@ if err != nil { return nil, err } + + //for async start the scheduler + if distributed && !synchronous{ + quotaBucket.quotaBucketData.QTicker = time.NewTicker(time.Second) + go func() { + count := 0 + fmt.Println("inside create ticker") + for t := range quotaBucket.quotaBucketData.QTicker.C { + fmt.Println("Ticker ticked ", t) + if count > 10 { + quotaBucket.getTicker().Stop() + } + count += 1 + } + }() + } + + fmt.Println("quotaBucketdata: " ,quotaBucket.quotaBucketData) return quotaBucket, nil } @@ -194,6 +216,10 @@ return q.quotaBucketData.Synchronous } + +func (q *QuotaBucket) getTicker() *time.Ticker { + return q.quotaBucketData.QTicker +} //Calls setCurrentPeriod if DescriptorType is 'rollingWindow' or period.endTime is before now(). // It is required to setPeriod while incrementing the count. func (q *QuotaBucket) GetPeriod() (*QuotaPeriod, error) {
diff --git a/quotaBucket/quotaBucketType.go b/quotaBucket/quotaBucketType.go index b905bd7..b6ea239 100644 --- a/quotaBucket/quotaBucketType.go +++ b/quotaBucket/quotaBucketType.go
@@ -4,6 +4,7 @@ "errors" "fmt" "github.com/30x/apidQuota/services" + "time" ) type QuotaBucketType interface { @@ -104,16 +105,24 @@ syncTimeInSec int64 } -func (quotaBucketType AsynchronousQuotaBucketType) resetCount(qBucket *QuotaBucket) error { +func (quotaBucketType AsynchronousQuotaBucketType) resetCount(q *QuotaBucket) error { //yet to implement return nil } -func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) { +func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) { //getCount() fmt.Println("increment count for async") - - return nil, nil + results := &QuotaBucketResults{ + EdgeOrgID: q.GetEdgeOrgID(), + ID: q.GetID(), + exceededTokens: true, + currentTokens: 51, + MaxCount: 50, + startedAt: time.Now().Unix(), + expiresAt: time.Now().Unix(), + } + return results, nil } func (quotaBucketType AsynchronousQuotaBucketType) resetQuotaForCurrentPeriod(q *QuotaBucket) (*QuotaBucketResults, error) {
diff --git a/quotaBucket/quotaCache.go b/quotaBucket/quotaCache.go new file mode 100644 index 0000000..e0cfe11 --- /dev/null +++ b/quotaBucket/quotaCache.go
@@ -0,0 +1,70 @@ +package quotaBucket + +import ( + "github.com/30x/apidQuota/constants" + "time" + "sync" + "fmt" +) + +var quotaCachelock = sync.RWMutex{} + +type quotaBucketCache struct { + qBucket *QuotaBucket + expiryTime int64 +} + +var quotaCache map[string]quotaBucketCache + +func init() { + quotaCache = make(map[string]quotaBucketCache) +} + +func getFromCache(cacheKey string) (*QuotaBucket,bool) { + quotaCachelock.Lock() + qBucketCache, ok := quotaCache[cacheKey] + quotaCachelock.Unlock() + if !ok { + fmt.Println("not in cache. add to cache.") + return nil,false + } + + isExpired := time.Unix(qBucketCache.expiryTime, 0).Before(time.Now().UTC()) + if isExpired { + fmt.Println("quotaBucket expired: remove from cache and return false.") + removeFromCache(cacheKey, qBucketCache) + return nil, false + } + + return qBucketCache.qBucket, true + +} + +func removeFromCache(cacheKey string, qBucketCache quotaBucketCache) { + fmt.Println("inside remove from cache") + //for async Stop the scheduler. + if qBucketCache.qBucket.Distributed && !qBucketCache.qBucket.IsSynchronous(){ + qBucketCache.qBucket.getTicker().Stop() + } + + quotaCachelock.Lock() + delete(quotaCache,cacheKey) + quotaCachelock.Unlock() +} + +func addToCache(qBucketToAdd *QuotaBucket) { + cacheKey := qBucketToAdd.GetEdgeOrgID() + constants.CacheKeyDelimiter + qBucketToAdd.GetID() + ttl := time.Now().UTC().Add(constants.CacheTTL).Unix() + + qCacheData := quotaBucketCache{ + qBucket: qBucketToAdd, + expiryTime: ttl, + } + + fmt.Println("qbucket in cache: ", qBucketToAdd.getTicker()) + quotaCachelock.Lock() + quotaCache[cacheKey] = qCacheData + quotaCachelock.Unlock() + fmt.Println("duration: " ,time.Unix(qCacheData.expiryTime,0).String()) + fmt.Println("now: ", time.Now().UTC()) +}