intial commit for quotaCache and scheduler
diff --git a/api.go b/api.go
index b71bc82..75e87fa 100644
--- a/api.go
+++ b/api.go
@@ -44,7 +44,6 @@
return
}
- fmt.Println("test1")
results, err := qBucket.IncrementQuotaLimit()
if err != nil {
diff --git a/constants/constants.go b/constants/constants.go
index 459efb2..1d5449c 100644
--- a/constants/constants.go
+++ b/constants/constants.go
@@ -1,5 +1,7 @@
package constants
+import "time"
+
const (
//add to acceptedTimeUnitList in init() if case any other new timeUnit is added
@@ -19,7 +21,8 @@
QuotaTypeCalendar = "calendar" // after start time
QuotaTypeRollingWindow = "rollingwindow" // in the past "window" time
- cacheKeyDelimiter = "|"
+ CacheKeyDelimiter = "|"
+ CacheTTL = time.Minute * 1
UnableToParseBody = "unable_to_parse_body"
UnMarshalJSONError = "unmarshal_json_error"
ErrorConvertReqBodyToEntity = "error_convert_reqBody_to_entity"
diff --git a/globalVariables/globalVariables.go b/globalVariables/globalVariables.go
index 50a4df0..2891ca9 100644
--- a/globalVariables/globalVariables.go
+++ b/globalVariables/globalVariables.go
@@ -1,6 +1,8 @@
package globalVariables
-import "github.com/30x/apid-core"
+import (
+ "github.com/30x/apid-core"
+)
var (
Log apid.LogService
diff --git a/quotaBucket/apiUtil.go b/quotaBucket/apiUtil.go
index 27d964d..707ab62 100644
--- a/quotaBucket/apiUtil.go
+++ b/quotaBucket/apiUtil.go
@@ -4,6 +4,8 @@
"errors"
"reflect"
"time"
+ "github.com/30x/apidQuota/constants"
+ "fmt"
)
const (
@@ -22,7 +24,9 @@
expiresAt int64
}
-func (qBucket *QuotaBucket) FromAPIRequest(quotaBucketMap map[string]interface{}) error {
+func (qBucketRequest *QuotaBucket) FromAPIRequest(quotaBucketMap map[string]interface{}) error {
+ fmt.Println("qBucketRequest: ", qBucketRequest.quotaBucketData )
+ var cacheKey string
var edgeOrgID, id, timeUnit, quotaType string
var interval int
var startTime, maxCount, weight int64
@@ -48,6 +52,7 @@
}
id = value.(string)
+ cacheKey = edgeOrgID + constants.CacheKeyDelimiter + id
value, ok = quotaBucketMap["interval"]
if !ok {
return errors.New(`missing field: 'interval' is required`)
@@ -159,18 +164,31 @@
if syncTimeType := reflect.TypeOf(syncTimeValue); syncTimeType.Kind() != reflect.Float64 {
return errors.New(`invalid type : 'syncTimeInSec' should be a number`)
}
- syncTimeFloat := value.(float64)
+ syncTimeFloat := syncTimeValue.(float64)
syncTimeInt := int64(syncTimeFloat)
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1)
- if err != nil {
- return errors.New("error creating quotaBucket: " + err.Error())
- }
- qBucket.quotaBucketData = newQBucket.quotaBucketData
- if err := qBucket.Validate(); err != nil {
- return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ //try to retrieve from cache
+ newQBucket, ok = getFromCache(cacheKey)
+
+ if !ok {
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
+ startTime, maxCount, weight, distributed, synchronous, syncTimeInt, -1)
+ if err != nil {
+ return errors.New("error creating quotaBucket: " + err.Error())
+ }
+ fmt.Println("qbucket: ", qBucketRequest)
+ fmt.Println("newqbucket: ", newQBucket)
+
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
+ if err := qBucketRequest.Validate(); err != nil {
+ return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ }
+
+ addToCache(qBucketRequest)
+ return nil
}
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
return nil
@@ -180,48 +198,75 @@
}
syncMsgCountFloat := value.(float64)
syncMsgCountInt := int64(syncMsgCountFloat)
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt)
- if err != nil {
- return errors.New("error creating quotaBucket: " + err.Error())
- }
- qBucket.quotaBucketData = newQBucket.quotaBucketData
+ //try to retrieve from cache
+ newQBucket, ok = getFromCache(cacheKey)
- if err := qBucket.Validate(); err != nil {
- return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ if !ok {
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
+ startTime, maxCount, weight, distributed, synchronous, -1, syncMsgCountInt)
+ if err != nil {
+ return errors.New("error creating quotaBucket: " + err.Error())
+ }
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
+ if err := qBucketRequest.Validate(); err != nil {
+ return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ }
+
+ addToCache(qBucketRequest)
+ return nil
+
}
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
return nil
}
}
- //for synchronous quotaBucket
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, synchronous, -1, -1)
- if err != nil {
- return errors.New("error creating quotaBucket: " + err.Error())
- }
- qBucket.quotaBucketData = newQBucket.quotaBucketData
+ //try to retrieve from cache
+ newQBucket, ok = getFromCache(cacheKey)
- if err := qBucket.Validate(); err != nil {
- return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ if !ok {
+ //for synchronous quotaBucket
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
+ startTime, maxCount, weight, distributed, synchronous, -1, -1)
+ if err != nil {
+ return errors.New("error creating quotaBucket: " + err.Error())
+ }
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
+ if err := qBucketRequest.Validate(); err != nil {
+ return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ }
+ addToCache(qBucketRequest)
+ return nil
}
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
return nil
}
- //for non distributed quotaBucket
- newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
- startTime, maxCount, weight, distributed, false, -1, -1)
- if err != nil {
- return errors.New("error creating quotaBucket: " + err.Error())
- }
+ //retrieveFromCache.
+ newQBucket, ok = getFromCache(cacheKey)
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
- qBucket.quotaBucketData = newQBucket.quotaBucketData
+ if !ok {
+ //for non distributed quotaBucket
+ newQBucket, err = NewQuotaBucket(edgeOrgID, id, interval, timeUnit, quotaType, preciseAtSecondsLevel,
+ startTime, maxCount, weight, distributed, false, -1, -1)
+ if err != nil {
+ return errors.New("error creating quotaBucket: " + err.Error())
- if err := qBucket.Validate(); err != nil {
- return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ }
+
+ qBucketRequest.quotaBucketData = newQBucket.quotaBucketData
+
+ if err := qBucketRequest.Validate(); err != nil {
+ return errors.New("failed in Validating the quotaBucket: " + err.Error())
+ }
+
+ addToCache(qBucketRequest)
}
return nil
diff --git a/quotaBucket/quotaBucket.go b/quotaBucket/quotaBucket.go
index ca22202..5b81b1d 100644
--- a/quotaBucket/quotaBucket.go
+++ b/quotaBucket/quotaBucket.go
@@ -5,6 +5,7 @@
"github.com/30x/apidQuota/constants"
"strings"
"time"
+ "fmt"
)
var (
@@ -86,6 +87,8 @@
Synchronous bool
SyncTimeInSec int64
SyncMessageCount int64
+ QTicker *time.Ticker
+
}
type QuotaBucket struct {
@@ -113,6 +116,7 @@
Synchronous: synchronous,
SyncTimeInSec: syncTimeInSec,
SyncMessageCount: syncMessageCount,
+ QTicker: &time.Ticker{},
}
quotaBucket := &QuotaBucket{
@@ -123,6 +127,24 @@
if err != nil {
return nil, err
}
+
+ //for async start the scheduler
+ if distributed && !synchronous{
+ quotaBucket.quotaBucketData.QTicker = time.NewTicker(time.Second)
+ go func() {
+ count := 0
+ fmt.Println("inside create ticker")
+ for t := range quotaBucket.quotaBucketData.QTicker.C {
+ fmt.Println("Ticker ticked ", t)
+ if count > 10 {
+ quotaBucket.getTicker().Stop()
+ }
+ count += 1
+ }
+ }()
+ }
+
+ fmt.Println("quotaBucketdata: " ,quotaBucket.quotaBucketData)
return quotaBucket, nil
}
@@ -194,6 +216,10 @@
return q.quotaBucketData.Synchronous
}
+
+func (q *QuotaBucket) getTicker() *time.Ticker {
+ return q.quotaBucketData.QTicker
+}
//Calls setCurrentPeriod if DescriptorType is 'rollingWindow' or period.endTime is before now().
// It is required to setPeriod while incrementing the count.
func (q *QuotaBucket) GetPeriod() (*QuotaPeriod, error) {
diff --git a/quotaBucket/quotaBucketType.go b/quotaBucket/quotaBucketType.go
index b905bd7..b6ea239 100644
--- a/quotaBucket/quotaBucketType.go
+++ b/quotaBucket/quotaBucketType.go
@@ -4,6 +4,7 @@
"errors"
"fmt"
"github.com/30x/apidQuota/services"
+ "time"
)
type QuotaBucketType interface {
@@ -104,16 +105,24 @@
syncTimeInSec int64
}
-func (quotaBucketType AsynchronousQuotaBucketType) resetCount(qBucket *QuotaBucket) error {
+func (quotaBucketType AsynchronousQuotaBucketType) resetCount(q *QuotaBucket) error {
//yet to implement
return nil
}
-func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(qBucket *QuotaBucket) (*QuotaBucketResults, error) {
+func (quotaBucketType AsynchronousQuotaBucketType) incrementQuotaCount(q *QuotaBucket) (*QuotaBucketResults, error) {
//getCount()
fmt.Println("increment count for async")
-
- return nil, nil
+ results := &QuotaBucketResults{
+ EdgeOrgID: q.GetEdgeOrgID(),
+ ID: q.GetID(),
+ exceededTokens: true,
+ currentTokens: 51,
+ MaxCount: 50,
+ startedAt: time.Now().Unix(),
+ expiresAt: time.Now().Unix(),
+ }
+ return results, nil
}
func (quotaBucketType AsynchronousQuotaBucketType) resetQuotaForCurrentPeriod(q *QuotaBucket) (*QuotaBucketResults, error) {
diff --git a/quotaBucket/quotaCache.go b/quotaBucket/quotaCache.go
new file mode 100644
index 0000000..e0cfe11
--- /dev/null
+++ b/quotaBucket/quotaCache.go
@@ -0,0 +1,70 @@
+package quotaBucket
+
+import (
+ "github.com/30x/apidQuota/constants"
+ "time"
+ "sync"
+ "fmt"
+)
+
+var quotaCachelock = sync.RWMutex{}
+
+type quotaBucketCache struct {
+ qBucket *QuotaBucket
+ expiryTime int64
+}
+
+var quotaCache map[string]quotaBucketCache
+
+func init() {
+ quotaCache = make(map[string]quotaBucketCache)
+}
+
+func getFromCache(cacheKey string) (*QuotaBucket,bool) {
+ quotaCachelock.Lock()
+ qBucketCache, ok := quotaCache[cacheKey]
+ quotaCachelock.Unlock()
+ if !ok {
+ fmt.Println("not in cache. add to cache.")
+ return nil,false
+ }
+
+ isExpired := time.Unix(qBucketCache.expiryTime, 0).Before(time.Now().UTC())
+ if isExpired {
+ fmt.Println("quotaBucket expired: remove from cache and return false.")
+ removeFromCache(cacheKey, qBucketCache)
+ return nil, false
+ }
+
+ return qBucketCache.qBucket, true
+
+}
+
+func removeFromCache(cacheKey string, qBucketCache quotaBucketCache) {
+ fmt.Println("inside remove from cache")
+ //for async Stop the scheduler.
+ if qBucketCache.qBucket.Distributed && !qBucketCache.qBucket.IsSynchronous(){
+ qBucketCache.qBucket.getTicker().Stop()
+ }
+
+ quotaCachelock.Lock()
+ delete(quotaCache,cacheKey)
+ quotaCachelock.Unlock()
+}
+
+func addToCache(qBucketToAdd *QuotaBucket) {
+ cacheKey := qBucketToAdd.GetEdgeOrgID() + constants.CacheKeyDelimiter + qBucketToAdd.GetID()
+ ttl := time.Now().UTC().Add(constants.CacheTTL).Unix()
+
+ qCacheData := quotaBucketCache{
+ qBucket: qBucketToAdd,
+ expiryTime: ttl,
+ }
+
+ fmt.Println("qbucket in cache: ", qBucketToAdd.getTicker())
+ quotaCachelock.Lock()
+ quotaCache[cacheKey] = qCacheData
+ quotaCachelock.Unlock()
+ fmt.Println("duration: " ,time.Unix(qCacheData.expiryTime,0).String())
+ fmt.Println("now: ", time.Now().UTC())
+}