support only async, distributed quota and preciseAtSecondsLevel always true
diff --git a/api_test.go b/api_test.go index 2fe6bc0..07b14f2 100644 --- a/api_test.go +++ b/api_test.go
@@ -1,10 +1,10 @@ package apidQuota_test import ( - . "github.com/onsi/ginkgo" "bytes" "encoding/json" "github.com/google/uuid" + . "github.com/onsi/ginkgo" "io/ioutil" "net/http" "time"
diff --git a/constants/constants.go b/constants/constants.go index 2b640ac..6c2f131 100644 --- a/constants/constants.go +++ b/constants/constants.go
@@ -4,7 +4,7 @@ const ( //config variables. - ApigeeSyncBearerToken = "apigeesync_bearer_token" + ApigeeSyncBearerToken = "apigeesync_bearer_token" ConfigCounterServiceBasePath = "apidquota_counterService_base_path" //add to acceptedTimeUnitList in init() if case any other new timeUnit is added @@ -19,7 +19,7 @@ InvalidQuotaTimeUnitType = "invalidQuotaTimeUnitType" InvalidQuotaType = "invalidQuotaType" InvalidQuotaPeriod = "invalidQuotaPeriod" - AsyncQuotaBucketEmpty = "AsyncDetails_for_quotaBucket_are_empty" + AsyncQuotaBucketEmpty = "AsyncDetails_for_quotaBucket_are_empty" QuotaTypeCalendar = "calendar" // after start time QuotaTypeRollingWindow = "rollingwindow" // in the past "window" time @@ -36,7 +36,7 @@ ErrorCheckingQuotaLimit = "error_checking_quota_limit" QuotaBasePathDefault = "/quota" - URLCounterServiceNotSet = "url_counter_service_not_set" - URLCounterServiceInvalid = "url_counter_service_invalid" - MarshalJSONError = "marshal_JSON_error" + URLCounterServiceNotSet = "url_counter_service_not_set" + URLCounterServiceInvalid = "url_counter_service_invalid" + MarshalJSONError = "marshal_JSON_error" )
diff --git a/quotaBucket/apiUtil.go b/quotaBucket/apiUtil.go index 26b958d..7b33ddc 100644 --- a/quotaBucket/apiUtil.go +++ b/quotaBucket/apiUtil.go
@@ -8,9 +8,26 @@ ) const ( + //request response common params reqEdgeOrgID = "edgeOrgID" reqID = "id" reqMaxCount = "maxCount" + + //request specific params + reqInterval = "interval" + reqTimeUnit = "timeUnit" + reqQType = "type" + reqStartTimestamp = "startTimestamp" + reqSyncTimeInSec = "syncTimeInSec" + reqSyncMessageCount = "syncMessageCount" + reqWeight = "weight" + + //response specific params + respExceeded = "exceeded" + respRemainingCount = "remainingCount" + respExpiresTimestamp = "expiresTimestamp" + respStartTimestamp = "startTimestamp" + ) type QuotaBucketResults struct { @@ -28,78 +45,68 @@ var edgeOrgID, id, timeUnit, quotaType string var interval int var startTime, maxCount, weight int64 - var preciseAtSecondsLevel, distributed bool newQBucket := &QuotaBucket{} var err error value, ok := quotaBucketMap[reqEdgeOrgID] if !ok { - return errors.New(`missing field: 'edgeOrgID' is required`) + return errors.New("missing field: "+ reqEdgeOrgID + " is required") } if edgeOrgIDType := reflect.TypeOf(value); edgeOrgIDType.Kind() != reflect.String { - return errors.New(`invalid type : 'edgeOrgID' should be a string`) + return errors.New("invalid type : "+ reqEdgeOrgID + " should be a string") } edgeOrgID = value.(string) value, ok = quotaBucketMap[reqID] if !ok { - return errors.New(`missing field: 'id' is required`) + return errors.New("missing field: "+ reqID + " is required") } if idType := reflect.TypeOf(value); idType.Kind() != reflect.String { - return errors.New(`invalid type : 'id' should be a string`) + return errors.New("invalid type : "+ reqID + " should be a string") } id = value.(string) //build cacheKey - to retrieve from or add to quotaCache cacheKey = edgeOrgID + constants.CacheKeyDelimiter + id - value, ok = quotaBucketMap["interval"] + value, ok = quotaBucketMap[reqInterval] if !ok { - return errors.New(`missing field: 'interval' is required`) + return errors.New("missing field: "+ reqInterval + " is required") } - //from input when its read its float, need to then convert to int. + //from input its read as float, hence need to then convert to int. if intervalType := reflect.TypeOf(value); intervalType.Kind() != reflect.Float64 { - return errors.New(`invalid type : 'interval' should be a number`) + return errors.New("invalid type : "+ reqInterval + " should be a number") } intervalFloat := value.(float64) interval = int(intervalFloat) //TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH} - value, ok = quotaBucketMap["timeUnit"] + value, ok = quotaBucketMap[reqTimeUnit] if !ok { - return errors.New(`missing field: 'timeUnit' is required`) + return errors.New("missing field: "+ reqTimeUnit + " is required") } if timeUnitType := reflect.TypeOf(value); timeUnitType.Kind() != reflect.String { - return errors.New(`invalid type : 'timeUnit' should be a string`) + return errors.New("invalid type : "+ reqTimeUnit + " should be a string") } timeUnit = value.(string) //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW} - value, ok = quotaBucketMap["type"] + value, ok = quotaBucketMap[reqQType] if !ok { - return errors.New(`missing field: 'type' is required`) + return errors.New("missing field: "+ reqQType + " is required") } if quotaTypeType := reflect.TypeOf(value); quotaTypeType.Kind() != reflect.String { - return errors.New(`invalid type : 'type' should be a string`) + return errors.New("invalid type : "+ reqQType + " should be a string") } quotaType = value.(string) - value, ok = quotaBucketMap["preciseAtSecondsLevel"] - if !ok { - return errors.New(`missing field: 'preciseAtSecondsLevel' is required`) - } - if preciseAtSecondsLevelType := reflect.TypeOf(value); preciseAtSecondsLevelType.Kind() != reflect.Bool { - return errors.New(`invalid type : 'preciseAtSecondsLevel' should be boolean`) - } - preciseAtSecondsLevel = value.(bool) - - value, ok = quotaBucketMap["startTimestamp"] + value, ok = quotaBucketMap[reqStartTimestamp] if !ok { //todo: in the current cps code startTime is optional for QuotaBucket. should we make startTime optional to NewQuotaBucket? startTime = time.Now().UTC().Unix() } else { // //from input when its read its float, need to then convert to int. if startTimeType := reflect.TypeOf(value); startTimeType.Kind() != reflect.Float64 { - return errors.New(`invalid type : 'startTime' should be UNIX timestamp`) + return errors.New("invalid type : "+ reqStartTimestamp + " should be UNIX timestamp") } startTimeFloat := value.(float64) startTime = int64(startTimeFloat) @@ -107,127 +114,79 @@ value, ok = quotaBucketMap[reqMaxCount] if !ok { - return errors.New(`missing field: 'maxCount' is required`) + return errors.New("missing field: "+ reqMaxCount + " is required") } //from input when its read its float, need to then convert to int. if maxCountType := reflect.TypeOf(value); maxCountType.Kind() != reflect.Float64 { - return errors.New(`invalid type : 'maxCount' should be a number`) + return errors.New("invalid type : "+ reqMaxCount + " should be a number") } maxCountFloat := value.(float64) maxCount = int64(maxCountFloat) - value, ok = quotaBucketMap["weight"] + value, ok = quotaBucketMap[reqWeight] if !ok { - return errors.New(`missing field: 'weight' is required`) + return errors.New("missing field: "+ reqWeight + " is required") } //from input when its read its float, need to then convert to int. if weightType := reflect.TypeOf(value); weightType.Kind() != reflect.Float64 { - return errors.New(`invalid type : 'maxCount' should be a number`) + return errors.New("invalid type : "+ reqWeight + " should be a number") } weightFloat := value.(float64) weight = int64(weightFloat) - value, ok = quotaBucketMap["distributed"] - if !ok { - return errors.New(`missing field: 'distributed' is required`) + syncTimeValue, syncTimeOK := quotaBucketMap[reqSyncTimeInSec] + syncMsgCountValue, syncMsgCountOK := quotaBucketMap[reqSyncMessageCount] + + if syncTimeOK && syncMsgCountOK { + return errors.New("either "+ reqSyncTimeInSec + " or "+ reqSyncMessageCount + " should be present but not both.") } - if preciseAtSecondsLevelType := reflect.TypeOf(value); preciseAtSecondsLevelType.Kind() != reflect.Bool { - return errors.New(`invalid type : 'distributed' should be boolean`) + + if !syncTimeOK && !syncMsgCountOK { + return errors.New("either "+ reqSyncTimeInSec + " or "+ reqSyncMessageCount + " should be present. both cant be empty.") } - distributed = value.(bool) - //if distributed check for sync or async Quota - if distributed { - value, ok = quotaBucketMap["synchronous"] - if !ok { - return errors.New(`missing field: 'synchronous' is required`) + if syncTimeOK { + if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 { + return errors.New("invalid type : "+ reqSyncTimeInSec + " should be a number") } - if synchronousType := reflect.TypeOf(value); synchronousType.Kind() != reflect.Bool { - return errors.New(`invalid type : 'synchronous' should be boolean`) - } - synchronous := value.(bool) - - // for async retrieve syncTimeSec or syncMessageCount - if !synchronous { - syncTimeValue, syncTimeOK := quotaBucketMap["syncTimeInSec"] - syncMsgCountValue, syncMsgCountOK := quotaBucketMap["syncMessageCount"] - - if syncTimeOK && syncMsgCountOK { - return errors.New(`either syncTimeInSec or syncMessageCount should be present but not both.`) - } - - if !syncTimeOK && !syncMsgCountOK { - return errors.New(`either syncTimeInSec or syncMessageCount should be present. both cant be empty.`) - } - - if syncTimeOK { - if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 { - return errors.New(`invalid type : 'syncTimeInSec' should be a number`) - } - syncTimeFloat := syncTimeValue.(float64) - syncTimeInt := int64(syncTimeFloat) - - //try to retrieve from cache - newQBucket, ok = getFromCache(cacheKey, weight) - - if !ok { - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - - qBucketRequest.quotaBucketData = newQBucket.quotaBucketData - - if err := qBucketRequest.Validate(); err != nil { - return errors.New("error validating quotaBucket: " + err.Error()) - } - - addToCache(qBucketRequest) - return nil - } - qBucketRequest.quotaBucketData = newQBucket.quotaBucketData - - return nil - - } else if syncMsgCountOK { - if syncMsgCountType := reflect.TypeOf(syncMsgCountValue); syncMsgCountType.Kind() != reflect.Float64 { - return errors.New(`invalid type : 'syncTimeInSec' should be a number`) - } - syncMsgCountFloat := syncMsgCountValue.(float64) - syncMsgCountInt := int64(syncMsgCountFloat) - //try to retrieve from cache - newQBucket, ok = getFromCache(cacheKey, weight) - - if !ok { - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt) - if err != nil { - return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucketRequest.quotaBucketData = newQBucket.quotaBucketData - - if err := qBucketRequest.Validate(); err != nil { - return errors.New("error validating quotaBucket: " + err.Error()) - } - - addToCache(qBucketRequest) - return nil - - } - qBucketRequest.quotaBucketData = newQBucket.quotaBucketData - - return nil - } - } + syncTimeFloat := syncTimeValue.(float64) + syncTimeInt := int64(syncTimeFloat) //try to retrieve from cache newQBucket, ok = getFromCache(cacheKey, weight) if !ok { - //for synchronous quotaBucket - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, synchronous, -1, -1) + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, + startTime, maxCount, weight, syncTimeInt, -1) + if err != nil { + return errors.New("error creating quotaBucket: " + err.Error()) + } + + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + if err := qBucketRequest.Validate(); err != nil { + return errors.New("error validating quotaBucket: " + err.Error()) + } + + addToCache(qBucketRequest) + return nil + } + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + + return nil + + } else if syncMsgCountOK { + if syncMsgCountType := reflect.TypeOf(syncMsgCountValue); syncMsgCountType.Kind() != reflect.Float64 { + return errors.New("invalid type : "+ reqSyncMessageCount + " should be a number") + } + syncMsgCountFloat := syncMsgCountValue.(float64) + syncMsgCountInt := int64(syncMsgCountFloat) + //try to retrieve from cache + newQBucket, ok = getFromCache(cacheKey, weight) + + if !ok { + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, + startTime, maxCount, weight, -1, syncMsgCountInt) if err != nil { return errors.New("error creating quotaBucket: " + err.Error()) } @@ -236,37 +195,36 @@ if err := qBucketRequest.Validate(); err != nil { return errors.New("error validating quotaBucket: " + err.Error()) } + addToCache(qBucketRequest) return nil - } + } qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + return nil } - //for non distributed quotaBucket - //retrieveFromCache. + //try to retrieve from cache newQBucket, ok = getFromCache(cacheKey, weight) if !ok { - //for non distributed quotaBucket - newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel, - startTime, maxCount, weight, distributed, false, -1, -1) + //for synchronous quotaBucket + newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, + startTime, maxCount, weight, -1, -1) if err != nil { return errors.New("error creating quotaBucket: " + err.Error()) - } - qBucketRequest.quotaBucketData = newQBucket.quotaBucketData if err := qBucketRequest.Validate(); err != nil { return errors.New("error validating quotaBucket: " + err.Error()) } - addToCache(qBucketRequest) + return nil } - qBucketRequest.quotaBucketData = newQBucket.quotaBucketData + qBucketRequest.quotaBucketData = newQBucket.quotaBucketData return nil } @@ -276,10 +234,10 @@ resultsMap[reqEdgeOrgID] = qBucketResults.EdgeOrgID resultsMap[reqID] = qBucketResults.ID resultsMap[reqMaxCount] = qBucketResults.MaxCount - resultsMap["exceeded"] = qBucketResults.exceeded - resultsMap["remainingCount"] = qBucketResults.remainingCount - resultsMap["startTimestamp"] = qBucketResults.startTimestamp - resultsMap["expiresTimestamp"] = qBucketResults.expiresTimestamp + resultsMap[respExceeded] = qBucketResults.exceeded + resultsMap[respRemainingCount] = qBucketResults.remainingCount + resultsMap[respStartTimestamp] = qBucketResults.startTimestamp + resultsMap[respExpiresTimestamp] = qBucketResults.expiresTimestamp return resultsMap }
diff --git a/quotaBucket/quotaBucket.go b/quotaBucket/quotaBucket.go index 081211e..334f3c0 100644 --- a/quotaBucket/quotaBucket.go +++ b/quotaBucket/quotaBucket.go
@@ -155,19 +155,133 @@ return aSyncbucket.asyncGLobalCount + aSyncbucket.asyncLocalMessageCount, nil } +func (quotaBucketType *aSyncQuotaBucket) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) { + period, err := q.GetPeriod() + if err != nil { + return nil, errors.New("error getting period: " + err.Error()) + } + aSyncBucket := q.GetAsyncQuotaBucket() + if aSyncBucket == nil { + return nil, errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.") + } + currentCount, err := aSyncBucket.getCount(q, period) + if err != nil { + return nil, err + } + + maxCount := q.GetMaxCount() + exceeded := false + remainingCount := int64(0) + weight := q.GetWeight() + + if period.IsCurrentPeriod(q) { + + if currentCount < maxCount { + diffCount := (currentCount + weight) - maxCount + if diffCount > 0 { + exceeded = true + remainingCount = maxCount - currentCount + + } else { + aSyncBucket.addToCounter(weight) + aSyncBucket.addToAsyncLocalMessageCount(weight) + remainingCount = maxCount - (currentCount + weight) + + } + + asyncMessageCount, err := aSyncBucket.getAsyncSyncMessageCount() + if err != nil { + return nil, err + } + + asyncLocalMsgCount, err := aSyncBucket.getAsyncLocalMessageCount() + if err != nil { + return nil, err + } + + if asyncMessageCount > 0 && + asyncLocalMsgCount >= asyncMessageCount { + err = internalRefresh(q, period) + if err != nil { + return nil, err + } + } + } else { + exceeded = true + remainingCount = maxCount - currentCount + } + } + if remainingCount < 0 { + remainingCount = int64(0) + } + + results := &QuotaBucketResults{ + EdgeOrgID: q.GetEdgeOrgID(), + ID: q.GetID(), + exceeded: exceeded, + remainingCount: remainingCount, + MaxCount: maxCount, + startTimestamp: period.GetPeriodStartTime().Unix(), + expiresTimestamp: period.GetPeriodEndTime().Unix(), + } + + return results, nil +} + +func internalRefresh(q *QuotaBucket, period *quotaPeriod) error { + var err error + aSyncBucket := q.GetAsyncQuotaBucket() + if aSyncBucket == nil { + return errors.New(constants.AsyncQuotaBucketEmpty) + } + + weight := int64(0) + countFromCounterService := int64(0) + globalCount, err := aSyncBucket.getAsyncGlobalCount() + if err != nil { + return err + } + + maxCount := q.GetMaxCount() + for _, counterEle := range *aSyncBucket.asyncCounter { + weight += counterEle + + //delete from asyncCounter + temp := *aSyncBucket.asyncCounter + temp = temp[1:] + aSyncBucket.asyncCounter = &temp + + if (weight + globalCount) >= maxCount { + //clear asyncCounter + for range *aSyncBucket.asyncCounter { + //delete all elements from asyncCounter + temp := *aSyncBucket.asyncCounter + temp = temp[1:] + aSyncBucket.asyncCounter = &temp + } + } + } + + countFromCounterService, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), weight, period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix()) + if err != nil { + return err + } + aSyncBucket.asyncGLobalCount = countFromCounterService + + aSyncBucket.asyncLocalMessageCount -= weight + return nil +} + type quotaBucketData struct { - EdgeOrgID string - ID string - Interval int - TimeUnit string //TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH} - QuotaType string //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW} - PreciseAtSecondsLevel bool - StartTime time.Time - MaxCount int64 - Weight int64 - Distributed bool - Synchronous bool - AsyncQuotaDetails *aSyncQuotaBucket + EdgeOrgID string + ID string + Interval int + TimeUnit string //TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH} + QuotaType string //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW} + StartTime time.Time + MaxCount int64 + Weight int64 + AsyncQuotaDetails *aSyncQuotaBucket } type QuotaBucket struct { @@ -175,98 +289,86 @@ } func NewQuotaBucket(edgeOrgID string, id string, interval int, - timeUnit string, quotaType string, preciseAtSecondsLevel bool, - startTime int64, maxCount int64, weight int64, distributed bool, - synchronous bool, syncTimeInSec int64, syncMessageCount int64) (*QuotaBucket, error) { + timeUnit string, quotaType string, + startTime int64, maxCount int64, weight int64, syncTimeInSec int64, syncMessageCount int64) (*QuotaBucket, error) { fromUNIXTime := time.Unix(startTime, 0) quotaBucketDataStruct := quotaBucketData{ - EdgeOrgID: edgeOrgID, - ID: id, - Interval: interval, - TimeUnit: timeUnit, - QuotaType: quotaType, - PreciseAtSecondsLevel: preciseAtSecondsLevel, - StartTime: fromUNIXTime, - MaxCount: maxCount, - Weight: weight, - Distributed: distributed, - Synchronous: synchronous, - AsyncQuotaDetails: nil, + EdgeOrgID: edgeOrgID, + ID: id, + Interval: interval, + TimeUnit: timeUnit, + QuotaType: quotaType, + StartTime: fromUNIXTime, + MaxCount: maxCount, + Weight: weight, + AsyncQuotaDetails: nil, } quotaBucket := &QuotaBucket{ quotaBucketData: quotaBucketDataStruct, } - if !quotaBucket.IsDistrubuted() { - if quotaBucket.IsSynchronous(){ - return nil, errors.New("quota bucket cannot be both nonDistributed and synchronous.") - } + var quotaTicker int64 + //ensure just one of syncTimeInSec and syncMessageCount is set. + if syncTimeInSec > -1 && syncMessageCount > -1 { + return nil, errors.New("both syncTimeInSec and syncMessageCount canot be set. only one of them should be set.") } - //for async set AsyncQuotaDetails and start the NewTicker - if distributed && !synchronous { - var quotaTicker int64 - //ensure just one of syncTimeInSec and syncMessageCount is set. - if syncTimeInSec > -1 && syncMessageCount > -1 { - return nil,errors.New("both syncTimeInSec and syncMessageCount canot be set. only one of them should be set.") - } - //set default syncTime for AsyncQuotaBucket. - //for aSyncQuotaBucket with 'syncMessageCount' the ticker is invoked with DefaultQuotaSyncTime - quotaTicker = constants.DefaultQuotaSyncTime + //set default syncTime for AsyncQuotaBucket. + //for aSyncQuotaBucket with 'syncMessageCount' the ticker is invoked with DefaultQuotaSyncTime + quotaTicker = constants.DefaultQuotaSyncTime - if syncTimeInSec > 0 { //if sync with counter service periodically - quotaTicker = syncTimeInSec - } + if syncTimeInSec > 0 { //if sync with counter service periodically + quotaTicker = syncTimeInSec + } - counter := make([]int64, 0) - newAsyncQuotaDetails := &aSyncQuotaBucket{ - syncTimeInSec: syncTimeInSec, - syncMessageCount: syncMessageCount, - asyncCounter: &counter, - asyncGLobalCount: constants.DefaultCount, - asyncLocalMessageCount: constants.DefaultCount, - initialized: false, - qTicker: time.NewTicker(time.Duration(time.Second.Nanoseconds() * quotaTicker)), - } + counter := make([]int64, 0) + newAsyncQuotaDetails := &aSyncQuotaBucket{ + syncTimeInSec: syncTimeInSec, + syncMessageCount: syncMessageCount, + asyncCounter: &counter, + asyncGLobalCount: constants.DefaultCount, + asyncLocalMessageCount: constants.DefaultCount, + initialized: false, + qTicker: time.NewTicker(time.Duration(time.Second.Nanoseconds() * quotaTicker)), + } - quotaBucket.setAsyncQuotaBucket(newAsyncQuotaDetails) - go func() { - aSyncBucket := quotaBucket.GetAsyncQuotaBucket() - if aSyncBucket != nil { - exitCount := int64(0) - qticker, _ := aSyncBucket.getAsyncQTicker() - for t := range qticker.C { - globalVariables.Log.Debug("t: ", t.String()) - if len(*aSyncBucket.asyncCounter) == 0 { - exitCount += 1 - } - period, err := quotaBucket.GetPeriod() - if err != nil { - globalVariables.Log.Error("error getting period for: ", err.Error(), "for quotaBucket: ", quotaBucket) - qticker.Stop() - continue - } - //sync with counterService. - err = internalRefresh(quotaBucket, period) - if err != nil { - globalVariables.Log.Error("error during internalRefresh: ", err.Error(), "for quotaBucket: ", quotaBucket) - qticker.Stop() - continue - } - - if exitCount > 3 { - removeFromCache( quotaBucket.GetEdgeOrgID()+ - constants.CacheKeyDelimiter + quotaBucket.GetID(), - quotaCache[quotaBucket.GetEdgeOrgID()+constants.CacheKeyDelimiter + quotaBucket.GetID()]) - qticker.Stop() - } + quotaBucket.setAsyncQuotaBucket(newAsyncQuotaDetails) + go func() { + aSyncBucket := quotaBucket.GetAsyncQuotaBucket() + if aSyncBucket != nil { + exitCount := int64(0) + qticker, _ := aSyncBucket.getAsyncQTicker() + for t := range qticker.C { + globalVariables.Log.Debug("t: ", t.String()) + if len(*aSyncBucket.asyncCounter) == 0 { + exitCount += 1 } - } else { - globalVariables.Log.Error("aSyncBucketDetails are empty for the given quotaBucket: ", quotaBucket) + period, err := quotaBucket.GetPeriod() + if err != nil { + globalVariables.Log.Error("error getting period for: ", err.Error(), "for quotaBucket: ", quotaBucket) + qticker.Stop() + continue + } + //sync with counterService. + err = internalRefresh(quotaBucket, period) + if err != nil { + globalVariables.Log.Error("error during internalRefresh: ", err.Error(), "for quotaBucket: ", quotaBucket) + qticker.Stop() + continue + } + + if exitCount > 3 { + removeFromCache(quotaBucket.GetEdgeOrgID()+ + constants.CacheKeyDelimiter+quotaBucket.GetID(), + quotaCache[quotaBucket.GetEdgeOrgID()+constants.CacheKeyDelimiter+quotaBucket.GetID()]) + qticker.Stop() + } } - }() - } + } else { + globalVariables.Log.Error("aSyncBucketDetails are empty for the given quotaBucket: ", quotaBucket) + } + }() return quotaBucket, nil @@ -321,10 +423,6 @@ return q.quotaBucketData.QuotaType } -func (q *QuotaBucket) GetIsPreciseAtSecondsLevel() bool { - return q.quotaBucketData.PreciseAtSecondsLevel -} - func (q *QuotaBucket) GetMaxCount() int64 { return q.quotaBucketData.MaxCount } @@ -333,14 +431,6 @@ return q.quotaBucketData.Weight } -func (q *QuotaBucket) IsDistrubuted() bool { - return q.quotaBucketData.Distributed -} - -func (q *QuotaBucket) IsSynchronous() bool { - return q.quotaBucketData.Synchronous -} - //setCurrentPeriod only for rolling window else just return the value of QuotaPeriod. func (q *QuotaBucket) GetPeriod() (*quotaPeriod, error) { @@ -378,12 +468,7 @@ func (q *QuotaBucket) IncrementQuotaLimit() (*QuotaBucketResults, error) { - qBucketHandler, err := GetQuotaBucketHandler(q) - if err != nil { - return nil, errors.New("error getting quotaBucketHandler: " + err.Error()) - } - - return qBucketHandler.incrementQuotaCount(q) + return q.GetAsyncQuotaBucket().incrementQuotaCount(q) }
diff --git a/quotaBucket/quotaBucketType.go b/quotaBucket/quotaBucketType.go deleted file mode 100644 index cb3c1b7..0000000 --- a/quotaBucket/quotaBucketType.go +++ /dev/null
@@ -1,232 +0,0 @@ -package quotaBucket - -import ( - "errors" - "github.com/30x/apidQuota/services" - "github.com/30x/apidQuota/constants" -) - -type QuotaBucketType interface { - resetCount(bucket *QuotaBucket) error - incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) -} - -type SynchronousQuotaBucketType struct{} - -func (sQuotaBucket SynchronousQuotaBucketType) resetCount(qBucket *QuotaBucket) error { - //do nothing. - return nil -} - -func (sQuotaBucket SynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) { - period, err := q.GetPeriod() - if err != nil { - return nil, errors.New("error getting period: " + err.Error()) - } - maxCount := q.GetMaxCount() - exceeded := false - remainingCount := int64(0) - - weight := q.GetWeight() - - //first retrieve the count from counter service. - currentCount, err := services.GetCount(q.GetEdgeOrgID(), q.GetID(), period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix()) - if err != nil { - return nil, err - } - - if period.IsCurrentPeriod(q) { - if currentCount < maxCount { - allowed := maxCount - currentCount - if allowed >= weight { - if weight != 0 { - currentCount, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), weight, period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix()) - if err != nil { - return nil, err - } - } - remainingCount = maxCount - (currentCount) - - } else { - if weight != 0 { - exceeded = true - } - remainingCount = maxCount - currentCount - } - } else { - exceeded = true - remainingCount = maxCount - currentCount - } - } - - if remainingCount < 0 { - remainingCount = int64(0) - } - - results := &QuotaBucketResults{ - EdgeOrgID: q.GetEdgeOrgID(), - ID: q.GetID(), - exceeded: exceeded, - remainingCount: remainingCount, - MaxCount: maxCount, - startTimestamp: period.GetPeriodStartTime().Unix(), - expiresTimestamp: period.GetPeriodEndTime().Unix(), - } - - return results, nil -} - -type AsynchronousQuotaBucketType struct { -} - -func (quotaBucketType AsynchronousQuotaBucketType) resetCount(q *QuotaBucket) error { - //yet to implement - return nil -} - -func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) { - period, err := q.GetPeriod() - if err != nil { - return nil, errors.New("error getting period: " + err.Error()) - } - aSyncBucket := q.GetAsyncQuotaBucket() - if aSyncBucket == nil { - return nil, errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.") - } - currentCount, err := aSyncBucket.getCount(q, period) - if err != nil { - return nil, err - } - - maxCount := q.GetMaxCount() - exceeded := false - remainingCount := int64(0) - weight := q.GetWeight() - - if period.IsCurrentPeriod(q) { - - if currentCount < maxCount { - diffCount := (currentCount + weight) - maxCount - if diffCount > 0 { - exceeded = true - remainingCount = maxCount - currentCount - - } else { - aSyncBucket.addToCounter(weight) - aSyncBucket.addToAsyncLocalMessageCount(weight) - remainingCount = maxCount - (currentCount + weight) - - } - - asyncMessageCount, err := aSyncBucket.getAsyncSyncMessageCount() - if err != nil { - return nil, err - } - - asyncLocalMsgCount,err := aSyncBucket.getAsyncLocalMessageCount() - if err != nil { - return nil, err - } - - if asyncMessageCount > 0 && - asyncLocalMsgCount >= asyncMessageCount { - err = internalRefresh(q, period) - if err != nil { - return nil, err - } - } - } else { - exceeded = true - remainingCount = maxCount - currentCount - } - } - if remainingCount < 0 { - remainingCount = int64(0) - } - - results := &QuotaBucketResults{ - EdgeOrgID: q.GetEdgeOrgID(), - ID: q.GetID(), - exceeded: exceeded, - remainingCount: remainingCount, - MaxCount: maxCount, - startTimestamp: period.GetPeriodStartTime().Unix(), - expiresTimestamp: period.GetPeriodEndTime().Unix(), - } - - return results, nil -} - -func internalRefresh(q *QuotaBucket, period *quotaPeriod) error { - var err error - aSyncBucket := q.GetAsyncQuotaBucket() - if aSyncBucket == nil { - return errors.New(constants.AsyncQuotaBucketEmpty) - } - - weight := int64(0) - countFromCounterService := int64(0) - globalCount,err := aSyncBucket.getAsyncGlobalCount() - if err != nil { - return err - } - - maxCount := q.GetMaxCount() - for _, counterEle := range *aSyncBucket.asyncCounter { - weight += counterEle - - //delete from asyncCounter - temp := *aSyncBucket.asyncCounter - temp = temp[1:] - aSyncBucket.asyncCounter = &temp - - if (weight + globalCount) >= maxCount { - //clear asyncCounter - for range *aSyncBucket.asyncCounter { - //delete all elements from asyncCounter - temp := *aSyncBucket.asyncCounter - temp = temp[1:] - aSyncBucket.asyncCounter = &temp - } - } - } - - countFromCounterService, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), weight, period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix()) - if err != nil { - return err - } - aSyncBucket.asyncGLobalCount = countFromCounterService - - aSyncBucket.asyncLocalMessageCount -= weight - return nil -} - -type NonDistributedQuotaBucketType struct{} - -func (sQuotaBucket NonDistributedQuotaBucketType) resetCount(qBucket *QuotaBucket) error { - //yet to implement - return errors.New("methog not implemented") -} -func (sQuotaBucket NonDistributedQuotaBucketType) incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) { - - return nil, errors.New("methog not implemented") -} - -func GetQuotaBucketHandler(qBucket *QuotaBucket) (QuotaBucketType, error) { - - if !qBucket.IsDistrubuted() { - quotaBucketType := &NonDistributedQuotaBucketType{} - return quotaBucketType, nil - } else { - if qBucket.IsSynchronous() { - quotaBucketType := &SynchronousQuotaBucketType{} - return quotaBucketType, nil - } - quotaBucketType := &AsynchronousQuotaBucketType{} - return quotaBucketType, nil - - } - - return nil, errors.New("ignoring: unrecognized quota type") - -}
diff --git a/quotaBucket/quotaBucketType_test.go b/quotaBucket/quotaBucketType_test.go deleted file mode 100644 index d6ac8c0..0000000 --- a/quotaBucket/quotaBucketType_test.go +++ /dev/null
@@ -1,11 +0,0 @@ -package quotaBucket_test - -import ( - . "github.com/onsi/ginkgo" -) - -var _ = Describe("QuotaBucketType", func() { - It("test QuotaBucketType", func() { - //fmt.Println("inside QuotaBucketType") - }) -})
diff --git a/quotaBucket/quotaBucket_test.go b/quotaBucket/quotaBucket_test.go index cba72c4..4c5788b 100644 --- a/quotaBucket/quotaBucket_test.go +++ b/quotaBucket/quotaBucket_test.go
@@ -1,6 +1,5 @@ package quotaBucket_test - import ( "github.com/30x/apidQuota/constants" . "github.com/30x/apidQuota/quotaBucket" @@ -35,10 +34,15 @@ if IsValidTimeUnit("invalidType") { Fail("Expected false: invalidType is not a valid TimeUnit") } + + //invalid type + if IsValidTimeUnit("") { + Fail("Expected false: invalidType is not a valid TimeUnit") + } + }) }) - var _ = Describe("Test AcceptedQuotaTypes", func() { It("testTimeUnit", func() { if !IsValidType("calendar") { @@ -50,6 +54,9 @@ if IsValidType("invalidType") { Fail("Expected false: invalidType is not a valid quotaType") } + if IsValidType("") { + Fail("Expected false: invalidType is not a valid quotaType") + } }) }) @@ -58,18 +65,15 @@ //validate all fields set as expected. //validate period set as expected. - //validate async QuotaBucket is empty. + //validate async QuotaBucket is not empty. It("Create with NewQuotaBucket with all valid fields", func() { edgeOrgID := "sampleOrg" id := "sampleID" interval := 1 timeUnit := "hour" quotaType := "calendar" - preciseAtSecondsLevel := true maxCount := int64(10) weight := int64(1) - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) @@ -77,8 +81,8 @@ startTime := time.Now().UTC().AddDate(0, -1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) now := time.Now().UTC() currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC) @@ -94,44 +98,11 @@ Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String())) Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String())) - //check if isDistributed and isSynchronous are true. - if !quotaBucket.IsDistrubuted(){ - Fail("expected true, returned true.") - } - if !quotaBucket.IsSynchronous(){ - Fail("expected true, returned true.") - } - asyncBucket := quotaBucket.GetAsyncQuotaBucket() - if asyncBucket != nil{ - Fail("asyncBucket should be nil for synchronous request.") + if asyncBucket == nil { + Fail("asyncBucket cannot not be nil.") } - //testCase2: synchronous := true and (syncTimeInSec and syncMessageCount) > -1 -> aSyncQuotaBucket is nil. - syncTimeInSec = int64(10) - syncMessageCount = int64(10) - quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) - Expect(err).NotTo(HaveOccurred()) - now = time.Now().UTC() - currentHour = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC) - err = quotaBucket.Validate() - Expect(err).NotTo(HaveOccurred()) - //check if isDistributed and isSynchronous are true. - if !quotaBucket.IsDistrubuted(){ - Fail("expected true, returned true.") - } - if !quotaBucket.IsSynchronous(){ - Fail("expected true, returned true.") - } - - asyncBucket = quotaBucket.GetAsyncQuotaBucket() - if asyncBucket != nil{ - Fail("asyncBucket should be nil for synchronous request.") - } - - }) //startTime for quotaBucket after time.Now() @@ -141,18 +112,15 @@ interval := 1 timeUnit := "hour" quotaType := "calendar" - preciseAtSecondsLevel := true maxCount := int64(10) weight := int64(1) - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) //start time is after now() -> should still set period. startTime := time.Now().UTC().AddDate(0, 1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) now := time.Now().UTC() currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC) @@ -167,17 +135,9 @@ Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String())) Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String())) - //check if isDistributed and isSynchronous are true. - if !quotaBucket.IsDistrubuted(){ - Fail("expected true, returned false.") - } - if !quotaBucket.IsSynchronous(){ - Fail("expected true, returned false.") - } - - asyncBucket := quotaBucket.GetAsyncQuotaBucket() - if asyncBucket != nil{ - Fail("asyncBucket should be nil for synchronous request.") + currentPeriod := getPeriod.IsCurrentPeriod(quotaBucket); + if currentPeriod { + Fail("expected currentPeriod to be false") } }) @@ -190,11 +150,8 @@ interval := 1 timeUnit := "hour" quotaType := "calendar" - preciseAtSecondsLevel := true maxCount := int64(10) weight := int64(1) - distributed := true - synchronous := false //Testcase1 : with syncTimeInSec syncTimeInSec := int64(10) @@ -202,8 +159,8 @@ //start time is after now() -> should still set period. startTime := time.Now().UTC().AddDate(0, 1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) now := time.Now().UTC() currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC) @@ -218,16 +175,8 @@ Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String())) Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String())) - //check if isDistributed is true and isSynchronous is false. - if !quotaBucket.IsDistrubuted(){ - Fail("expected true, returned false.") - } - if quotaBucket.IsSynchronous(){ - Fail("expected false, returned true.") - } - asyncBucket := quotaBucket.GetAsyncQuotaBucket() - if asyncBucket == nil{ + if asyncBucket == nil { Fail("asyncBucket should be nil for synchronous request.") } @@ -237,8 +186,8 @@ //start time is after now() -> should still set period. startTime = time.Now().UTC().AddDate(0, 1, 0).Unix() quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) now = time.Now().UTC() currentHour = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC) @@ -253,16 +202,8 @@ Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String())) Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String())) - //check if isDistributed is true and isSynchronous is false. - if !quotaBucket.IsDistrubuted(){ - Fail("expected true, returned false.") - } - if quotaBucket.IsSynchronous(){ - Fail("expected false, returned true.") - } - asyncBucket = quotaBucket.GetAsyncQuotaBucket() - if asyncBucket == nil{ + if asyncBucket == nil { Fail("asyncBucket should be nil for synchronous request.") } @@ -272,8 +213,8 @@ //start time is after now() -> should still set period. startTime = time.Now().UTC().AddDate(0, 1, 0).Unix() quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).To(HaveOccurred()) }) @@ -284,11 +225,8 @@ interval := 1 timeUnit := "hour" quotaType := "calendar" - preciseAtSecondsLevel := true maxCount := int64(10) weight := int64(1) - distributed := false - synchronous := false syncTimeInSec := int64(-1) syncMessageCount := int64(-1) @@ -296,14 +234,13 @@ startTime := time.Now().UTC().AddDate(0, -1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) now := time.Now().UTC() currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC) err = quotaBucket.Validate() Expect(err).NotTo(HaveOccurred()) - Expect(quotaBucket.IsSynchronous()).Should(BeFalse()) //also check if all the fields are set as expected getPeriod, err := quotaBucket.GetPeriod() @@ -314,28 +251,14 @@ Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String())) Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String())) - //check if isDistributed is false and isSynchronous is false. - if quotaBucket.IsDistrubuted(){ - Fail("expected false, returned true.") - } - if quotaBucket.IsSynchronous(){ - Fail("expected false, returned true.") - } asyncBucket := quotaBucket.GetAsyncQuotaBucket() - if asyncBucket != nil { - Fail("asyncBucket should be nil for synchronous request.") + if asyncBucket == nil { + Fail("asyncBucket can not be nil.") } - synchronous = true - quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) - Expect(err).To(HaveOccurred()) }) - - It("Test invalid timeUnitType", func() { edgeOrgID := "sampleOrg" id := "sampleID" @@ -344,16 +267,13 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true startTime := time.Now().UTC().AddDate(0, -1, 0).Unix() - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) err = quotaBucket.Validate() Expect(err).To(HaveOccurred()) @@ -371,16 +291,13 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true startTime := time.Now().UTC().AddDate(0, -1, 0).Unix() - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) err = quotaBucket.Validate() Expect(err).To(HaveOccurred()) @@ -400,9 +317,6 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) @@ -410,8 +324,8 @@ startTime := time.Now().UTC().AddDate(0, -1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight,syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() Expect(err).NotTo(HaveOccurred()) @@ -427,8 +341,8 @@ //InputStart time is now startTime = time.Now().UTC().Unix() quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() Expect(err).NotTo(HaveOccurred()) @@ -452,17 +366,14 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true //InputStart time is after now. startTime := time.Now().UTC().AddDate(0, 1, 0).Unix() - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -485,9 +396,6 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) @@ -495,8 +403,8 @@ startTime := time.Now().UTC().UTC().AddDate(-1, -1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -513,8 +421,8 @@ //InputStart time is now startTime = time.Now().UTC().Unix() quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -540,9 +448,6 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) @@ -550,8 +455,8 @@ startTime := time.Now().UTC().AddDate(0, 1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -575,17 +480,14 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) //InputStart time is before now startTime := time.Now().UTC().AddDate(0, -1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight,syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -605,8 +507,8 @@ //for non rolling Type window setCurrentPeriod as endTime is < time.now. quotaType = "calendar" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) qPeriod, err = quotaBucket.GetPeriod()
diff --git a/quotaBucket/quotaCache.go b/quotaBucket/quotaCache.go index 4afae81..6815f0b 100644 --- a/quotaBucket/quotaCache.go +++ b/quotaBucket/quotaCache.go
@@ -1,10 +1,10 @@ package quotaBucket import ( + "errors" "github.com/30x/apidQuota/constants" "sync" "time" - "errors" ) var quotaCachelock = sync.RWMutex{} @@ -52,17 +52,15 @@ func removeFromCache(cacheKey string, qBucketCache quotaBucketCache) error { //for async Stop the scheduler. - if qBucketCache.qBucket.Distributed && !qBucketCache.qBucket.IsSynchronous() { - aSyncBucket := qBucketCache.qBucket.GetAsyncQuotaBucket() - if aSyncBucket == nil { - return errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.") - } - qticker, err := aSyncBucket.getAsyncQTicker() - if err != nil { - return err - } - qticker.Stop() + aSyncBucket := qBucketCache.qBucket.GetAsyncQuotaBucket() + if aSyncBucket == nil { + return errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.") } + qticker, err := aSyncBucket.getAsyncQTicker() + if err != nil { + return err + } + qticker.Stop() quotaCachelock.Lock() delete(quotaCache, cacheKey)
diff --git a/quotaBucket/quotaDescriptorType_test.go b/quotaBucket/quotaDescriptorType_test.go index 26316f4..9f4ec8e 100644 --- a/quotaBucket/quotaDescriptorType_test.go +++ b/quotaBucket/quotaDescriptorType_test.go
@@ -47,16 +47,13 @@ interval := 1 maxCount := int64(10) weight := int64(1) - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) - preciseAtSecondsLevel := true startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix() quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -78,8 +75,8 @@ timeUnit = "minute" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -101,8 +98,8 @@ timeUnit = "hour" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -124,8 +121,8 @@ timeUnit = "day" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -147,8 +144,8 @@ timeUnit = "week" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -170,8 +167,8 @@ timeUnit = "month" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -203,16 +200,13 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix() - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -234,8 +228,8 @@ timeUnit = "invalidTimeUnit" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) err = quotaBucket.Validate() Expect(err).To(HaveOccurred()) if ok := strings.Contains(err.Error(), constants.InvalidQuotaTimeUnitType); !ok { @@ -254,16 +248,13 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix() - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -285,8 +276,8 @@ timeUnit = "minute" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -308,8 +299,8 @@ timeUnit = "hour" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -331,8 +322,8 @@ timeUnit = "day" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -354,8 +345,8 @@ timeUnit = "week" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -377,8 +368,8 @@ timeUnit = "month" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -410,16 +401,13 @@ interval := 1 maxCount := int64(10) weight := int64(1) - preciseAtSecondsLevel := true startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix() - distributed := true - synchronous := true syncTimeInSec := int64(-1) syncMessageCount := int64(-1) quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) Expect(err).NotTo(HaveOccurred()) err = quotaBucket.Validate() @@ -441,8 +429,8 @@ timeUnit = "invalidTimeUnit" quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, - quotaType, preciseAtSecondsLevel, startTime, maxCount, - weight, distributed, synchronous, syncTimeInSec, syncMessageCount) + quotaType, startTime, maxCount, + weight, syncTimeInSec, syncMessageCount) err = quotaBucket.Validate() Expect(err).To(HaveOccurred()) if ok := strings.Contains(err.Error(), constants.InvalidQuotaTimeUnitType); !ok {
diff --git a/services/counterServiceHelper.go b/services/counterServiceHelper.go index 5a21ce8..186afb6 100644 --- a/services/counterServiceHelper.go +++ b/services/counterServiceHelper.go
@@ -10,6 +10,8 @@ "net/http" "net/url" "time" + "fmt" + "os" ) const ( @@ -75,15 +77,23 @@ } addApigeeSyncTokenToHeader(request) + fmt.Println("request: ", request) resp, err := client.Do(request) if err != nil { return 0, errors.New("error calling CounterService: " + err.Error()) } + defer resp.Body.Close() + globalVariables.Log.Debug("response: ", resp) if resp.StatusCode != http.StatusOK { + fmt.Println("resp: ", resp) + respBodyBytes, err := ioutil.ReadAll(resp.Body) + fmt.Println(os.Stdout, string(respBodyBytes)) //<-- here ! + + if resp.StatusCode == http.StatusNotFound { return 0, errors.New("response from counter service: " + resp.Status + " and response body is: " + string(respBodyBytes)) }