support only async, distributed quota and preciseAtSecondsLevel always true
diff --git a/api_test.go b/api_test.go
index 2fe6bc0..07b14f2 100644
--- a/api_test.go
+++ b/api_test.go
@@ -1,10 +1,10 @@
package apidQuota_test
import (
- . "github.com/onsi/ginkgo"
"bytes"
"encoding/json"
"github.com/google/uuid"
+ . "github.com/onsi/ginkgo"
"io/ioutil"
"net/http"
"time"
diff --git a/constants/constants.go b/constants/constants.go
index 2b640ac..6c2f131 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -4,7 +4,7 @@
const (
//config variables.
- ApigeeSyncBearerToken = "apigeesync_bearer_token"
+ ApigeeSyncBearerToken = "apigeesync_bearer_token"
ConfigCounterServiceBasePath = "apidquota_counterService_base_path"
//add to acceptedTimeUnitList in init() if case any other new timeUnit is added
@@ -19,7 +19,7 @@
InvalidQuotaTimeUnitType = "invalidQuotaTimeUnitType"
InvalidQuotaType = "invalidQuotaType"
InvalidQuotaPeriod = "invalidQuotaPeriod"
- AsyncQuotaBucketEmpty = "AsyncDetails_for_quotaBucket_are_empty"
+ AsyncQuotaBucketEmpty = "AsyncDetails_for_quotaBucket_are_empty"
QuotaTypeCalendar = "calendar" // after start time
QuotaTypeRollingWindow = "rollingwindow" // in the past "window" time
@@ -36,7 +36,7 @@
ErrorCheckingQuotaLimit = "error_checking_quota_limit"
QuotaBasePathDefault = "/quota"
- URLCounterServiceNotSet = "url_counter_service_not_set"
- URLCounterServiceInvalid = "url_counter_service_invalid"
- MarshalJSONError = "marshal_JSON_error"
+ URLCounterServiceNotSet = "url_counter_service_not_set"
+ URLCounterServiceInvalid = "url_counter_service_invalid"
+ MarshalJSONError = "marshal_JSON_error"
)
diff --git a/quotaBucket/apiUtil.go b/quotaBucket/apiUtil.go
index 26b958d..7b33ddc 100644
--- a/quotaBucket/apiUtil.go
+++ b/quotaBucket/apiUtil.go
@@ -8,9 +8,26 @@
)
const (
+ //request response common params
reqEdgeOrgID = "edgeOrgID"
reqID = "id"
reqMaxCount = "maxCount"
+
+ //request specific params
+ reqInterval = "interval"
+ reqTimeUnit = "timeUnit"
+ reqQType = "type"
+ reqStartTimestamp = "startTimestamp"
+ reqSyncTimeInSec = "syncTimeInSec"
+ reqSyncMessageCount = "syncMessageCount"
+ reqWeight = "weight"
+
+ //response specific params
+ respExceeded = "exceeded"
+ respRemainingCount = "remainingCount"
+ respExpiresTimestamp = "expiresTimestamp"
+ respStartTimestamp = "startTimestamp"
+
)
type QuotaBucketResults struct {
@@ -28,78 +45,68 @@
var edgeOrgID, id, timeUnit, quotaType string
var interval int
var startTime, maxCount, weight int64
- var preciseAtSecondsLevel, distributed bool
newQBucket := &QuotaBucket{}
var err error
value, ok := quotaBucketMap[reqEdgeOrgID]
if !ok {
- return errors.New(`missing field: 'edgeOrgID' is required`)
+ return errors.New("missing field: "+ reqEdgeOrgID + " is required")
}
if edgeOrgIDType := reflect.TypeOf(value); edgeOrgIDType.Kind() != reflect.String {
- return errors.New(`invalid type : 'edgeOrgID' should be a string`)
+ return errors.New("invalid type : "+ reqEdgeOrgID + " should be a string")
}
edgeOrgID = value.(string)
value, ok = quotaBucketMap[reqID]
if !ok {
- return errors.New(`missing field: 'id' is required`)
+ return errors.New("missing field: "+ reqID + " is required")
}
if idType := reflect.TypeOf(value); idType.Kind() != reflect.String {
- return errors.New(`invalid type : 'id' should be a string`)
+ return errors.New("invalid type : "+ reqID + " should be a string")
}
id = value.(string)
//build cacheKey - to retrieve from or add to quotaCache
cacheKey = edgeOrgID + constants.CacheKeyDelimiter + id
- value, ok = quotaBucketMap["interval"]
+ value, ok = quotaBucketMap[reqInterval]
if !ok {
- return errors.New(`missing field: 'interval' is required`)
+ return errors.New("missing field: "+ reqInterval + " is required")
}
- //from input when its read its float, need to then convert to int.
+ //from input its read as float, hence need to then convert to int.
if intervalType := reflect.TypeOf(value); intervalType.Kind() != reflect.Float64 {
- return errors.New(`invalid type : 'interval' should be a number`)
+ return errors.New("invalid type : "+ reqInterval + " should be a number")
}
intervalFloat := value.(float64)
interval = int(intervalFloat)
//TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH}
- value, ok = quotaBucketMap["timeUnit"]
+ value, ok = quotaBucketMap[reqTimeUnit]
if !ok {
- return errors.New(`missing field: 'timeUnit' is required`)
+ return errors.New("missing field: "+ reqTimeUnit + " is required")
}
if timeUnitType := reflect.TypeOf(value); timeUnitType.Kind() != reflect.String {
- return errors.New(`invalid type : 'timeUnit' should be a string`)
+ return errors.New("invalid type : "+ reqTimeUnit + " should be a string")
}
timeUnit = value.(string)
//QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW}
- value, ok = quotaBucketMap["type"]
+ value, ok = quotaBucketMap[reqQType]
if !ok {
- return errors.New(`missing field: 'type' is required`)
+ return errors.New("missing field: "+ reqQType + " is required")
}
if quotaTypeType := reflect.TypeOf(value); quotaTypeType.Kind() != reflect.String {
- return errors.New(`invalid type : 'type' should be a string`)
+ return errors.New("invalid type : "+ reqQType + " should be a string")
}
quotaType = value.(string)
- value, ok = quotaBucketMap["preciseAtSecondsLevel"]
- if !ok {
- return errors.New(`missing field: 'preciseAtSecondsLevel' is required`)
- }
- if preciseAtSecondsLevelType := reflect.TypeOf(value); preciseAtSecondsLevelType.Kind() != reflect.Bool {
- return errors.New(`invalid type : 'preciseAtSecondsLevel' should be boolean`)
- }
- preciseAtSecondsLevel = value.(bool)
-
- value, ok = quotaBucketMap["startTimestamp"]
+ value, ok = quotaBucketMap[reqStartTimestamp]
if !ok { //todo: in the current cps code startTime is optional for QuotaBucket. should we make startTime optional to NewQuotaBucket?
startTime = time.Now().UTC().Unix()
} else {
// //from input when its read its float, need to then convert to int.
if startTimeType := reflect.TypeOf(value); startTimeType.Kind() != reflect.Float64 {
- return errors.New(`invalid type : 'startTime' should be UNIX timestamp`)
+ return errors.New("invalid type : "+ reqStartTimestamp + " should be UNIX timestamp")
}
startTimeFloat := value.(float64)
startTime = int64(startTimeFloat)
@@ -107,127 +114,79 @@
value, ok = quotaBucketMap[reqMaxCount]
if !ok {
- return errors.New(`missing field: 'maxCount' is required`)
+ return errors.New("missing field: "+ reqMaxCount + " is required")
}
//from input when its read its float, need to then convert to int.
if maxCountType := reflect.TypeOf(value); maxCountType.Kind() != reflect.Float64 {
- return errors.New(`invalid type : 'maxCount' should be a number`)
+ return errors.New("invalid type : "+ reqMaxCount + " should be a number")
}
maxCountFloat := value.(float64)
maxCount = int64(maxCountFloat)
- value, ok = quotaBucketMap["weight"]
+ value, ok = quotaBucketMap[reqWeight]
if !ok {
- return errors.New(`missing field: 'weight' is required`)
+ return errors.New("missing field: "+ reqWeight + " is required")
}
//from input when its read its float, need to then convert to int.
if weightType := reflect.TypeOf(value); weightType.Kind() != reflect.Float64 {
- return errors.New(`invalid type : 'maxCount' should be a number`)
+ return errors.New("invalid type : "+ reqWeight + " should be a number")
}
weightFloat := value.(float64)
weight = int64(weightFloat)
- value, ok = quotaBucketMap["distributed"]
- if !ok {
- return errors.New(`missing field: 'distributed' is required`)
+ syncTimeValue, syncTimeOK := quotaBucketMap[reqSyncTimeInSec]
+ syncMsgCountValue, syncMsgCountOK := quotaBucketMap[reqSyncMessageCount]
+
+ if syncTimeOK && syncMsgCountOK {
+ return errors.New("either "+ reqSyncTimeInSec + " or "+ reqSyncMessageCount + " should be present but not both.")
}
- if preciseAtSecondsLevelType := reflect.TypeOf(value); preciseAtSecondsLevelType.Kind() != reflect.Bool {
- return errors.New(`invalid type : 'distributed' should be boolean`)
+
+ if !syncTimeOK && !syncMsgCountOK {
+ return errors.New("either "+ reqSyncTimeInSec + " or "+ reqSyncMessageCount + " should be present. both cant be empty.")
}
- distributed = value.(bool)
- //if distributed check for sync or async Quota
- if distributed {
- value, ok = quotaBucketMap["synchronous"]
- if !ok {
- return errors.New(`missing field: 'synchronous' is required`)
+ if syncTimeOK {
+ if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 {
+ return errors.New("invalid type : "+ reqSyncTimeInSec + " should be a number")
}
- if synchronousType := reflect.TypeOf(value); synchronousType.Kind() != reflect.Bool {
- return errors.New(`invalid type : 'synchronous' should be boolean`)
- }
- synchronous := value.(bool)
-
- // for async retrieve syncTimeSec or syncMessageCount
- if !synchronous {
- syncTimeValue, syncTimeOK := quotaBucketMap["syncTimeInSec"]
- syncMsgCountValue, syncMsgCountOK := quotaBucketMap["syncMessageCount"]
-
- if syncTimeOK && syncMsgCountOK {
- return errors.New(`either syncTimeInSec or syncMessageCount should be present but not both.`)
- }
-
- if !syncTimeOK && !syncMsgCountOK {
- return errors.New(`either syncTimeInSec or syncMessageCount should be present. both cant be empty.`)
- }
-
- if syncTimeOK {
- if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 {
- return errors.New(`invalid type : 'syncTimeInSec' should be a number`)
- }
- syncTimeFloat := syncTimeValue.(float64)
- syncTimeInt := int64(syncTimeFloat)
-
- //try to retrieve from cache
- newQBucket, ok = getFromCache(cacheKey, weight)
-
- if !ok {
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1)
- if err != nil {
- return errors.New("error creating quotaBucket: " + err.Error())
- }
-
- qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
-
- if err := qBucketRequest.Validate(); err != nil {
- return errors.New("error validating quotaBucket: " + err.Error())
- }
-
- addToCache(qBucketRequest)
- return nil
- }
- qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
-
- return nil
-
- } else if syncMsgCountOK {
- if syncMsgCountType := reflect.TypeOf(syncMsgCountValue); syncMsgCountType.Kind() != reflect.Float64 {
- return errors.New(`invalid type : 'syncTimeInSec' should be a number`)
- }
- syncMsgCountFloat := syncMsgCountValue.(float64)
- syncMsgCountInt := int64(syncMsgCountFloat)
- //try to retrieve from cache
- newQBucket, ok = getFromCache(cacheKey, weight)
-
- if !ok {
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt)
- if err != nil {
- return errors.New("error creating quotaBucket: " + err.Error())
- }
- qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
-
- if err := qBucketRequest.Validate(); err != nil {
- return errors.New("error validating quotaBucket: " + err.Error())
- }
-
- addToCache(qBucketRequest)
- return nil
-
- }
- qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
-
- return nil
- }
- }
+ syncTimeFloat := syncTimeValue.(float64)
+ syncTimeInt := int64(syncTimeFloat)
//try to retrieve from cache
newQBucket, ok = getFromCache(cacheKey, weight)
if !ok {
- //for synchronous quotaBucket
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, synchronous, -1, -1)
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType,
+ startTime, maxCount, weight, syncTimeInt, -1)
+ if err != nil {
+ return errors.New("error creating quotaBucket: " + err.Error())
+ }
+
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
+ if err := qBucketRequest.Validate(); err != nil {
+ return errors.New("error validating quotaBucket: " + err.Error())
+ }
+
+ addToCache(qBucketRequest)
+ return nil
+ }
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
+ return nil
+
+ } else if syncMsgCountOK {
+ if syncMsgCountType := reflect.TypeOf(syncMsgCountValue); syncMsgCountType.Kind() != reflect.Float64 {
+ return errors.New("invalid type : "+ reqSyncMessageCount + " should be a number")
+ }
+ syncMsgCountFloat := syncMsgCountValue.(float64)
+ syncMsgCountInt := int64(syncMsgCountFloat)
+ //try to retrieve from cache
+ newQBucket, ok = getFromCache(cacheKey, weight)
+
+ if !ok {
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType,
+ startTime, maxCount, weight, -1, syncMsgCountInt)
if err != nil {
return errors.New("error creating quotaBucket: " + err.Error())
}
@@ -236,37 +195,36 @@
if err := qBucketRequest.Validate(); err != nil {
return errors.New("error validating quotaBucket: " + err.Error())
}
+
addToCache(qBucketRequest)
return nil
- }
+ }
qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
return nil
}
- //for non distributed quotaBucket
- //retrieveFromCache.
+ //try to retrieve from cache
newQBucket, ok = getFromCache(cacheKey, weight)
if !ok {
- //for non distributed quotaBucket
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, false, -1, -1)
+ //for synchronous quotaBucket
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType,
+ startTime, maxCount, weight, -1, -1)
if err != nil {
return errors.New("error creating quotaBucket: " + err.Error())
-
}
-
qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
if err := qBucketRequest.Validate(); err != nil {
return errors.New("error validating quotaBucket: " + err.Error())
}
-
addToCache(qBucketRequest)
+ return nil
}
- qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
return nil
}
@@ -276,10 +234,10 @@
resultsMap[reqEdgeOrgID] = qBucketResults.EdgeOrgID
resultsMap[reqID] = qBucketResults.ID
resultsMap[reqMaxCount] = qBucketResults.MaxCount
- resultsMap["exceeded"] = qBucketResults.exceeded
- resultsMap["remainingCount"] = qBucketResults.remainingCount
- resultsMap["startTimestamp"] = qBucketResults.startTimestamp
- resultsMap["expiresTimestamp"] = qBucketResults.expiresTimestamp
+ resultsMap[respExceeded] = qBucketResults.exceeded
+ resultsMap[respRemainingCount] = qBucketResults.remainingCount
+ resultsMap[respStartTimestamp] = qBucketResults.startTimestamp
+ resultsMap[respExpiresTimestamp] = qBucketResults.expiresTimestamp
return resultsMap
}
diff --git a/quotaBucket/quotaBucket.go b/quotaBucket/quotaBucket.go
index 081211e..334f3c0 100644
--- a/quotaBucket/quotaBucket.go
+++ b/quotaBucket/quotaBucket.go
@@ -155,19 +155,133 @@
return aSyncbucket.asyncGLobalCount + aSyncbucket.asyncLocalMessageCount, nil
}
+func (quotaBucketType *aSyncQuotaBucket) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) {
+ period, err := q.GetPeriod()
+ if err != nil {
+ return nil, errors.New("error getting period: " + err.Error())
+ }
+ aSyncBucket := q.GetAsyncQuotaBucket()
+ if aSyncBucket == nil {
+ return nil, errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.")
+ }
+ currentCount, err := aSyncBucket.getCount(q, period)
+ if err != nil {
+ return nil, err
+ }
+
+ maxCount := q.GetMaxCount()
+ exceeded := false
+ remainingCount := int64(0)
+ weight := q.GetWeight()
+
+ if period.IsCurrentPeriod(q) {
+
+ if currentCount < maxCount {
+ diffCount := (currentCount + weight) - maxCount
+ if diffCount > 0 {
+ exceeded = true
+ remainingCount = maxCount - currentCount
+
+ } else {
+ aSyncBucket.addToCounter(weight)
+ aSyncBucket.addToAsyncLocalMessageCount(weight)
+ remainingCount = maxCount - (currentCount + weight)
+
+ }
+
+ asyncMessageCount, err := aSyncBucket.getAsyncSyncMessageCount()
+ if err != nil {
+ return nil, err
+ }
+
+ asyncLocalMsgCount, err := aSyncBucket.getAsyncLocalMessageCount()
+ if err != nil {
+ return nil, err
+ }
+
+ if asyncMessageCount > 0 &&
+ asyncLocalMsgCount >= asyncMessageCount {
+ err = internalRefresh(q, period)
+ if err != nil {
+ return nil, err
+ }
+ }
+ } else {
+ exceeded = true
+ remainingCount = maxCount - currentCount
+ }
+ }
+ if remainingCount < 0 {
+ remainingCount = int64(0)
+ }
+
+ results := &QuotaBucketResults{
+ EdgeOrgID: q.GetEdgeOrgID(),
+ ID: q.GetID(),
+ exceeded: exceeded,
+ remainingCount: remainingCount,
+ MaxCount: maxCount,
+ startTimestamp: period.GetPeriodStartTime().Unix(),
+ expiresTimestamp: period.GetPeriodEndTime().Unix(),
+ }
+
+ return results, nil
+}
+
+func internalRefresh(q *QuotaBucket, period *quotaPeriod) error {
+ var err error
+ aSyncBucket := q.GetAsyncQuotaBucket()
+ if aSyncBucket == nil {
+ return errors.New(constants.AsyncQuotaBucketEmpty)
+ }
+
+ weight := int64(0)
+ countFromCounterService := int64(0)
+ globalCount, err := aSyncBucket.getAsyncGlobalCount()
+ if err != nil {
+ return err
+ }
+
+ maxCount := q.GetMaxCount()
+ for _, counterEle := range *aSyncBucket.asyncCounter {
+ weight += counterEle
+
+ //delete from asyncCounter
+ temp := *aSyncBucket.asyncCounter
+ temp = temp[1:]
+ aSyncBucket.asyncCounter = &temp
+
+ if (weight + globalCount) >= maxCount {
+ //clear asyncCounter
+ for range *aSyncBucket.asyncCounter {
+ //delete all elements from asyncCounter
+ temp := *aSyncBucket.asyncCounter
+ temp = temp[1:]
+ aSyncBucket.asyncCounter = &temp
+ }
+ }
+ }
+
+ countFromCounterService, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), weight, period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix())
+ if err != nil {
+ return err
+ }
+ aSyncBucket.asyncGLobalCount = countFromCounterService
+
+ aSyncBucket.asyncLocalMessageCount -= weight
+ return nil
+}
+
type quotaBucketData struct {
- EdgeOrgID string
- ID string
- Interval int
- TimeUnit string //TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH}
- QuotaType string //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW}
- PreciseAtSecondsLevel bool
- StartTime time.Time
- MaxCount int64
- Weight int64
- Distributed bool
- Synchronous bool
- AsyncQuotaDetails *aSyncQuotaBucket
+ EdgeOrgID string
+ ID string
+ Interval int
+ TimeUnit string //TimeUnit {SECOND, MINUTE, HOUR, DAY, WEEK, MONTH}
+ QuotaType string //QuotaType {CALENDAR, FLEXI, ROLLING_WINDOW}
+ StartTime time.Time
+ MaxCount int64
+ Weight int64
+ AsyncQuotaDetails *aSyncQuotaBucket
}
type QuotaBucket struct {
@@ -175,98 +289,86 @@
}
func NewQuotaBucket(edgeOrgID string, id string, interval int,
- timeUnit string, quotaType string, preciseAtSecondsLevel bool,
- startTime int64, maxCount int64, weight int64, distributed bool,
- synchronous bool, syncTimeInSec int64, syncMessageCount int64) (*QuotaBucket, error) {
+ timeUnit string, quotaType string,
+ startTime int64, maxCount int64, weight int64, syncTimeInSec int64, syncMessageCount int64) (*QuotaBucket, error) {
fromUNIXTime := time.Unix(startTime, 0)
quotaBucketDataStruct := quotaBucketData{
- EdgeOrgID: edgeOrgID,
- ID: id,
- Interval: interval,
- TimeUnit: timeUnit,
- QuotaType: quotaType,
- PreciseAtSecondsLevel: preciseAtSecondsLevel,
- StartTime: fromUNIXTime,
- MaxCount: maxCount,
- Weight: weight,
- Distributed: distributed,
- Synchronous: synchronous,
- AsyncQuotaDetails: nil,
+ EdgeOrgID: edgeOrgID,
+ ID: id,
+ Interval: interval,
+ TimeUnit: timeUnit,
+ QuotaType: quotaType,
+ StartTime: fromUNIXTime,
+ MaxCount: maxCount,
+ Weight: weight,
+ AsyncQuotaDetails: nil,
}
quotaBucket := &QuotaBucket{
quotaBucketData: quotaBucketDataStruct,
}
- if !quotaBucket.IsDistrubuted() {
- if quotaBucket.IsSynchronous(){
- return nil, errors.New("quota bucket cannot be both nonDistributed and synchronous.")
- }
+ var quotaTicker int64
+ //ensure just one of syncTimeInSec and syncMessageCount is set.
+ if syncTimeInSec > -1 && syncMessageCount > -1 {
+ return nil, errors.New("both syncTimeInSec and syncMessageCount canot be set. only one of them should be set.")
}
- //for async set AsyncQuotaDetails and start the NewTicker
- if distributed && !synchronous {
- var quotaTicker int64
- //ensure just one of syncTimeInSec and syncMessageCount is set.
- if syncTimeInSec > -1 && syncMessageCount > -1 {
- return nil,errors.New("both syncTimeInSec and syncMessageCount canot be set. only one of them should be set.")
- }
- //set default syncTime for AsyncQuotaBucket.
- //for aSyncQuotaBucket with 'syncMessageCount' the ticker is invoked with DefaultQuotaSyncTime
- quotaTicker = constants.DefaultQuotaSyncTime
+ //set default syncTime for AsyncQuotaBucket.
+ //for aSyncQuotaBucket with 'syncMessageCount' the ticker is invoked with DefaultQuotaSyncTime
+ quotaTicker = constants.DefaultQuotaSyncTime
- if syncTimeInSec > 0 { //if sync with counter service periodically
- quotaTicker = syncTimeInSec
- }
+ if syncTimeInSec > 0 { //if sync with counter service periodically
+ quotaTicker = syncTimeInSec
+ }
- counter := make([]int64, 0)
- newAsyncQuotaDetails := &aSyncQuotaBucket{
- syncTimeInSec: syncTimeInSec,
- syncMessageCount: syncMessageCount,
- asyncCounter: &counter,
- asyncGLobalCount: constants.DefaultCount,
- asyncLocalMessageCount: constants.DefaultCount,
- initialized: false,
- qTicker: time.NewTicker(time.Duration(time.Second.Nanoseconds() * quotaTicker)),
- }
+ counter := make([]int64, 0)
+ newAsyncQuotaDetails := &aSyncQuotaBucket{
+ syncTimeInSec: syncTimeInSec,
+ syncMessageCount: syncMessageCount,
+ asyncCounter: &counter,
+ asyncGLobalCount: constants.DefaultCount,
+ asyncLocalMessageCount: constants.DefaultCount,
+ initialized: false,
+ qTicker: time.NewTicker(time.Duration(time.Second.Nanoseconds() * quotaTicker)),
+ }
- quotaBucket.setAsyncQuotaBucket(newAsyncQuotaDetails)
- go func() {
- aSyncBucket := quotaBucket.GetAsyncQuotaBucket()
- if aSyncBucket != nil {
- exitCount := int64(0)
- qticker, _ := aSyncBucket.getAsyncQTicker()
- for t := range qticker.C {
- globalVariables.Log.Debug("t: ", t.String())
- if len(*aSyncBucket.asyncCounter) == 0 {
- exitCount += 1
- }
- period, err := quotaBucket.GetPeriod()
- if err != nil {
- globalVariables.Log.Error("error getting period for: ", err.Error(), "for quotaBucket: ", quotaBucket)
- qticker.Stop()
- continue
- }
- //sync with counterService.
- err = internalRefresh(quotaBucket, period)
- if err != nil {
- globalVariables.Log.Error("error during internalRefresh: ", err.Error(), "for quotaBucket: ", quotaBucket)
- qticker.Stop()
- continue
- }
-
- if exitCount > 3 {
- removeFromCache( quotaBucket.GetEdgeOrgID()+
- constants.CacheKeyDelimiter + quotaBucket.GetID(),
- quotaCache[quotaBucket.GetEdgeOrgID()+constants.CacheKeyDelimiter + quotaBucket.GetID()])
- qticker.Stop()
- }
+ quotaBucket.setAsyncQuotaBucket(newAsyncQuotaDetails)
+ go func() {
+ aSyncBucket := quotaBucket.GetAsyncQuotaBucket()
+ if aSyncBucket != nil {
+ exitCount := int64(0)
+ qticker, _ := aSyncBucket.getAsyncQTicker()
+ for t := range qticker.C {
+ globalVariables.Log.Debug("t: ", t.String())
+ if len(*aSyncBucket.asyncCounter) == 0 {
+ exitCount += 1
}
- } else {
- globalVariables.Log.Error("aSyncBucketDetails are empty for the given quotaBucket: ", quotaBucket)
+ period, err := quotaBucket.GetPeriod()
+ if err != nil {
+ globalVariables.Log.Error("error getting period for: ", err.Error(), "for quotaBucket: ", quotaBucket)
+ qticker.Stop()
+ continue
+ }
+ //sync with counterService.
+ err = internalRefresh(quotaBucket, period)
+ if err != nil {
+ globalVariables.Log.Error("error during internalRefresh: ", err.Error(), "for quotaBucket: ", quotaBucket)
+ qticker.Stop()
+ continue
+ }
+
+ if exitCount > 3 {
+ removeFromCache(quotaBucket.GetEdgeOrgID()+
+ constants.CacheKeyDelimiter+quotaBucket.GetID(),
+ quotaCache[quotaBucket.GetEdgeOrgID()+constants.CacheKeyDelimiter+quotaBucket.GetID()])
+ qticker.Stop()
+ }
}
- }()
- }
+ } else {
+ globalVariables.Log.Error("aSyncBucketDetails are empty for the given quotaBucket: ", quotaBucket)
+ }
+ }()
return quotaBucket, nil
@@ -321,10 +423,6 @@
return q.quotaBucketData.QuotaType
}
-func (q *QuotaBucket) GetIsPreciseAtSecondsLevel() bool {
- return q.quotaBucketData.PreciseAtSecondsLevel
-}
-
func (q *QuotaBucket) GetMaxCount() int64 {
return q.quotaBucketData.MaxCount
}
@@ -333,14 +431,6 @@
return q.quotaBucketData.Weight
}
-func (q *QuotaBucket) IsDistrubuted() bool {
- return q.quotaBucketData.Distributed
-}
-
-func (q *QuotaBucket) IsSynchronous() bool {
- return q.quotaBucketData.Synchronous
-}
-
//setCurrentPeriod only for rolling window else just return the value of QuotaPeriod.
func (q *QuotaBucket) GetPeriod() (*quotaPeriod, error) {
@@ -378,12 +468,7 @@
func (q *QuotaBucket) IncrementQuotaLimit() (*QuotaBucketResults, error) {
- qBucketHandler, err := GetQuotaBucketHandler(q)
- if err != nil {
- return nil, errors.New("error getting quotaBucketHandler: " + err.Error())
- }
-
- return qBucketHandler.incrementQuotaCount(q)
+ return q.GetAsyncQuotaBucket().incrementQuotaCount(q)
}
diff --git a/quotaBucket/quotaBucketType.go b/quotaBucket/quotaBucketType.go
deleted file mode 100644
index cb3c1b7..0000000
--- a/quotaBucket/quotaBucketType.go
+++ /dev/null
@@ -1,232 +0,0 @@
-package quotaBucket
-
-import (
- "errors"
- "github.com/30x/apidQuota/services"
- "github.com/30x/apidQuota/constants"
-)
-
-type QuotaBucketType interface {
- resetCount(bucket *QuotaBucket) error
- incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error)
-}
-
-type SynchronousQuotaBucketType struct{}
-
-func (sQuotaBucket SynchronousQuotaBucketType) resetCount(qBucket *QuotaBucket) error {
- //do nothing.
- return nil
-}
-
-func (sQuotaBucket SynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) {
- period, err := q.GetPeriod()
- if err != nil {
- return nil, errors.New("error getting period: " + err.Error())
- }
- maxCount := q.GetMaxCount()
- exceeded := false
- remainingCount := int64(0)
-
- weight := q.GetWeight()
-
- //first retrieve the count from counter service.
- currentCount, err := services.GetCount(q.GetEdgeOrgID(), q.GetID(), period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix())
- if err != nil {
- return nil, err
- }
-
- if period.IsCurrentPeriod(q) {
- if currentCount < maxCount {
- allowed := maxCount - currentCount
- if allowed >= weight {
- if weight != 0 {
- currentCount, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), weight, period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix())
- if err != nil {
- return nil, err
- }
- }
- remainingCount = maxCount - (currentCount)
-
- } else {
- if weight != 0 {
- exceeded = true
- }
- remainingCount = maxCount - currentCount
- }
- } else {
- exceeded = true
- remainingCount = maxCount - currentCount
- }
- }
-
- if remainingCount < 0 {
- remainingCount = int64(0)
- }
-
- results := &QuotaBucketResults{
- EdgeOrgID: q.GetEdgeOrgID(),
- ID: q.GetID(),
- exceeded: exceeded,
- remainingCount: remainingCount,
- MaxCount: maxCount,
- startTimestamp: period.GetPeriodStartTime().Unix(),
- expiresTimestamp: period.GetPeriodEndTime().Unix(),
- }
-
- return results, nil
-}
-
-type AsynchronousQuotaBucketType struct {
-}
-
-func (quotaBucketType AsynchronousQuotaBucketType) resetCount(q *QuotaBucket) error {
- //yet to implement
- return nil
-}
-
-func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) {
- period, err := q.GetPeriod()
- if err != nil {
- return nil, errors.New("error getting period: " + err.Error())
- }
- aSyncBucket := q.GetAsyncQuotaBucket()
- if aSyncBucket == nil {
- return nil, errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.")
- }
- currentCount, err := aSyncBucket.getCount(q, period)
- if err != nil {
- return nil, err
- }
-
- maxCount := q.GetMaxCount()
- exceeded := false
- remainingCount := int64(0)
- weight := q.GetWeight()
-
- if period.IsCurrentPeriod(q) {
-
- if currentCount < maxCount {
- diffCount := (currentCount + weight) - maxCount
- if diffCount > 0 {
- exceeded = true
- remainingCount = maxCount - currentCount
-
- } else {
- aSyncBucket.addToCounter(weight)
- aSyncBucket.addToAsyncLocalMessageCount(weight)
- remainingCount = maxCount - (currentCount + weight)
-
- }
-
- asyncMessageCount, err := aSyncBucket.getAsyncSyncMessageCount()
- if err != nil {
- return nil, err
- }
-
- asyncLocalMsgCount,err := aSyncBucket.getAsyncLocalMessageCount()
- if err != nil {
- return nil, err
- }
-
- if asyncMessageCount > 0 &&
- asyncLocalMsgCount >= asyncMessageCount {
- err = internalRefresh(q, period)
- if err != nil {
- return nil, err
- }
- }
- } else {
- exceeded = true
- remainingCount = maxCount - currentCount
- }
- }
- if remainingCount < 0 {
- remainingCount = int64(0)
- }
-
- results := &QuotaBucketResults{
- EdgeOrgID: q.GetEdgeOrgID(),
- ID: q.GetID(),
- exceeded: exceeded,
- remainingCount: remainingCount,
- MaxCount: maxCount,
- startTimestamp: period.GetPeriodStartTime().Unix(),
- expiresTimestamp: period.GetPeriodEndTime().Unix(),
- }
-
- return results, nil
-}
-
-func internalRefresh(q *QuotaBucket, period *quotaPeriod) error {
- var err error
- aSyncBucket := q.GetAsyncQuotaBucket()
- if aSyncBucket == nil {
- return errors.New(constants.AsyncQuotaBucketEmpty)
- }
-
- weight := int64(0)
- countFromCounterService := int64(0)
- globalCount,err := aSyncBucket.getAsyncGlobalCount()
- if err != nil {
- return err
- }
-
- maxCount := q.GetMaxCount()
- for _, counterEle := range *aSyncBucket.asyncCounter {
- weight += counterEle
-
- //delete from asyncCounter
- temp := *aSyncBucket.asyncCounter
- temp = temp[1:]
- aSyncBucket.asyncCounter = &temp
-
- if (weight + globalCount) >= maxCount {
- //clear asyncCounter
- for range *aSyncBucket.asyncCounter {
- //delete all elements from asyncCounter
- temp := *aSyncBucket.asyncCounter
- temp = temp[1:]
- aSyncBucket.asyncCounter = &temp
- }
- }
- }
-
- countFromCounterService, err = services.IncrementAndGetCount(q.GetEdgeOrgID(), q.GetID(), weight, period.GetPeriodStartTime().Unix(), period.GetPeriodEndTime().Unix())
- if err != nil {
- return err
- }
- aSyncBucket.asyncGLobalCount = countFromCounterService
-
- aSyncBucket.asyncLocalMessageCount -= weight
- return nil
-}
-
-type NonDistributedQuotaBucketType struct{}
-
-func (sQuotaBucket NonDistributedQuotaBucketType) resetCount(qBucket *QuotaBucket) error {
- //yet to implement
- return errors.New("methog not implemented")
-}
-func (sQuotaBucket NonDistributedQuotaBucketType) incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) {
-
- return nil, errors.New("methog not implemented")
-}
-
-func GetQuotaBucketHandler(qBucket *QuotaBucket) (QuotaBucketType, error) {
-
- if !qBucket.IsDistrubuted() {
- quotaBucketType := &NonDistributedQuotaBucketType{}
- return quotaBucketType, nil
- } else {
- if qBucket.IsSynchronous() {
- quotaBucketType := &SynchronousQuotaBucketType{}
- return quotaBucketType, nil
- }
- quotaBucketType := &AsynchronousQuotaBucketType{}
- return quotaBucketType, nil
-
- }
-
- return nil, errors.New("ignoring: unrecognized quota type")
-
-}
diff --git a/quotaBucket/quotaBucketType_test.go b/quotaBucket/quotaBucketType_test.go
deleted file mode 100644
index d6ac8c0..0000000
--- a/quotaBucket/quotaBucketType_test.go
+++ /dev/null
@@ -1,11 +0,0 @@
-package quotaBucket_test
-
-import (
- . "github.com/onsi/ginkgo"
-)
-
-var _ = Describe("QuotaBucketType", func() {
- It("test QuotaBucketType", func() {
- //fmt.Println("inside QuotaBucketType")
- })
-})
diff --git a/quotaBucket/quotaBucket_test.go b/quotaBucket/quotaBucket_test.go
index cba72c4..4c5788b 100644
--- a/quotaBucket/quotaBucket_test.go
+++ b/quotaBucket/quotaBucket_test.go
@@ -1,6 +1,5 @@
package quotaBucket_test
-
import (
"github.com/30x/apidQuota/constants"
. "github.com/30x/apidQuota/quotaBucket"
@@ -35,10 +34,15 @@
if IsValidTimeUnit("invalidType") {
Fail("Expected false: invalidType is not a valid TimeUnit")
}
+
+ //invalid type
+ if IsValidTimeUnit("") {
+ Fail("Expected false: invalidType is not a valid TimeUnit")
+ }
+
})
})
-
var _ = Describe("Test AcceptedQuotaTypes", func() {
It("testTimeUnit", func() {
if !IsValidType("calendar") {
@@ -50,6 +54,9 @@
if IsValidType("invalidType") {
Fail("Expected false: invalidType is not a valid quotaType")
}
+ if IsValidType("") {
+ Fail("Expected false: invalidType is not a valid quotaType")
+ }
})
})
@@ -58,18 +65,15 @@
//validate all fields set as expected.
//validate period set as expected.
- //validate async QuotaBucket is empty.
+ //validate async QuotaBucket is not empty.
It("Create with NewQuotaBucket with all valid fields", func() {
edgeOrgID := "sampleOrg"
id := "sampleID"
interval := 1
timeUnit := "hour"
quotaType := "calendar"
- preciseAtSecondsLevel := true
maxCount := int64(10)
weight := int64(1)
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
@@ -77,8 +81,8 @@
startTime := time.Now().UTC().AddDate(0, -1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
now := time.Now().UTC()
currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
@@ -94,44 +98,11 @@
Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String()))
Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String()))
- //check if isDistributed and isSynchronous are true.
- if !quotaBucket.IsDistrubuted(){
- Fail("expected true, returned true.")
- }
- if !quotaBucket.IsSynchronous(){
- Fail("expected true, returned true.")
- }
-
asyncBucket := quotaBucket.GetAsyncQuotaBucket()
- if asyncBucket != nil{
- Fail("asyncBucket should be nil for synchronous request.")
+ if asyncBucket == nil {
+ Fail("asyncBucket cannot not be nil.")
}
- //testCase2: synchronous := true and (syncTimeInSec and syncMessageCount) > -1 -> aSyncQuotaBucket is nil.
- syncTimeInSec = int64(10)
- syncMessageCount = int64(10)
- quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
- Expect(err).NotTo(HaveOccurred())
- now = time.Now().UTC()
- currentHour = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
- err = quotaBucket.Validate()
- Expect(err).NotTo(HaveOccurred())
- //check if isDistributed and isSynchronous are true.
- if !quotaBucket.IsDistrubuted(){
- Fail("expected true, returned true.")
- }
- if !quotaBucket.IsSynchronous(){
- Fail("expected true, returned true.")
- }
-
- asyncBucket = quotaBucket.GetAsyncQuotaBucket()
- if asyncBucket != nil{
- Fail("asyncBucket should be nil for synchronous request.")
- }
-
-
})
//startTime for quotaBucket after time.Now()
@@ -141,18 +112,15 @@
interval := 1
timeUnit := "hour"
quotaType := "calendar"
- preciseAtSecondsLevel := true
maxCount := int64(10)
weight := int64(1)
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
//start time is after now() -> should still set period.
startTime := time.Now().UTC().AddDate(0, 1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
now := time.Now().UTC()
currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
@@ -167,17 +135,9 @@
Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String()))
Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String()))
- //check if isDistributed and isSynchronous are true.
- if !quotaBucket.IsDistrubuted(){
- Fail("expected true, returned false.")
- }
- if !quotaBucket.IsSynchronous(){
- Fail("expected true, returned false.")
- }
-
- asyncBucket := quotaBucket.GetAsyncQuotaBucket()
- if asyncBucket != nil{
- Fail("asyncBucket should be nil for synchronous request.")
+ currentPeriod := getPeriod.IsCurrentPeriod(quotaBucket);
+ if currentPeriod {
+ Fail("expected currentPeriod to be false")
}
})
@@ -190,11 +150,8 @@
interval := 1
timeUnit := "hour"
quotaType := "calendar"
- preciseAtSecondsLevel := true
maxCount := int64(10)
weight := int64(1)
- distributed := true
- synchronous := false
//Testcase1 : with syncTimeInSec
syncTimeInSec := int64(10)
@@ -202,8 +159,8 @@
//start time is after now() -> should still set period.
startTime := time.Now().UTC().AddDate(0, 1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
now := time.Now().UTC()
currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
@@ -218,16 +175,8 @@
Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String()))
Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String()))
- //check if isDistributed is true and isSynchronous is false.
- if !quotaBucket.IsDistrubuted(){
- Fail("expected true, returned false.")
- }
- if quotaBucket.IsSynchronous(){
- Fail("expected false, returned true.")
- }
-
asyncBucket := quotaBucket.GetAsyncQuotaBucket()
- if asyncBucket == nil{
+ if asyncBucket == nil {
Fail("asyncBucket should be nil for synchronous request.")
}
@@ -237,8 +186,8 @@
//start time is after now() -> should still set period.
startTime = time.Now().UTC().AddDate(0, 1, 0).Unix()
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
now = time.Now().UTC()
currentHour = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
@@ -253,16 +202,8 @@
Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String()))
Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String()))
- //check if isDistributed is true and isSynchronous is false.
- if !quotaBucket.IsDistrubuted(){
- Fail("expected true, returned false.")
- }
- if quotaBucket.IsSynchronous(){
- Fail("expected false, returned true.")
- }
-
asyncBucket = quotaBucket.GetAsyncQuotaBucket()
- if asyncBucket == nil{
+ if asyncBucket == nil {
Fail("asyncBucket should be nil for synchronous request.")
}
@@ -272,8 +213,8 @@
//start time is after now() -> should still set period.
startTime = time.Now().UTC().AddDate(0, 1, 0).Unix()
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).To(HaveOccurred())
})
@@ -284,11 +225,8 @@
interval := 1
timeUnit := "hour"
quotaType := "calendar"
- preciseAtSecondsLevel := true
maxCount := int64(10)
weight := int64(1)
- distributed := false
- synchronous := false
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
@@ -296,14 +234,13 @@
startTime := time.Now().UTC().AddDate(0, -1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
now := time.Now().UTC()
currentHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.UTC)
err = quotaBucket.Validate()
Expect(err).NotTo(HaveOccurred())
- Expect(quotaBucket.IsSynchronous()).Should(BeFalse())
//also check if all the fields are set as expected
getPeriod, err := quotaBucket.GetPeriod()
@@ -314,28 +251,14 @@
Expect(getPeriod.GetPeriodStartTime().String()).Should(Equal(currentHour.String()))
Expect(getPeriod.GetPeriodEndTime().String()).Should(Equal(currentHour.Add(time.Hour).String()))
- //check if isDistributed is false and isSynchronous is false.
- if quotaBucket.IsDistrubuted(){
- Fail("expected false, returned true.")
- }
- if quotaBucket.IsSynchronous(){
- Fail("expected false, returned true.")
- }
asyncBucket := quotaBucket.GetAsyncQuotaBucket()
- if asyncBucket != nil {
- Fail("asyncBucket should be nil for synchronous request.")
+ if asyncBucket == nil {
+ Fail("asyncBucket can not be nil.")
}
- synchronous = true
- quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
- Expect(err).To(HaveOccurred())
})
-
-
It("Test invalid timeUnitType", func() {
edgeOrgID := "sampleOrg"
id := "sampleID"
@@ -344,16 +267,13 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
startTime := time.Now().UTC().AddDate(0, -1, 0).Unix()
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
err = quotaBucket.Validate()
Expect(err).To(HaveOccurred())
@@ -371,16 +291,13 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
startTime := time.Now().UTC().AddDate(0, -1, 0).Unix()
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
err = quotaBucket.Validate()
Expect(err).To(HaveOccurred())
@@ -400,9 +317,6 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
@@ -410,8 +324,8 @@
startTime := time.Now().UTC().AddDate(0, -1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight,syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
Expect(err).NotTo(HaveOccurred())
@@ -427,8 +341,8 @@
//InputStart time is now
startTime = time.Now().UTC().Unix()
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
Expect(err).NotTo(HaveOccurred())
@@ -452,17 +366,14 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
//InputStart time is after now.
startTime := time.Now().UTC().AddDate(0, 1, 0).Unix()
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -485,9 +396,6 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
@@ -495,8 +403,8 @@
startTime := time.Now().UTC().UTC().AddDate(-1, -1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -513,8 +421,8 @@
//InputStart time is now
startTime = time.Now().UTC().Unix()
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -540,9 +448,6 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
@@ -550,8 +455,8 @@
startTime := time.Now().UTC().AddDate(0, 1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -575,17 +480,14 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
//InputStart time is before now
startTime := time.Now().UTC().AddDate(0, -1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight,syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -605,8 +507,8 @@
//for non rolling Type window setCurrentPeriod as endTime is < time.now.
quotaType = "calendar"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
qPeriod, err = quotaBucket.GetPeriod()
diff --git a/quotaBucket/quotaCache.go b/quotaBucket/quotaCache.go
index 4afae81..6815f0b 100644
--- a/quotaBucket/quotaCache.go
+++ b/quotaBucket/quotaCache.go
@@ -1,10 +1,10 @@
package quotaBucket
import (
+ "errors"
"github.com/30x/apidQuota/constants"
"sync"
"time"
- "errors"
)
var quotaCachelock = sync.RWMutex{}
@@ -52,17 +52,15 @@
func removeFromCache(cacheKey string, qBucketCache quotaBucketCache) error {
//for async Stop the scheduler.
- if qBucketCache.qBucket.Distributed && !qBucketCache.qBucket.IsSynchronous() {
- aSyncBucket := qBucketCache.qBucket.GetAsyncQuotaBucket()
- if aSyncBucket == nil {
- return errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.")
- }
- qticker, err := aSyncBucket.getAsyncQTicker()
- if err != nil {
- return err
- }
- qticker.Stop()
+ aSyncBucket := qBucketCache.qBucket.GetAsyncQuotaBucket()
+ if aSyncBucket == nil {
+ return errors.New(constants.AsyncQuotaBucketEmpty + " : aSyncQuotaBucket to increment cannot be empty.")
}
+ qticker, err := aSyncBucket.getAsyncQTicker()
+ if err != nil {
+ return err
+ }
+ qticker.Stop()
quotaCachelock.Lock()
delete(quotaCache, cacheKey)
diff --git a/quotaBucket/quotaDescriptorType_test.go b/quotaBucket/quotaDescriptorType_test.go
index 26316f4..9f4ec8e 100644
--- a/quotaBucket/quotaDescriptorType_test.go
+++ b/quotaBucket/quotaDescriptorType_test.go
@@ -47,16 +47,13 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
- preciseAtSecondsLevel := true
startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix()
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -78,8 +75,8 @@
timeUnit = "minute"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -101,8 +98,8 @@
timeUnit = "hour"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -124,8 +121,8 @@
timeUnit = "day"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -147,8 +144,8 @@
timeUnit = "week"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -170,8 +167,8 @@
timeUnit = "month"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -203,16 +200,13 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix()
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -234,8 +228,8 @@
timeUnit = "invalidTimeUnit"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
err = quotaBucket.Validate()
Expect(err).To(HaveOccurred())
if ok := strings.Contains(err.Error(), constants.InvalidQuotaTimeUnitType); !ok {
@@ -254,16 +248,13 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix()
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -285,8 +276,8 @@
timeUnit = "minute"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -308,8 +299,8 @@
timeUnit = "hour"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -331,8 +322,8 @@
timeUnit = "day"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -354,8 +345,8 @@
timeUnit = "week"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -377,8 +368,8 @@
timeUnit = "month"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -410,16 +401,13 @@
interval := 1
maxCount := int64(10)
weight := int64(1)
- preciseAtSecondsLevel := true
startTime := time.Now().UTC().UTC().AddDate(0, -1, 0).Unix()
- distributed := true
- synchronous := true
syncTimeInSec := int64(-1)
syncMessageCount := int64(-1)
quotaBucket, err := NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
Expect(err).NotTo(HaveOccurred())
err = quotaBucket.Validate()
@@ -441,8 +429,8 @@
timeUnit = "invalidTimeUnit"
quotaBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit,
- quotaType, preciseAtSecondsLevel, startTime, maxCount,
- weight, distributed, synchronous, syncTimeInSec, syncMessageCount)
+ quotaType, startTime, maxCount,
+ weight, syncTimeInSec, syncMessageCount)
err = quotaBucket.Validate()
Expect(err).To(HaveOccurred())
if ok := strings.Contains(err.Error(), constants.InvalidQuotaTimeUnitType); !ok {
diff --git a/services/counterServiceHelper.go b/services/counterServiceHelper.go
index 5a21ce8..186afb6 100644
--- a/services/counterServiceHelper.go
+++ b/services/counterServiceHelper.go
@@ -10,6 +10,8 @@
"net/http"
"net/url"
"time"
+ "fmt"
+ "os"
)
const (
@@ -75,15 +77,23 @@
}
addApigeeSyncTokenToHeader(request)
+ fmt.Println("request: ", request)
resp, err := client.Do(request)
if err != nil {
return 0, errors.New("error calling CounterService: " + err.Error())
}
+ defer resp.Body.Close()
+
globalVariables.Log.Debug("response: ", resp)
if resp.StatusCode != http.StatusOK {
+ fmt.Println("resp: ", resp)
+
respBodyBytes, err := ioutil.ReadAll(resp.Body)
+ fmt.Println(os.Stdout, string(respBodyBytes)) //<-- here !
+
+
if resp.StatusCode == http.StatusNotFound {
return 0, errors.New("response from counter service: " + resp.Status + " and response body is: " + string(respBodyBytes))
}