| // Copyright 2017 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package quotaBucket |
| |
| import ( |
| "errors" |
| "github.com/30x/apidQuota/constants" |
| "github.com/30x/apidQuota/globalVariables" |
| "github.com/30x/apidQuota/services" |
| "strings" |
| "sync/atomic" |
| "time" |
| ) |
| |
// acceptedTimeUnitList is the set of time-unit identifiers consulted by
// IsValidTimeUnit. It is filled in by init.
var acceptedTimeUnitList map[string]bool

// acceptedTypeList is the set of quota-type identifiers consulted by
// IsValidType. It is filled in by init.
var acceptedTypeList map[string]bool
| |
| func init() { |
| |
| acceptedTimeUnitList = map[string]bool{constants.TimeUnitSECOND: true, |
| constants.TimeUnitMINUTE: true, constants.TimeUnitHOUR: true, |
| constants.TimeUnitDAY: true, constants.TimeUnitWEEK: true, constants.TimeUnitMONTH: true} |
| acceptedTypeList = map[string]bool{constants.QuotaTypeCalendar: true, |
| constants.QuotaTypeRollingWindow: true} |
| |
| } |
| |
// quotaPeriod describes a single quota window by three timestamps.
type quotaPeriod struct {
	// inputStartTime is exposed through GetPeriodInputStartTime and is
	// compared against the current time by IsCurrentPeriod.
	inputStartTime time.Time
	// startTime and endTime bound the window; Validate requires startTime
	// to be strictly before endTime.
	startTime, endTime time.Time
}
| |
| func (qp *quotaPeriod) GetPeriodInputStartTime() time.Time { |
| |
| return qp.inputStartTime |
| } |
| |
| func (qp *quotaPeriod) GetPeriodStartTime() time.Time { |
| |
| return qp.startTime |
| } |
| |
| func (qp *quotaPeriod) GetPeriodEndTime() time.Time { |
| |
| return qp.endTime |
| } |
| |
| func (qp *quotaPeriod) Validate() (bool, error) { |
| |
| if qp.startTime.Before(qp.endTime) { |
| return true, nil |
| } |
| return false, errors.New(constants.InvalidQuotaPeriod + " : startTime in the period must be before endTime") |
| |
| } |
| |
// aSyncQuotaBucket holds the bookkeeping attached to a QuotaBucket that is
// distributed but not synchronous (see NewQuotaBucket).
type aSyncQuotaBucket struct {
	// syncTimeInSec is the sync time in seconds.
	syncTimeInSec int64
	// syncMessageCount is the message-count sync setting; per the original
	// note it is set to -1 when the bucket syncs by syncTimeInSec instead.
	syncMessageCount int64
	// asyncLocalMessageCount is the locally accumulated count; it is
	// incremented with atomic.AddInt64 by addToAsyncLocalMessageCount.
	asyncLocalMessageCount int64
	// asyncCounter points at the slice of weights appended by addToCounter.
	asyncCounter *[]int64
	// asyncGLobalCount caches the count fetched from the counter service
	// the first time getCount runs.
	asyncGLobalCount int64
	// initialized records whether getCount has already fetched
	// asyncGLobalCount.
	initialized bool
	// qTicker is the ticker created in NewQuotaBucket for this bucket.
	qTicker *time.Ticker
}
| |
| func (qAsync *aSyncQuotaBucket) getAsyncSyncTime() (int64, error) { |
| |
| if qAsync != nil { |
| return qAsync.syncTimeInSec, nil |
| } |
| return 0, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) getAsyncSyncMessageCount() (int64, error) { |
| |
| if qAsync != nil { |
| return qAsync.syncMessageCount, nil |
| } |
| return 0, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) getAsyncLocalMessageCount() (int64, error) { |
| |
| if qAsync != nil { |
| return qAsync.asyncLocalMessageCount, nil |
| } |
| return 0, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) addToAsyncLocalMessageCount(count int64) error { |
| |
| if qAsync != nil { |
| atomic.AddInt64(&qAsync.asyncLocalMessageCount, count) |
| } |
| return errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) getAsyncGlobalCount() (int64, error) { |
| |
| if qAsync != nil { |
| return qAsync.asyncGLobalCount, nil |
| } |
| return 0, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) getAsyncIsInitialized() (bool, error) { |
| |
| if qAsync != nil { |
| return qAsync.initialized, nil |
| } |
| return false, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) getAsyncQTicker() (*time.Ticker, error) { |
| |
| if qAsync != nil { |
| return qAsync.qTicker, nil |
| } |
| return nil, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (qAsync *aSyncQuotaBucket) getAsyncCounter() (*[]int64, error) { |
| |
| if qAsync != nil { |
| return qAsync.asyncCounter, nil |
| } |
| return nil, errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| func (aSyncbucket *aSyncQuotaBucket) addToCounter(weight int64) error { |
| |
| if aSyncbucket == nil { |
| return errors.New(constants.AsyncQuotaBucketEmpty) |
| } |
| |
| *aSyncbucket.asyncCounter = append(*aSyncbucket.asyncCounter, weight) |
| return nil |
| } |
| |
| func (aSyncbucket *aSyncQuotaBucket) getCount(q *QuotaBucket, period *quotaPeriod) (int64, error) { |
| |
| var gcount int64 |
| var err error |
| if !aSyncbucket.initialized { |
| gcount, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), 0, period.startTime.Unix(), period.endTime.Unix()) |
| if err != nil { |
| return 0, err |
| } |
| aSyncbucket.asyncGLobalCount = gcount |
| aSyncbucket.initialized = true |
| } |
| |
| return aSyncbucket.asyncGLobalCount + aSyncbucket.asyncLocalMessageCount, nil |
| } |
| |
| type quotaBucketData struct { |
| EdgeOrgID string |
| ID string |
| Interval int |
| TimeUnit string //TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH} |
| QuotaType string //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW} |
| PreciseAtSecondsLevel bool |
| StartTime time.Time |
| MaxCount int64 |
| Weight int64 |
| Distributed bool |
| Synchronous bool |
| AsyncQuotaDetails *aSyncQuotaBucket |
| } |
| |
| type QuotaBucket struct { |
| quotaBucketData |
| } |
| |
| func NewQuotaBucket(edgeOrgID string, id string, interval int, |
| timeUnit string, quotaType string, preciseAtSecondsLevel bool, |
| startTime int64, maxCount int64, weight int64, distributed bool, |
| synchronous bool, syncTimeInSec int64, syncMessageCount int64) (*QuotaBucket, error) { |
| |
| fromUNIXTime := time.Unix(startTime, 0) |
| quotaBucketDataStruct := quotaBucketData{ |
| EdgeOrgID: edgeOrgID, |
| ID: id, |
| Interval: interval, |
| TimeUnit: timeUnit, |
| QuotaType: quotaType, |
| PreciseAtSecondsLevel: preciseAtSecondsLevel, |
| StartTime: fromUNIXTime, |
| MaxCount: maxCount, |
| Weight: weight, |
| Distributed: distributed, |
| Synchronous: synchronous, |
| AsyncQuotaDetails: nil, |
| } |
| |
| quotaBucket := &QuotaBucket{ |
| quotaBucketData: quotaBucketDataStruct, |
| } |
| |
| if !quotaBucket.IsDistrubuted() { |
| if quotaBucket.IsSynchronous(){ |
| return nil, errors.New("quota bucket cannot be both nonDistributed and synchronous.") |
| } |
| } |
| //for async set AsyncQuotaDetails and start the NewTicker |
| if distributed && !synchronous { |
| var quotaTicker int64 |
| //ensure just one of syncTimeInSec and syncMessageCount is set. |
| if syncTimeInSec > -1 && syncMessageCount > -1 { |
| return nil,errors.New("both syncTimeInSec and syncMessageCount canot be set. only one of them should be set.") |
| } |
| //set default syncTime for AsyncQuotaBucket. |
| //for aSyncQuotaBucket with 'syncMessageCount' the ticker is invoked with DefaultQuotaSyncTime |
| quotaTicker = constants.DefaultQuotaSyncTime |
| |
| if syncTimeInSec > 0 { //if sync with counter service periodically |
| quotaTicker = syncTimeInSec |
| } |
| |
| counter := make([]int64, 0) |
| newAsyncQuotaDetails := &aSyncQuotaBucket{ |
| syncTimeInSec: syncTimeInSec, |
| syncMessageCount: syncMessageCount, |
| asyncCounter: &counter, |
| asyncGLobalCount: constants.DefaultCount, |
| asyncLocalMessageCount: constants.DefaultCount, |
| initialized: false, |
| qTicker: time.NewTicker(time.Duration(time.Second.Nanoseconds() * quotaTicker)), |
| } |
| |
| quotaBucket.setAsyncQuotaBucket(newAsyncQuotaDetails) |
| go func() { |
| aSyncBucket := quotaBucket.GetAsyncQuotaBucket() |
| if aSyncBucket != nil { |
| exitCount := int64(0) |
| qticker, _ := aSyncBucket.getAsyncQTicker() |
| for t := range qticker.C { |
| globalVariables.Log.Debug("t: ", t.String()) |
| if len(*aSyncBucket.asyncCounter) == 0 { |
| exitCount += 1 |
| } |
| period, err := quotaBucket.GetPeriod() |
| if err != nil { |
| globalVariables.Log.Error("error getting period for: ", err.Error(), "for quotaBucket: ", quotaBucket) |
| qticker.Stop() |
| continue |
| } |
| //sync with counterService. |
| err = internalRefresh(quotaBucket, period) |
| if err != nil { |
| globalVariables.Log.Error("error during internalRefresh: ", err.Error(), "for quotaBucket: ", quotaBucket) |
| qticker.Stop() |
| continue |
| } |
| |
| if exitCount > 3 { |
| removeFromCache( quotaBucket.GetEdgeOrgID()+ |
| constants.CacheKeyDelimiter + quotaBucket.GetID(), |
| quotaCache[quotaBucket.GetEdgeOrgID()+constants.CacheKeyDelimiter + quotaBucket.GetID()]) |
| qticker.Stop() |
| } |
| } |
| } else { |
| globalVariables.Log.Error("aSyncBucketDetails are empty for the given quotaBucket: ", quotaBucket) |
| } |
| }() |
| } |
| |
| return quotaBucket, nil |
| |
| } |
| |
| func (q *QuotaBucket) Validate() error { |
| |
| //check valid quotaTimeUnit |
| if ok := IsValidTimeUnit(strings.ToLower(q.GetTimeUnit())); !ok { |
| return errors.New(constants.InvalidQuotaTimeUnitType) |
| } |
| |
| if ok := IsValidType(strings.ToLower(q.GetType())); !ok { |
| return errors.New(constants.InvalidQuotaType) |
| } |
| |
| //check if the period is valid |
| period, err := q.GetPeriod() |
| if err != nil { |
| return errors.New("error retireving Period for the quota Bucket" + err.Error()) |
| } |
| |
| if ok, err := period.Validate(); !ok { |
| return errors.New("invalid Period: " + err.Error()) |
| } |
| |
| return nil |
| } |
| |
| func (q *QuotaBucket) GetEdgeOrgID() string { |
| return q.quotaBucketData.EdgeOrgID |
| } |
| |
| func (q *QuotaBucket) GetID() string { |
| return q.quotaBucketData.ID |
| } |
| |
| func (q *QuotaBucket) GetInterval() int { |
| return q.quotaBucketData.Interval |
| } |
| |
| func (q *QuotaBucket) GetTimeUnit() string { |
| return q.quotaBucketData.TimeUnit |
| } |
| |
| func (q *QuotaBucket) GetStartTime() time.Time { |
| return q.quotaBucketData.StartTime |
| } |
| |
| //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW} |
| func (q *QuotaBucket) GetType() string { |
| return q.quotaBucketData.QuotaType |
| } |
| |
| func (q *QuotaBucket) GetIsPreciseAtSecondsLevel() bool { |
| return q.quotaBucketData.PreciseAtSecondsLevel |
| } |
| |
| func (q *QuotaBucket) GetMaxCount() int64 { |
| return q.quotaBucketData.MaxCount |
| } |
| |
| func (q *QuotaBucket) GetWeight() int64 { |
| return q.quotaBucketData.Weight |
| } |
| |
| func (q *QuotaBucket) IsDistrubuted() bool { |
| return q.quotaBucketData.Distributed |
| } |
| |
| func (q *QuotaBucket) IsSynchronous() bool { |
| return q.quotaBucketData.Synchronous |
| } |
| |
| //setCurrentPeriod only for rolling window else just return the value of QuotaPeriod. |
| func (q *QuotaBucket) GetPeriod() (*quotaPeriod, error) { |
| |
| qDescriptorType, err := GetQuotaTypeHandler(q.GetType()) |
| if err != nil { |
| return nil, err |
| } |
| return qDescriptorType.GetCurrentPeriod(q) |
| |
| } |
| |
| func (period *quotaPeriod) IsCurrentPeriod(qBucket *QuotaBucket) bool { |
| if qBucket != nil && qBucket.GetType() != "" { |
| if qBucket.GetType() == constants.QuotaTypeRollingWindow { |
| return (period.inputStartTime.Equal(time.Now().UTC()) || period.inputStartTime.Before(time.Now().UTC())) |
| } |
| |
| return (period.inputStartTime.Equal(time.Now().UTC()) || period.inputStartTime.Before(time.Now().UTC())) && |
| period.startTime.String() != "" && |
| period.endTime.String() != "" && |
| period.startTime.Before(period.endTime) && |
| (period.startTime.Equal(time.Now().UTC()) || period.startTime.Before(time.Now().UTC())) && |
| (period.endTime.Equal(time.Now().UTC()) || period.endTime.After(time.Now().UTC())) |
| } |
| return false |
| } |
| |
| func (q *QuotaBucket) setAsyncQuotaBucket(aSyncbucket *aSyncQuotaBucket) { |
| q.quotaBucketData.AsyncQuotaDetails = aSyncbucket |
| } |
| |
| func (q *QuotaBucket) GetAsyncQuotaBucket() *aSyncQuotaBucket { |
| return q.quotaBucketData.AsyncQuotaDetails |
| } |
| |
| func (q *QuotaBucket) IncrementQuotaLimit() (*QuotaBucketResults, error) { |
| |
| qBucketHandler, err := GetQuotaBucketHandler(q) |
| if err != nil { |
| return nil, errors.New("error getting quotaBucketHandler: " + err.Error()) |
| } |
| |
| return qBucketHandler.incrementQuotaCount(q) |
| |
| } |
| |
| func IsValidTimeUnit(timeUnit string) bool { |
| if _, ok := acceptedTimeUnitList[timeUnit]; ok { |
| return true |
| } |
| return false |
| } |
| |
| func IsValidType(qtype string) bool { |
| if _, ok := acceptedTypeList[qtype]; ok { |
| return true |
| } |
| return false |
| } |