[ISSUE-66918282] long poll for /configurations
diff --git a/api.go b/api.go
index f676aaa..10a11d5 100644
--- a/api.go
+++ b/api.go
@@ -16,14 +16,12 @@
import (
"bytes"
"encoding/json"
- "fmt"
"github.com/gorilla/mux"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
- "sync/atomic"
"time"
)
@@ -64,7 +62,7 @@
)
type deploymentsResult struct {
- deployments []DataDeployment
+ deployments []Configuration
err error
eTag string
}
@@ -96,36 +94,33 @@
//TODO add support for block and subscriber
type apiManagerInterface interface {
+ InitDistributeEvents()
InitAPI()
- //addChangedDeployment(string)
- //distributeEvents()
}
type apiManager struct {
dbMan dbManagerInterface
deploymentsEndpoint string
blobEndpoint string
- eTag int64
- deploymentsChanged chan interface{}
- addSubscriber chan chan deploymentsResult
- removeSubscriber chan chan deploymentsResult
+ addSubscriber chan chan interface{}
apiInitialized bool
+ configEtag *ConfigurationsEtagCache
+}
+
+func (a *apiManager) InitDistributeEvents(){
+ go distributeEvents(a.configEtag.getChangeChannel(), a.addSubscriber)
}
func (a *apiManager) InitAPI() {
if a.apiInitialized {
return
}
- services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
+ services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetConfigurations).Methods("GET")
services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
a.apiInitialized = true
log.Debug("API endpoints initialized")
}
-func (a *apiManager) addChangedDeployment(id string) {
- a.deploymentsChanged <- id
-}
-
func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) {
w.WriteHeader(status)
e := errorResponse{
@@ -145,70 +140,11 @@
a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err)
}
-func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
- send := func(toSend []interface{}) {
- if toSend != nil {
- log.Debugf("debouncer sending: %v", toSend)
- out <- toSend
- }
- }
- var toSend []interface{}
- for {
- select {
- case incoming, ok := <-in:
- if ok {
- log.Debugf("debouncing %v", incoming)
- toSend = append(toSend, incoming)
- } else {
- send(toSend)
- log.Debugf("closing debouncer")
- close(out)
- return
- }
- case <-time.After(window):
- send(toSend)
- toSend = nil
- }
- }
-}
-//TODO get notified when deployments ready
-/*
-func (a *apiManager) distributeEvents() {
- subscribers := make(map[chan deploymentsResult]bool)
- deliverDeployments := make(chan []interface{}, 1)
- go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration)
- for {
- select {
- case _, ok := <-deliverDeployments:
- if !ok {
- return // todo: using this?
- }
- subs := subscribers
- subscribers = make(map[chan deploymentsResult]bool)
- go func() {
- eTag := a.incrementETag()
- deployments, err := a.dbMan.getUnreadyDeployments()
- log.Debugf("delivering deployments to %d subscribers", len(subs))
- for subscriber := range subs {
- log.Debugf("delivering to: %v", subscriber)
- subscriber <- deploymentsResult{deployments, err, eTag}
- }
- }()
- case subscriber := <-a.addSubscriber:
- log.Debugf("Add subscriber: %v", subscriber)
- subscribers[subscriber] = true
- case subscriber := <-a.removeSubscriber:
- log.Debugf("Remove subscriber: %v", subscriber)
- delete(subscribers, subscriber)
- }
- }
-}
-*/
-// TODO use If-None-Match and ETag
+
func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
@@ -231,7 +167,7 @@
}
-func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
+func (a *apiManager) apiGetConfigurations(w http.ResponseWriter, r *http.Request) {
// If returning without a bundle (immediately or after timeout), status = 404
// If returning If-None-Match value is equal to current deployment, status = 304
@@ -244,7 +180,7 @@
if b != "" {
var err error
timeout, err = strconv.Atoi(b)
- if err != nil {
+ if err != nil || timeout<0{
a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds")
return
}
@@ -254,62 +190,45 @@
// If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
// query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
// most recent bundle list.
- ifNoneMatch := r.Header.Get("If-None-Match")
- log.Debugf("if-none-match: %s", ifNoneMatch)
+ requestETag := r.Header.Get("Etag")
+ log.Debugf("Etag: %s", requestETag)
// send unmodified if matches prior eTag and no timeout
eTag := a.getETag()
- if eTag == ifNoneMatch && timeout == 0 {
- w.WriteHeader(http.StatusNotModified)
- return
- }
-
- // send results if different eTag
- if eTag != ifNoneMatch {
+ if requestETag=="" || eTag != requestETag { // send results if different eTag
a.sendReadyDeployments(w)
return
}
- // otherwise, subscribe to any new deployment changes
- var newDeploymentsChannel chan deploymentsResult
- if timeout > 0 && ifNoneMatch != "" {
- //TODO handle block
- //newDeploymentsChannel = make(chan deploymentsResult, 1)
- //a.addSubscriber <- newDeploymentsChannel
+ if timeout == 0 { // non-blocking
+ w.WriteHeader(http.StatusNotModified)
+ return
}
+ // long poll
+
+ // subscribe to any new deployment changes
+ ConfigChangeChan := make(chan interface{}, 1)
+ a.addSubscriber <- ConfigChangeChan
+
log.Debug("Blocking request... Waiting for new Deployments.")
select {
- case result := <-newDeploymentsChannel:
- if result.err != nil {
- a.writeInternalError(w, "Database error")
- } else {
- a.sendDeployments(w, result.deployments, result.eTag)
- }
-
+ case <-ConfigChangeChan:
+ // send configs and etag
+ a.sendReadyDeployments(w)
case <-time.After(time.Duration(timeout) * time.Second):
- a.removeSubscriber <- newDeploymentsChannel
- log.Debug("Blocking deployment request timed out.")
- if ifNoneMatch != "" {
- w.WriteHeader(http.StatusNotModified)
- } else {
- a.sendReadyDeployments(w)
- }
+ log.Debug("Blocking configuration request timed out.")
+ w.WriteHeader(http.StatusNotModified)
}
}
func (a *apiManager) sendReadyDeployments(w http.ResponseWriter) {
- eTag := a.getETag()
- deployments, err := a.dbMan.getReadyDeployments()
- if err != nil {
- a.writeInternalError(w, fmt.Sprintf("Database error: %s", err.Error()))
- return
- }
- a.sendDeployments(w, deployments, eTag)
+ eTagConfig := a.configEtag.GetConfigsAndETag()
+ a.sendDeployments(w, eTagConfig.Configs, eTagConfig.ETag)
}
-func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) {
+func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []Configuration, eTag string) {
apiDeps := ApiDeploymentResponse{}
apiDepDetails := make([]ApiDeploymentDetails, 0)
@@ -346,15 +265,12 @@
w.Write(b)
}
-// call whenever the list of deployments changes
-func (a *apiManager) incrementETag() string {
- e := atomic.AddInt64(&a.eTag, 1)
- return strconv.FormatInt(e, 10)
-}
func (a *apiManager) getETag() string {
- e := atomic.LoadInt64(&a.eTag)
- return strconv.FormatInt(e, 10)
+ if a.configEtag==nil {
+ return ""
+ }
+ return a.configEtag.GetETag()
}
// escape the blobId into url
diff --git a/api_test.go b/api_test.go
index ccd3b0c..a2bc4a4 100644
--- a/api_test.go
+++ b/api_test.go
@@ -46,10 +46,7 @@
dbMan: dummyDbMan,
deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount),
blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}",
- eTag: int64(testCount * 10),
- deploymentsChanged: make(chan interface{}, 5),
- addSubscriber: make(chan chan deploymentsResult),
- removeSubscriber: make(chan chan deploymentsResult),
+ addSubscriber: make(chan chan interface{}),
}
testApiMan.InitAPI()
time.Sleep(100 * time.Millisecond)
@@ -197,7 +194,7 @@
dep := makeTestDeployment()
dep.Created = t
dep.Updated = t
- dummyDbMan.readyDeployments = []DataDeployment{*dep}
+ dummyDbMan.readyDeployments = []Configuration{*dep}
detail := makeExpectedDetail(dep, uri.String())
detail.Created = isoTime[i]
detail.Updated = isoTime[i]
@@ -218,29 +215,6 @@
}
})
- It("should debounce requests", func(done Done) {
- var in = make(chan interface{})
- var out = make(chan []interface{})
-
- go testApiMan.debounce(in, out, 3*time.Millisecond)
-
- go func() {
- defer GinkgoRecover()
-
- received, ok := <-out
- Expect(ok).To(BeTrue())
- Expect(len(received)).To(Equal(2))
-
- close(in)
- received, ok = <-out
- Expect(ok).To(BeFalse())
-
- close(done)
- }()
-
- in <- "x"
- in <- "y"
- })
})
@@ -280,7 +254,7 @@
mathrand.Seed(time.Now().UnixNano())
count := mathrand.Intn(5) + 1
- deployments := make([]DataDeployment, count)
+ deployments := make([]Configuration, count)
details := make([]ApiDeploymentDetails, count)
for i := 0; i < count; i++ {
@@ -296,8 +270,8 @@
return details
}
-func makeTestDeployment() *DataDeployment {
- dep := &DataDeployment{
+func makeTestDeployment() *Configuration {
+ dep := &Configuration{
ID: GenerateUUID(),
OrgID: GenerateUUID(),
EnvID: GenerateUUID(),
@@ -315,7 +289,7 @@
return dep
}
-func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails {
+func makeExpectedDetail(dep *Configuration, self string) *ApiDeploymentDetails {
detail := &ApiDeploymentDetails{
Self: self + "/" + dep.ID,
Name: dep.Name,
@@ -334,7 +308,8 @@
type dummyDbManager struct {
unreadyBlobIds []string
- readyDeployments []DataDeployment
+ unreadyConfigs []*pendingConfiguration
+ readyDeployments []Configuration
localFSLocation string
fileResponse chan string
version string
@@ -344,6 +319,10 @@
d.version = version
}
+func (d *dummyDbManager) getUnreadyConfigs() ([]*pendingConfiguration, error) {
+ return d.unreadyConfigs, nil
+}
+
func (d *dummyDbManager) initDb() error {
return nil
}
@@ -352,7 +331,7 @@
return d.unreadyBlobIds, nil
}
-func (d *dummyDbManager) getReadyDeployments() ([]DataDeployment, error) {
+func (d *dummyDbManager) getReadyDeployments() ([]Configuration, error) {
return d.readyDeployments, nil
}
diff --git a/bundle.go b/bundle.go
index b4729cf..88c61e0 100644
--- a/bundle.go
+++ b/bundle.go
@@ -34,18 +34,16 @@
type bundleManagerInterface interface {
initializeBundleDownloading()
- queueDownloadRequest(*DataDeployment)
+ queueDownloadRequest(*pendingConfiguration)
enqueueRequest(*DownloadRequest)
- makeDownloadRequest(string) *DownloadRequest
- deleteBundlesFromDeployments([]DataDeployment)
- deleteBundleById(string)
+ makeDownloadRequest(id string, counter *int32) *DownloadRequest
+ deleteBlobs([]string)
Close()
}
type bundleManager struct {
blobServerUrl string
dbMan dbManagerInterface
- apiMan apiManagerInterface
concurrentDownloads int
markDeploymentFailedAfter time.Duration
bundleRetryDelay time.Duration
@@ -54,6 +52,7 @@
isClosed *int32
workers []*BundleDownloader
client *http.Client
+ configEtag *ConfigurationsEtagCache
}
type blobServerResponse struct {
@@ -64,6 +63,11 @@
SignedUrlExpiryTimestamp string `json:"signedurlexpirytimestamp"`
}
+type pendingConfiguration struct {
+ dataDeployment *Configuration
+ counter int32
+}
+
func (bm *bundleManager) initializeBundleDownloading() {
atomic.StoreInt32(bm.isClosed, 0)
bm.workers = make([]*BundleDownloader, bm.concurrentDownloads)
@@ -82,17 +86,30 @@
// download bundle blob and resource blob
// TODO do not download duplicate blobs
-func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) {
- blobReq := bm.makeDownloadRequest(dep.BlobID)
- resourceReq := bm.makeDownloadRequest(dep.BlobResourceID)
-
- go func() {
- bm.enqueueRequest(blobReq)
- bm.enqueueRequest(resourceReq)
- }()
+func (bm *bundleManager) queueDownloadRequest(conf *pendingConfiguration) {
+ log.Debugf("enque pendingConfiguration: %s", conf.dataDeployment.ID)
+ conf.counter = 0
+ if conf.dataDeployment.BlobID != "" {
+ conf.counter++
+ }
+ if conf.dataDeployment.BlobResourceID != "" {
+ conf.counter++
+ }
+ if conf.dataDeployment.BlobID != "" {
+ blobReq := bm.makeDownloadRequest(conf.dataDeployment.BlobID, &conf.counter)
+ go bm.enqueueRequest(blobReq)
+ }
+ if conf.dataDeployment.BlobResourceID != "" {
+ resourceReq := bm.makeDownloadRequest(conf.dataDeployment.BlobResourceID, &conf.counter)
+ go bm.enqueueRequest(resourceReq)
+ }
}
-func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest {
+
+func (bm *bundleManager) makeDownloadRequest(id string, counter *int32) *DownloadRequest {
+ if id=="" {
+ return nil
+ }
markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter)
retryIn := bm.bundleRetryDelay
maxBackOff := 5 * time.Minute
@@ -104,6 +121,7 @@
backoffFunc: createBackoff(retryIn, maxBackOff),
markFailedAt: markFailedAt,
client: bm.client,
+ counter: counter,
}
}
@@ -112,13 +130,6 @@
if atomic.LoadInt32(bm.isClosed) == 1 {
return
}
- /*
- defer func() {
- if r := recover(); r != nil {
- log.Warn("trying to enque requests to closed bundleManager")
- }
- }()
- */
bm.downloadQueue <- r
}
@@ -127,31 +138,13 @@
close(bm.downloadQueue)
}
-func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) {
- for _, dep := range deletedDeployments {
- go bm.deleteBundleById(dep.BlobID)
- go bm.deleteBundleById(dep.BlobResourceID)
- }
+func (bm *bundleManager) deleteBlobs (ids []string) {
+ // TODO Delete from the Database table apid_blob_available, reference counting
- /*
- log.Debugf("will delete %d old bundles", len(deletedDeployments))
- go func() {
- // give clients a minute to avoid conflicts
- time.Sleep(bm.bundleCleanupDelay)
- for _, dep := range deletedDeployments {
- bundleFile := getBlobFilePath(dep.BlobID)
- log.Debugf("removing old bundle: %v", bundleFile)
- // TODO Remove from the Database table apid_blob_available
- safeDelete(bundleFile)
- }
- }()
- */
+ // TODO Delete from local file system
}
-// TODO add delete support
-func (bm *bundleManager) deleteBundleById(blobId string) {
-}
type DownloadRequest struct {
bm *bundleManager
@@ -160,6 +153,8 @@
markFailedAt time.Time
blobServerURL string
client *http.Client
+ counter *int32
+ dep *Configuration
}
func (r *DownloadRequest) downloadBundle() error {
@@ -188,7 +183,9 @@
return err
}
- log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile)
+ // succeeded
+
+ log.Debugf("blob downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile)
err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile)
if err != nil {
@@ -199,10 +196,12 @@
return err
}
- log.Debugf("bundle downloaded: blobId=%s filename=%s", r.blobId, downloadedFile)
+ log.Debugf("blob inserted: blobId=%s filename=%s", r.blobId, downloadedFile)
- // TODO send changed deployments to subscribers (API call with "block")
- //r.bm.apiMan.addChangedDeployment(dep.ID)
+ // if all required blobs have been downloaded
+ if atomic.AddInt32(r.counter, -1) == 0 && r.bm.configEtag!=nil {
+ go r.bm.configEtag.Insert(r.dep)
+ }
return nil
}
diff --git a/bundle_test.go b/bundle_test.go
index 7946f96..5a373ac 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -69,7 +69,6 @@
testBundleMan = &bundleManager{
blobServerUrl: bundleTestUrl,
dbMan: dummyDbMan,
- apiMan: dummyApiMan,
concurrentDownloads: concurrentDownloads,
markDeploymentFailedAfter: 5 * time.Second,
bundleRetryDelay: time.Second,
diff --git a/configEtag.go b/configEtag.go
new file mode 100644
index 0000000..54860f2
--- /dev/null
+++ b/configEtag.go
@@ -0,0 +1,141 @@
+package apiGatewayConfDeploy
+
+import (
+ "crypto/md5"
+ "sort"
+ "strings"
+ "sync"
+)
+
+
+type ETagAndConfig struct {
+ ETag string
+ Configs []Configuration
+}
+
+func createConfigurationsEtag() *ConfigurationsEtagCache {
+ return &ConfigurationsEtagCache{
+ List: &SortedList{},
+ mutex: &sync.RWMutex{},
+ configChange: make(chan interface{}, 10),
+ }
+}
+
+
+
+type ConfigurationsEtagCache struct {
+ List *SortedList
+ etag string
+ mutex *sync.RWMutex
+ configChange chan interface{}
+}
+
+
+
+func (c *ConfigurationsEtagCache) Construct(deps []Configuration) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.List.Construct(deps)
+ c.etag = string(c.List.ComputeHash())
+ go c.notifyChange()
+}
+
+func (c *ConfigurationsEtagCache) Insert(dep *Configuration) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ c.List.Insert(dep)
+ c.etag = string(c.List.ComputeHash())
+ go c.notifyChange()
+}
+
+// delete the ids from cache, and notify change
+func (c *ConfigurationsEtagCache) DeleteBunch(ids []string) {
+ c.mutex.Lock()
+ defer c.mutex.Unlock()
+ for _, id := range ids {
+ c.List.Delete(id)
+ }
+ c.etag = string(c.List.ComputeHash())
+ go c.notifyChange()
+}
+
+func (c *ConfigurationsEtagCache) GetETag() string {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ return c.etag
+}
+
+func (c *ConfigurationsEtagCache) GetConfigsAndETag() *ETagAndConfig {
+ c.mutex.RLock()
+ defer c.mutex.RUnlock()
+ return &ETagAndConfig{
+ ETag: c.etag,
+ Configs: c.List.GetConfigs(),
+ }
+}
+
+func (c *ConfigurationsEtagCache) notifyChange() {
+ c.configChange <- true
+}
+
+func (c *ConfigurationsEtagCache) getChangeChannel() <- chan interface{} {
+ return c.configChange
+}
+
+type SortedList struct {
+ list []*Configuration
+}
+
+func (s *SortedList) less(i int, j int) bool {
+ return strings.Compare(s.list[i].ID, s.list[j].ID) < 0
+}
+
+func (s *SortedList) Insert(dep *Configuration) {
+ i := s.indexOf(dep.ID)
+ s.list = append(s.list, nil)
+ copy(s.list[i+1:], s.list[i:])
+ s.list[i] = dep
+}
+
+func (s *SortedList) ComputeHash() [16]byte {
+ ids := make([]string, len(s.list))
+ for i, dep := range s.list {
+ ids[i] = dep.ID
+ }
+ return md5.Sum([]byte(strings.Join(ids, "")))
+}
+
+func (s *SortedList) Delete(id string) {
+ i := s.indexOf(id)
+ if s.list[i].ID == id {
+ s.list = append(s.list[:i], s.list[i+1:]...)
+ }
+}
+
+func (s *SortedList) indexOf(id string) int {
+ var i int
+ for i = 0; strings.Compare(s.list[i].ID, id) == -1; i++ {
+ }
+ return i
+}
+
+func (s *SortedList) Construct(l []Configuration) {
+
+ s.list = make([]*Configuration, len(l))
+ for i, dep := range l {
+ s.list[i] = &dep
+ }
+ sort.Slice(s.list, s.less)
+}
+
+func (s *SortedList) Len() int {
+ return len(s.list)
+}
+
+func (s *SortedList) GetConfigs() []Configuration {
+ deps := make([]Configuration, len(s.list))
+ for i, dep := range s.list {
+ deps[i] = *dep
+ }
+ return deps
+}
\ No newline at end of file
diff --git a/data.go b/data.go
index 564ce6d..abab743 100644
--- a/data.go
+++ b/data.go
@@ -25,7 +25,7 @@
gwBlobId int64
)
-type DataDeployment struct {
+type Configuration struct {
ID string
OrgID string
EnvID string
@@ -49,15 +49,16 @@
setDbVersion(string)
initDb() error
getUnreadyBlobs() ([]string, error)
- getReadyDeployments() ([]DataDeployment, error)
+ getUnreadyConfigs() ([]*pendingConfiguration, error)
+ getReadyDeployments() ([]Configuration, error)
updateLocalFsLocation(string, string) error
getLocalFSLocation(string) (string, error)
}
type dbManager struct {
- data apid.DataService
- db apid.DB
- dbMux sync.RWMutex
+ data apid.DataService
+ db apid.DB
+ dbMux *sync.RWMutex
}
func (dbc *dbManager) setDbVersion(version string) {
@@ -135,7 +136,55 @@
return
}
-func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) {
+func (dbc *dbManager) getUnreadyConfigs() ([]*pendingConfiguration, error) {
+
+ rows, err := dbc.getDb().Query(`
+ SELECT a.id,
+ a.organization_id,
+ a.environment_id,
+ a.bean_blob_id,
+ a.resource_blob_id,
+ a.type,
+ a.name,
+ a.revision,
+ a.path,
+ a.created_at,
+ a.created_by,
+ a.updated_at,
+ a.updated_by
+ FROM metadata_runtime_entity_metadata as a
+ WHERE a.id IN
+ (
+ SELECT a.id as id
+ FROM metadata_runtime_entity_metadata as a
+ WHERE a.bean_blob_id NOT IN
+ (SELECT b.id FROM apid_blob_available as b)
+ UNION
+ SELECT a.id as id
+ FROM metadata_runtime_entity_metadata as a
+ WHERE a.resource_blob_id NOT IN
+ (SELECT b.id FROM apid_blob_available as b)
+ )
+ WHERE id IS NOT NULL AND id != ''
+ ;
+ `)
+ if err != nil {
+ log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err)
+ return nil, err
+ }
+ defer rows.Close()
+ deps, err := dataDeploymentsFromRow(rows)
+ configs := make([]*pendingConfiguration, len(deps))
+ for i, dep := range deps {
+ configs[i] = &pendingConfiguration{
+ dataDeployment: &dep,
+ }
+ }
+ log.Debugf("Unready Configs %v", configs)
+ return configs, nil
+}
+
+func (dbc *dbManager) getReadyDeployments() ([]Configuration, error) {
// An alternative statement is in get_ready_deployments.sql
// Need testing with large data volume to determine which is better
@@ -186,14 +235,14 @@
}
defer rows.Close()
- deployments, err := dataDeploymentsFromRow(rows)
+ configs, err := dataDeploymentsFromRow(rows)
if err != nil {
return nil, err
}
- log.Debugf("Configurations ready: %v", deployments)
+ log.Debugf("Configurations ready: %v", configs)
- return deployments, nil
+ return configs, nil
}
@@ -244,12 +293,12 @@
return
}
-func dataDeploymentsFromRow(rows *sql.Rows) ([]DataDeployment, error) {
- tmp, err := structFromRows(reflect.TypeOf((*DataDeployment)(nil)).Elem(), rows)
+func dataDeploymentsFromRow(rows *sql.Rows) ([]Configuration, error) {
+ tmp, err := structFromRows(reflect.TypeOf((*Configuration)(nil)).Elem(), rows)
if err != nil {
return nil, err
}
- return tmp.([]DataDeployment), nil
+ return tmp.([]Configuration), nil
}
func structFromRows(t reflect.Type, rows *sql.Rows) (interface{}, error) {
diff --git a/data_test.go b/data_test.go
index abd9fc9..2dd6e67 100644
--- a/data_test.go
+++ b/data_test.go
@@ -44,7 +44,7 @@
testCount += 1
testDbMan = &dbManager{
data: services.Data(),
- dbMux: sync.RWMutex{},
+ dbMux: &sync.RWMutex{},
}
testDbMan.setDbVersion("test" + strconv.Itoa(testCount))
initTestDb(testDbMan.getDb())
diff --git a/init.go b/init.go
index 037e5ea..cc03484 100644
--- a/init.go
+++ b/init.go
@@ -123,11 +123,14 @@
},
}
+ // initialize configuration cache
+ configEtag := createConfigurationsEtag()
+
// initialize db manager
dbMan := &dbManager{
data: services.Data(),
- dbMux: sync.RWMutex{},
+ dbMux: &sync.RWMutex{},
}
// initialize api manager
@@ -136,11 +139,9 @@
dbMan: dbMan,
deploymentsEndpoint: deploymentsEndpoint,
blobEndpoint: blobEndpoint,
- eTag: 0,
- deploymentsChanged: make(chan interface{}, 5),
- addSubscriber: make(chan chan deploymentsResult),
- removeSubscriber: make(chan chan deploymentsResult),
+ addSubscriber: make(chan chan interface{}, 10),
apiInitialized: false,
+ configEtag: configEtag,
}
// initialize bundle manager
@@ -158,7 +159,6 @@
bundleMan := &bundleManager{
blobServerUrl: blobServerURL,
dbMan: dbMan,
- apiMan: apiMan,
concurrentDownloads: concurrentDownloads,
markDeploymentFailedAfter: markDeploymentFailedAfter,
bundleRetryDelay: time.Second,
@@ -166,6 +166,7 @@
downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
isClosed: new(int32),
client: httpClient,
+ configEtag:configEtag,
}
bundleMan.initializeBundleDownloading()
@@ -179,6 +180,7 @@
apiMan: apiMan,
bundleMan: bundleMan,
closed: false,
+ configCache: configEtag,
}
eventHandler.initListener(services)
diff --git a/listener.go b/listener.go
index 2769568..ee555ea 100644
--- a/listener.go
+++ b/listener.go
@@ -47,6 +47,7 @@
dbMan dbManagerInterface
apiMan apiManagerInterface
bundleMan bundleManagerInterface
+ configCache *ConfigurationsEtagCache
closed bool
}
@@ -70,7 +71,7 @@
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
h.dbMan.setDbVersion(snapshot.SnapshotInfo)
-
+ h.apiMan.InitDistributeEvents()
h.startupOnExistingDatabase()
h.apiMan.InitAPI()
log.Debug("Snapshot processed")
@@ -82,15 +83,15 @@
go func() {
// create apid_blob_available table
h.dbMan.initDb()
- blobIds, err := h.dbMan.getUnreadyBlobs()
+ configs, err := h.dbMan.getUnreadyConfigs()
if err != nil {
log.Panicf("unable to query database for unready deployments: %v", err)
}
- log.Debugf("Queuing %d blob downloads", len(blobIds))
- for _, id := range blobIds {
- go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id))
+ log.Debugf("Queuing %d unready config downloads", len(configs))
+ for _, c := range configs {
+ go h.bundleMan.queueDownloadRequest(c)
}
}()
}
@@ -98,67 +99,66 @@
func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) {
log.Debugf("Processing changes")
- // changes have been applied to DB
- var insertedDeployments, deletedDeployments []DataDeployment
- var updatedNewBlobs, updatedOldBlobs []string
+ // changes have been applied to DB by apidApigeeSync
+ var insertedConfigs, updatedNewConfigs []*pendingConfiguration
+ var updatedConfigOldIds, deletedConfigIds []string
+ var deletedBlobIds, updatedOldBlobIds []string
for _, change := range changes.Changes {
switch change.Table {
case CONFIG_METADATA_TABLE:
switch change.Operation {
case common.Insert:
dep := dataDeploymentFromRow(change.NewRow)
- insertedDeployments = append(insertedDeployments, dep)
+ insertedConfigs = append(insertedConfigs, &pendingConfiguration{
+ dataDeployment: &dep,
+ })
case common.Delete:
dep := dataDeploymentFromRow(change.OldRow)
- deletedDeployments = append(deletedDeployments, dep)
+ deletedConfigIds = append(deletedConfigIds, dep.ID)
+
+ deletedBlobIds = append(deletedBlobIds, dep.BlobResourceID)
+ deletedBlobIds = append(deletedBlobIds, dep.BlobID)
case common.Update:
depNew := dataDeploymentFromRow(change.NewRow)
depOld := dataDeploymentFromRow(change.OldRow)
- if depOld.BlobID != depNew.BlobID {
- updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID)
- updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID)
- }
+ updatedConfigOldIds = append(updatedConfigOldIds, depOld.ID)
+ updatedNewConfigs = append(updatedNewConfigs, &pendingConfiguration{
+ dataDeployment: &depNew,
+ })
- if depOld.BlobResourceID != depNew.BlobResourceID {
- updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID)
- updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID)
- }
+ updatedOldBlobIds = append(updatedOldBlobIds, depOld.BlobResourceID)
+ updatedOldBlobIds = append(updatedOldBlobIds, depOld.BlobID)
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
}
}
- /*
- for _, d := range deletedDeployments {
- h.apiMan.addChangedDeployment(d.ID)
- }
- */
+
+
+ // update cache with deleted/updated configs
+ log.Debugf("will delete %d configs from cache", len(deletedConfigIds))
+ h.configCache.DeleteBunch(deletedConfigIds)
+ log.Debugf("will delete %d updated old configs from cache", len(updatedConfigOldIds))
+ h.configCache.DeleteBunch(updatedConfigOldIds)
+
+ // TODO clean the old blobs
+ h.bundleMan.deleteBlobs(deletedBlobIds)
+ h.bundleMan.deleteBlobs(updatedOldBlobIds)
// insert
- for i := range insertedDeployments {
- go h.bundleMan.queueDownloadRequest(&insertedDeployments[i])
+ for _, c := range insertedConfigs {
+ go h.bundleMan.queueDownloadRequest(c)
}
-
// update
- for i := range updatedNewBlobs {
- go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i]))
+ for _, c := range updatedNewConfigs {
+ go h.bundleMan.queueDownloadRequest(c)
}
- for i := range updatedOldBlobs {
- go h.bundleMan.deleteBundleById(updatedOldBlobs[i])
- }
-
- // delete
- if len(deletedDeployments) > 0 {
- log.Debugf("will delete %d old bundles", len(deletedDeployments))
- //TODO delete bundles for deleted deployments
- h.bundleMan.deleteBundlesFromDeployments(deletedDeployments)
- }
}
-func dataDeploymentFromRow(row common.Row) (d DataDeployment) {
+func dataDeploymentFromRow(row common.Row) (d Configuration) {
row.Get("id", &d.ID)
row.Get("organization_id", &d.OrgID)
diff --git a/listener_test.go b/listener_test.go
index d270be0..e9e46c9 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -38,8 +38,8 @@
dummyDbMan = &dummyDbManager{}
dummyBundleMan = &dummyBundleManager{
requestChan: make(chan *DownloadRequest),
- depChan: make(chan *DataDeployment),
- delChan: make(chan *DataDeployment),
+ depChan: make(chan *Configuration),
+ delChan: make(chan *Configuration),
delBlobChan: make(chan string),
}
testHandler = &apigeeSyncHandler{
@@ -108,7 +108,7 @@
It("Insert event should enqueue download requests for all inserted deployments", func() {
// emit change event
changes := make([]common.Change, 0)
- deployments := make(map[string]DataDeployment)
+ deployments := make(map[string]Configuration)
for i := 0; i < 1+rand.Intn(10); i++ {
dep := makeTestDeployment()
change := common.Change{
@@ -279,8 +279,8 @@
type dummyBundleManager struct {
requestChan chan *DownloadRequest
- depChan chan *DataDeployment
- delChan chan *DataDeployment
+ depChan chan *Configuration
+ delChan chan *Configuration
delBlobChan chan string
}
@@ -288,8 +288,8 @@
}
-func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) {
- bm.depChan <- dep
+func (bm *dummyBundleManager) queueDownloadRequest(conf *pendingConfiguration) {
+ bm.depChan <- conf.dataDeployment
}
func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
@@ -302,21 +302,24 @@
}
}
-func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) {
+func (bm *dummyBundleManager) deleteBlobFromDeployments(deployments []Configuration) {
for i := range deployments {
bm.delChan <- &deployments[i]
}
}
-func (bm *dummyBundleManager) deleteBundleById(blobId string) {
- bm.delBlobChan <- blobId
+func (bm *dummyBundleManager) deleteBlobs(blobIds []string) {
+ for _, id := range blobIds {
+ bm.delBlobChan <- id
+ }
+
}
func (bm *dummyBundleManager) Close() {
}
-func rowFromDeployment(dep *DataDeployment) common.Row {
+func rowFromDeployment(dep *Configuration) common.Row {
row := common.Row{}
row["id"] = &common.ColumnVal{Value: dep.ID}
row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
diff --git a/longPoll.go b/longPoll.go
new file mode 100644
index 0000000..6d276e9
--- /dev/null
+++ b/longPoll.go
@@ -0,0 +1,58 @@
+package apiGatewayConfDeploy
+
+import "time"
+
+// The channel sent to addSubscriber should be buffered channel
+func distributeEvents(deliverChan <- chan interface{}, addSubscriber chan chan interface{}) {
+ subscribers := make([]chan interface{}, 0)
+ for {
+ select {
+ case element, ok := <-deliverChan:
+ if !ok {
+ return
+ }
+ subs := subscribers
+ subscribers = make([]chan interface{}, 0)
+ for _, subscriber := range subs {
+ go func(sub chan interface{}) {
+ log.Debugf("delivering to: %v", sub)
+ sub <- element
+ }(subscriber)
+ }
+ case sub, ok := <-addSubscriber:
+ if !ok {
+ return
+ }
+ log.Debugf("Add subscriber: %v", sub)
+ subscribers = append(subscribers, sub)
+ }
+ }
+}
+
+
+
+func debounce(in chan interface{}, out chan interface{}, window time.Duration) {
+ send := func(toSend interface{}) {
+ if toSend != nil {
+ out <- toSend
+ }
+ }
+ var toSend interface{} = nil
+ for {
+ select {
+ case incoming, ok := <-in:
+ if ok {
+ log.Debugf("debouncing %v", incoming)
+ toSend = incoming
+ } else {
+ send(toSend)
+ log.Debugf("closing debouncer")
+ close(out)
+ return
+ }
+ case <-time.After(window):
+ send(toSend)
+ toSend = nil
+ }
+ }
+}
\ No newline at end of file