[ISSUE-66918282] long poll for /configurations
diff --git a/api.go b/api.go index f676aaa..10a11d5 100644 --- a/api.go +++ b/api.go
@@ -16,14 +16,12 @@ import ( "bytes" "encoding/json" - "fmt" "github.com/gorilla/mux" "io" "io/ioutil" "net/http" "net/url" "strconv" - "sync/atomic" "time" ) @@ -64,7 +62,7 @@ ) type deploymentsResult struct { - deployments []DataDeployment + deployments []Configuration err error eTag string } @@ -96,36 +94,33 @@ //TODO add support for block and subscriber type apiManagerInterface interface { + InitDistributeEvents() InitAPI() - //addChangedDeployment(string) - //distributeEvents() } type apiManager struct { dbMan dbManagerInterface deploymentsEndpoint string blobEndpoint string - eTag int64 - deploymentsChanged chan interface{} - addSubscriber chan chan deploymentsResult - removeSubscriber chan chan deploymentsResult + addSubscriber chan chan interface{} apiInitialized bool + configEtag *ConfigurationsEtagCache +} + +func (a *apiManager) InitDistributeEvents(){ + go distributeEvents(a.configEtag.getChangeChannel(), a.addSubscriber) } func (a *apiManager) InitAPI() { if a.apiInitialized { return } - services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetConfigurations).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") a.apiInitialized = true log.Debug("API endpoints initialized") } -func (a *apiManager) addChangedDeployment(id string) { - a.deploymentsChanged <- id -} - func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) { w.WriteHeader(status) e := errorResponse{ @@ -145,70 +140,11 @@ a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err) } -func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) { - send := func(toSend []interface{}) { - if toSend != nil { - log.Debugf("debouncer sending: %v", toSend) - out <- toSend - } - } - var toSend []interface{} - for { - select { - case incoming, ok := <-in: - if ok { - 
log.Debugf("debouncing %v", incoming) - toSend = append(toSend, incoming) - } else { - send(toSend) - log.Debugf("closing debouncer") - close(out) - return - } - case <-time.After(window): - send(toSend) - toSend = nil - } - } -} -//TODO get notified when deployments ready -/* -func (a *apiManager) distributeEvents() { - subscribers := make(map[chan deploymentsResult]bool) - deliverDeployments := make(chan []interface{}, 1) - go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration) - for { - select { - case _, ok := <-deliverDeployments: - if !ok { - return // todo: using this? - } - subs := subscribers - subscribers = make(map[chan deploymentsResult]bool) - go func() { - eTag := a.incrementETag() - deployments, err := a.dbMan.getUnreadyDeployments() - log.Debugf("delivering deployments to %d subscribers", len(subs)) - for subscriber := range subs { - log.Debugf("delivering to: %v", subscriber) - subscriber <- deploymentsResult{deployments, err, eTag} - } - }() - case subscriber := <-a.addSubscriber: - log.Debugf("Add subscriber: %v", subscriber) - subscribers[subscriber] = true - case subscriber := <-a.removeSubscriber: - log.Debugf("Remove subscriber: %v", subscriber) - delete(subscribers, subscriber) - } - } -} -*/ -// TODO use If-None-Match and ETag + func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -231,7 +167,7 @@ } -func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { +func (a *apiManager) apiGetConfigurations(w http.ResponseWriter, r *http.Request) { // If returning without a bundle (immediately or after timeout), status = 404 // If returning If-None-Match value is equal to current deployment, status = 304 @@ -244,7 +180,7 @@ if b != "" { var err error timeout, err = strconv.Atoi(b) - if err != nil { + if err != nil || timeout<0{ a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds") return } @@ -254,62 
+190,45 @@ // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block' // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the // most recent bundle list. - ifNoneMatch := r.Header.Get("If-None-Match") - log.Debugf("if-none-match: %s", ifNoneMatch) + requestETag := r.Header.Get("Etag") + log.Debugf("Etag: %s", requestETag) // send unmodified if matches prior eTag and no timeout eTag := a.getETag() - if eTag == ifNoneMatch && timeout == 0 { - w.WriteHeader(http.StatusNotModified) - return - } - - // send results if different eTag - if eTag != ifNoneMatch { + if requestETag=="" || eTag != requestETag { // send results if different eTag a.sendReadyDeployments(w) return } - // otherwise, subscribe to any new deployment changes - var newDeploymentsChannel chan deploymentsResult - if timeout > 0 && ifNoneMatch != "" { - //TODO handle block - //newDeploymentsChannel = make(chan deploymentsResult, 1) - //a.addSubscriber <- newDeploymentsChannel + if timeout == 0 { // non-blocking + w.WriteHeader(http.StatusNotModified) + return } + // long poll + + // subscribe to any new deployment changes + ConfigChangeChan := make(chan interface{}, 1) + a.addSubscriber <- ConfigChangeChan + log.Debug("Blocking request... 
Waiting for new Deployments.") select { - case result := <-newDeploymentsChannel: - if result.err != nil { - a.writeInternalError(w, "Database error") - } else { - a.sendDeployments(w, result.deployments, result.eTag) - } - + case <-ConfigChangeChan: + // send configs and etag + a.sendReadyDeployments(w) case <-time.After(time.Duration(timeout) * time.Second): - a.removeSubscriber <- newDeploymentsChannel - log.Debug("Blocking deployment request timed out.") - if ifNoneMatch != "" { - w.WriteHeader(http.StatusNotModified) - } else { - a.sendReadyDeployments(w) - } + log.Debug("Blocking configuration request timed out.") + w.WriteHeader(http.StatusNotModified) } } func (a *apiManager) sendReadyDeployments(w http.ResponseWriter) { - eTag := a.getETag() - deployments, err := a.dbMan.getReadyDeployments() - if err != nil { - a.writeInternalError(w, fmt.Sprintf("Database error: %s", err.Error())) - return - } - a.sendDeployments(w, deployments, eTag) + eTagConfig := a.configEtag.GetConfigsAndETag() + a.sendDeployments(w, eTagConfig.Configs, eTagConfig.ETag) } -func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string) { +func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []Configuration, eTag string) { apiDeps := ApiDeploymentResponse{} apiDepDetails := make([]ApiDeploymentDetails, 0) @@ -346,15 +265,12 @@ w.Write(b) } -// call whenever the list of deployments changes -func (a *apiManager) incrementETag() string { - e := atomic.AddInt64(&a.eTag, 1) - return strconv.FormatInt(e, 10) -} func (a *apiManager) getETag() string { - e := atomic.LoadInt64(&a.eTag) - return strconv.FormatInt(e, 10) + if a.configEtag==nil { + return "" + } + return a.configEtag.GetETag() } // escape the blobId into url
diff --git a/api_test.go b/api_test.go index ccd3b0c..a2bc4a4 100644 --- a/api_test.go +++ b/api_test.go
@@ -46,10 +46,7 @@ dbMan: dummyDbMan, deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount), blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", - eTag: int64(testCount * 10), - deploymentsChanged: make(chan interface{}, 5), - addSubscriber: make(chan chan deploymentsResult), - removeSubscriber: make(chan chan deploymentsResult), + addSubscriber: make(chan chan interface{}), } testApiMan.InitAPI() time.Sleep(100 * time.Millisecond) @@ -197,7 +194,7 @@ dep := makeTestDeployment() dep.Created = t dep.Updated = t - dummyDbMan.readyDeployments = []DataDeployment{*dep} + dummyDbMan.readyDeployments = []Configuration{*dep} detail := makeExpectedDetail(dep, uri.String()) detail.Created = isoTime[i] detail.Updated = isoTime[i] @@ -218,29 +215,6 @@ } }) - It("should debounce requests", func(done Done) { - var in = make(chan interface{}) - var out = make(chan []interface{}) - - go testApiMan.debounce(in, out, 3*time.Millisecond) - - go func() { - defer GinkgoRecover() - - received, ok := <-out - Expect(ok).To(BeTrue()) - Expect(len(received)).To(Equal(2)) - - close(in) - received, ok = <-out - Expect(ok).To(BeFalse()) - - close(done) - }() - - in <- "x" - in <- "y" - }) }) @@ -280,7 +254,7 @@ mathrand.Seed(time.Now().UnixNano()) count := mathrand.Intn(5) + 1 - deployments := make([]DataDeployment, count) + deployments := make([]Configuration, count) details := make([]ApiDeploymentDetails, count) for i := 0; i < count; i++ { @@ -296,8 +270,8 @@ return details } -func makeTestDeployment() *DataDeployment { - dep := &DataDeployment{ +func makeTestDeployment() *Configuration { + dep := &Configuration{ ID: GenerateUUID(), OrgID: GenerateUUID(), EnvID: GenerateUUID(), @@ -315,7 +289,7 @@ return dep } -func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails { +func makeExpectedDetail(dep *Configuration, self string) *ApiDeploymentDetails { detail := &ApiDeploymentDetails{ Self: self + "/" + dep.ID, Name: dep.Name, @@ -334,7 
+308,8 @@ type dummyDbManager struct { unreadyBlobIds []string - readyDeployments []DataDeployment + unreadyConfigs []*pendingConfiguration + readyDeployments []Configuration localFSLocation string fileResponse chan string version string @@ -344,6 +319,10 @@ d.version = version } +func (d *dummyDbManager) getUnreadyConfigs() ([]*pendingConfiguration, error) { + return d.unreadyConfigs, nil +} + func (d *dummyDbManager) initDb() error { return nil } @@ -352,7 +331,7 @@ return d.unreadyBlobIds, nil } -func (d *dummyDbManager) getReadyDeployments() ([]DataDeployment, error) { +func (d *dummyDbManager) getReadyDeployments() ([]Configuration, error) { return d.readyDeployments, nil }
diff --git a/bundle.go b/bundle.go index b4729cf..88c61e0 100644 --- a/bundle.go +++ b/bundle.go
@@ -34,18 +34,16 @@ type bundleManagerInterface interface { initializeBundleDownloading() - queueDownloadRequest(*DataDeployment) + queueDownloadRequest(*pendingConfiguration) enqueueRequest(*DownloadRequest) - makeDownloadRequest(string) *DownloadRequest - deleteBundlesFromDeployments([]DataDeployment) - deleteBundleById(string) + makeDownloadRequest(id string, counter *int32) *DownloadRequest + deleteBlobs([]string) Close() } type bundleManager struct { blobServerUrl string dbMan dbManagerInterface - apiMan apiManagerInterface concurrentDownloads int markDeploymentFailedAfter time.Duration bundleRetryDelay time.Duration @@ -54,6 +52,7 @@ isClosed *int32 workers []*BundleDownloader client *http.Client + configEtag *ConfigurationsEtagCache } type blobServerResponse struct { @@ -64,6 +63,11 @@ SignedUrlExpiryTimestamp string `json:"signedurlexpirytimestamp"` } +type pendingConfiguration struct { + dataDeployment *Configuration + counter int32 +} + func (bm *bundleManager) initializeBundleDownloading() { atomic.StoreInt32(bm.isClosed, 0) bm.workers = make([]*BundleDownloader, bm.concurrentDownloads) @@ -82,17 +86,30 @@ // download bundle blob and resource blob // TODO do not download duplicate blobs -func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) { - blobReq := bm.makeDownloadRequest(dep.BlobID) - resourceReq := bm.makeDownloadRequest(dep.BlobResourceID) - - go func() { - bm.enqueueRequest(blobReq) - bm.enqueueRequest(resourceReq) - }() +func (bm *bundleManager) queueDownloadRequest(conf *pendingConfiguration) { + log.Debugf("enque pendingConfiguration: %s", conf.dataDeployment.ID) + conf.counter = 0 + if conf.dataDeployment.BlobID != "" { + conf.counter++ + } + if conf.dataDeployment.BlobResourceID != "" { + conf.counter++ + } + if conf.dataDeployment.BlobID != "" { + blobReq := bm.makeDownloadRequest(conf.dataDeployment.BlobID, &conf.counter) + go bm.enqueueRequest(blobReq) + } + if conf.dataDeployment.BlobResourceID != "" { + resourceReq := 
bm.makeDownloadRequest(conf.dataDeployment.BlobResourceID, &conf.counter) + go bm.enqueueRequest(resourceReq) + } } -func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest { + +func (bm *bundleManager) makeDownloadRequest(id string, counter *int32) *DownloadRequest { + if id=="" { + return nil + } markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) retryIn := bm.bundleRetryDelay maxBackOff := 5 * time.Minute @@ -104,6 +121,7 @@ backoffFunc: createBackoff(retryIn, maxBackOff), markFailedAt: markFailedAt, client: bm.client, + counter: counter, } } @@ -112,13 +130,6 @@ if atomic.LoadInt32(bm.isClosed) == 1 { return } - /* - defer func() { - if r := recover(); r != nil { - log.Warn("trying to enque requests to closed bundleManager") - } - }() - */ bm.downloadQueue <- r } @@ -127,31 +138,13 @@ close(bm.downloadQueue) } -func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) { - for _, dep := range deletedDeployments { - go bm.deleteBundleById(dep.BlobID) - go bm.deleteBundleById(dep.BlobResourceID) - } +func (bm *bundleManager) deleteBlobs (ids []string) { + // TODO Delete from the Database table apid_blob_available, reference counting - /* - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - go func() { - // give clients a minute to avoid conflicts - time.Sleep(bm.bundleCleanupDelay) - for _, dep := range deletedDeployments { - bundleFile := getBlobFilePath(dep.BlobID) - log.Debugf("removing old bundle: %v", bundleFile) - // TODO Remove from the Database table apid_blob_available - safeDelete(bundleFile) - } - }() - */ + // TODO Delete from local file system } -// TODO add delete support -func (bm *bundleManager) deleteBundleById(blobId string) { -} type DownloadRequest struct { bm *bundleManager @@ -160,6 +153,8 @@ markFailedAt time.Time blobServerURL string client *http.Client + counter *int32 + dep *Configuration } func (r *DownloadRequest) downloadBundle() error { @@ -188,7 +183,9 @@ 
return err } - log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile) + // succeeded + + log.Debugf("blob downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile) err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile) if err != nil { @@ -199,10 +196,12 @@ return err } - log.Debugf("bundle downloaded: blobId=%s filename=%s", r.blobId, downloadedFile) + log.Debugf("blob inserted: blobId=%s filename=%s", r.blobId, downloadedFile) - // TODO send changed deployments to subscribers (API call with "block") - //r.bm.apiMan.addChangedDeployment(dep.ID) + // if all required blobs have been downloaded + if atomic.AddInt32(r.counter, -1) == 0 && r.bm.configEtag!=nil { + go r.bm.configEtag.Insert(r.dep) + } return nil }
diff --git a/bundle_test.go b/bundle_test.go index 7946f96..5a373ac 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -69,7 +69,6 @@ testBundleMan = &bundleManager{ blobServerUrl: bundleTestUrl, dbMan: dummyDbMan, - apiMan: dummyApiMan, concurrentDownloads: concurrentDownloads, markDeploymentFailedAfter: 5 * time.Second, bundleRetryDelay: time.Second,
diff --git a/configEtag.go b/configEtag.go new file mode 100644 index 0000000..54860f2 --- /dev/null +++ b/configEtag.go
@@ -0,0 +1,182 @@
+package apiGatewayConfDeploy
+
+import (
+	"crypto/md5"
+	"encoding/hex"
+	"sort"
+	"strings"
+	"sync"
+)
+
+// ETagAndConfig is a snapshot of the cached configurations together with the
+// ETag that was current when the snapshot was taken.
+type ETagAndConfig struct {
+	ETag    string
+	Configs []Configuration
+}
+
+// createConfigurationsEtag returns an empty cache. The change channel gets a
+// buffer of 10 notifications.
+func createConfigurationsEtag() *ConfigurationsEtagCache {
+	return &ConfigurationsEtagCache{
+		List:         &SortedList{},
+		mutex:        &sync.RWMutex{},
+		configChange: make(chan interface{}, 10),
+	}
+}
+
+// ConfigurationsEtagCache holds the configurations sorted by ID and an ETag
+// derived from those IDs. Its methods guard List and etag with mutex.
+type ConfigurationsEtagCache struct {
+	List         *SortedList
+	etag         string
+	mutex        *sync.RWMutex
+	configChange chan interface{}
+}
+
+// refreshEtag recomputes etag from the current list. The MD5 digest is
+// hex-encoded because a [16]byte cannot be converted to string directly and
+// the ETag has to be printable text. The caller must hold the write lock.
+func (c *ConfigurationsEtagCache) refreshEtag() {
+	sum := c.List.ComputeHash()
+	c.etag = hex.EncodeToString(sum[:])
+}
+
+// Construct replaces the cached list with deps, recomputes the ETag and
+// posts a change notification.
+func (c *ConfigurationsEtagCache) Construct(deps []Configuration) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.List.Construct(deps)
+	c.refreshEtag()
+	go c.notifyChange()
+}
+
+// Insert adds dep to the cache, recomputes the ETag and posts a change
+// notification. A nil dep is ignored so that it cannot cause a nil
+// dereference inside the cache.
+func (c *ConfigurationsEtagCache) Insert(dep *Configuration) {
+	if dep == nil {
+		return
+	}
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	c.List.Insert(dep)
+	c.refreshEtag()
+	go c.notifyChange()
+}
+
+// DeleteBunch removes the given ids from the cache, recomputes the ETag and
+// posts a change notification. Ids that are not cached are skipped.
+func (c *ConfigurationsEtagCache) DeleteBunch(ids []string) {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+	for _, id := range ids {
+		c.List.Delete(id)
+	}
+	c.refreshEtag()
+	go c.notifyChange()
+}
+
+// GetETag returns the current ETag.
+func (c *ConfigurationsEtagCache) GetETag() string {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+	return c.etag
+}
+
+// GetConfigsAndETag returns the ETag and a copy of the configurations, read
+// under a single lock so the two agree with each other.
+func (c *ConfigurationsEtagCache) GetConfigsAndETag() *ETagAndConfig {
+	c.mutex.RLock()
+	defer c.mutex.RUnlock()
+	return &ETagAndConfig{
+		ETag:    c.etag,
+		Configs: c.List.GetConfigs(),
+	}
+}
+
+// notifyChange posts one notification on the change channel. The send blocks
+// while the channel buffer is full; callers start it in a goroutine.
+func (c *ConfigurationsEtagCache) notifyChange() {
+	c.configChange <- true
+}
+
+// getChangeChannel exposes the receive side of the change channel.
+func (c *ConfigurationsEtagCache) getChangeChannel() <-chan interface{} {
+	return c.configChange
+}
+
+// SortedList is a list of configurations kept in ascending ID order. It does
+// no locking of its own.
+type SortedList struct {
+	list []*Configuration
+}
+
+// less orders elements by ID; it is the comparator handed to sort.Slice.
+func (s *SortedList) less(i int, j int) bool {
+	return strings.Compare(s.list[i].ID, s.list[j].ID) < 0
+}
+
+// Insert places dep at its sorted position. A nil dep is ignored.
+func (s *SortedList) Insert(dep *Configuration) {
+	if dep == nil {
+		return
+	}
+	i := s.indexOf(dep.ID)
+	s.list = append(s.list, nil) // grow by one, then shift the tail right
+	copy(s.list[i+1:], s.list[i:])
+	s.list[i] = dep
+}
+
+// ComputeHash returns the MD5 digest of all IDs concatenated in list order.
+func (s *SortedList) ComputeHash() [16]byte {
+	ids := make([]string, len(s.list))
+	for i, dep := range s.list {
+		ids[i] = dep.ID
+	}
+	return md5.Sum([]byte(strings.Join(ids, "")))
+}
+
+// Delete removes the element with the given id; it is a no-op when the id is
+// not in the list, including when the list is empty.
+func (s *SortedList) Delete(id string) {
+	i := s.indexOf(id)
+	if i < len(s.list) && s.list[i].ID == id {
+		s.list = append(s.list[:i], s.list[i+1:]...)
+	}
+}
+
+// indexOf returns the index of the first element whose ID is not less than
+// id, or len(s.list) when every ID is smaller (or the list is empty).
+func (s *SortedList) indexOf(id string) int {
+	return sort.Search(len(s.list), func(i int) bool {
+		return s.list[i].ID >= id
+	})
+}
+
+// Construct replaces the list with sorted copies of the elements of l.
+func (s *SortedList) Construct(l []Configuration) {
+	s.list = make([]*Configuration, len(l))
+	for i := range l {
+		// Copy each element: before Go 1.22, taking the address of the range
+		// variable would make every entry point at the same Configuration.
+		dep := l[i]
+		s.list[i] = &dep
+	}
+	sort.Slice(s.list, s.less)
+}
+
+// Len returns the number of configurations in the list.
+func (s *SortedList) Len() int {
+	return len(s.list)
+}
+
+// GetConfigs returns a copy of the configurations in ID order.
+func (s *SortedList) GetConfigs() []Configuration {
+	deps := make([]Configuration, len(s.list))
+	for i, dep := range s.list {
+		deps[i] = *dep
+	}
+	return deps
+}
diff --git a/data.go b/data.go index 564ce6d..abab743 100644 --- a/data.go +++ b/data.go
@@ -25,7 +25,7 @@ gwBlobId int64 ) -type DataDeployment struct { +type Configuration struct { ID string OrgID string EnvID string @@ -49,15 +49,16 @@ setDbVersion(string) initDb() error getUnreadyBlobs() ([]string, error) - getReadyDeployments() ([]DataDeployment, error) + getUnreadyConfigs() ([]*pendingConfiguration, error) + getReadyDeployments() ([]Configuration, error) updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) } type dbManager struct { - data apid.DataService - db apid.DB - dbMux sync.RWMutex + data apid.DataService + db apid.DB + dbMux *sync.RWMutex } func (dbc *dbManager) setDbVersion(version string) { @@ -135,7 +136,55 @@ return } -func (dbc *dbManager) getReadyDeployments() ([]DataDeployment, error) { +func (dbc *dbManager) getUnreadyConfigs() ([]*pendingConfiguration, error) { + + rows, err := dbc.getDb().Query(` + SELECT a.id, + a.organization_id, + a.environment_id, + a.bean_blob_id, + a.resource_blob_id, + a.type, + a.name, + a.revision, + a.path, + a.created_at, + a.created_by, + a.updated_at, + a.updated_by + FROM metadata_runtime_entity_metadata as a + WHERE a.id IN + ( + SELECT a.id as id + FROM metadata_runtime_entity_metadata as a + WHERE a.bean_blob_id NOT IN + (SELECT b.id FROM apid_blob_available as b) + UNION + SELECT a.id as id + FROM metadata_runtime_entity_metadata as a + WHERE a.resource_blob_id NOT IN + (SELECT b.id FROM apid_blob_available as b) + ) + WHERE id IS NOT NULL AND id != '' + ; + `) + if err != nil { + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) + return nil, err + } + defer rows.Close() + deps, err := dataDeploymentsFromRow(rows) + configs := make([]*pendingConfiguration, len(deps)) + for i, dep := range deps { + configs[i] = &pendingConfiguration{ + dataDeployment: &dep, + } + } + log.Debugf("Unready Configs %v", configs) + return configs, nil +} + +func (dbc *dbManager) getReadyDeployments() ([]Configuration, error) { // An alternative statement is in 
get_ready_deployments.sql // Need testing with large data volume to determine which is better @@ -186,14 +235,14 @@ } defer rows.Close() - deployments, err := dataDeploymentsFromRow(rows) + configs, err := dataDeploymentsFromRow(rows) if err != nil { return nil, err } - log.Debugf("Configurations ready: %v", deployments) + log.Debugf("Configurations ready: %v", configs) - return deployments, nil + return configs, nil } @@ -244,12 +293,12 @@ return } -func dataDeploymentsFromRow(rows *sql.Rows) ([]DataDeployment, error) { - tmp, err := structFromRows(reflect.TypeOf((*DataDeployment)(nil)).Elem(), rows) +func dataDeploymentsFromRow(rows *sql.Rows) ([]Configuration, error) { + tmp, err := structFromRows(reflect.TypeOf((*Configuration)(nil)).Elem(), rows) if err != nil { return nil, err } - return tmp.([]DataDeployment), nil + return tmp.([]Configuration), nil } func structFromRows(t reflect.Type, rows *sql.Rows) (interface{}, error) {
diff --git a/data_test.go b/data_test.go index abd9fc9..2dd6e67 100644 --- a/data_test.go +++ b/data_test.go
@@ -44,7 +44,7 @@ testCount += 1 testDbMan = &dbManager{ data: services.Data(), - dbMux: sync.RWMutex{}, + dbMux: &sync.RWMutex{}, } testDbMan.setDbVersion("test" + strconv.Itoa(testCount)) initTestDb(testDbMan.getDb())
diff --git a/init.go b/init.go index 037e5ea..cc03484 100644 --- a/init.go +++ b/init.go
@@ -123,11 +123,14 @@ }, } + // initialize configuration cache + configEtag := createConfigurationsEtag() + // initialize db manager dbMan := &dbManager{ data: services.Data(), - dbMux: sync.RWMutex{}, + dbMux: &sync.RWMutex{}, } // initialize api manager @@ -136,11 +139,9 @@ dbMan: dbMan, deploymentsEndpoint: deploymentsEndpoint, blobEndpoint: blobEndpoint, - eTag: 0, - deploymentsChanged: make(chan interface{}, 5), - addSubscriber: make(chan chan deploymentsResult), - removeSubscriber: make(chan chan deploymentsResult), + addSubscriber: make(chan chan interface{}, 10), apiInitialized: false, + configEtag: configEtag, } // initialize bundle manager @@ -158,7 +159,6 @@ bundleMan := &bundleManager{ blobServerUrl: blobServerURL, dbMan: dbMan, - apiMan: apiMan, concurrentDownloads: concurrentDownloads, markDeploymentFailedAfter: markDeploymentFailedAfter, bundleRetryDelay: time.Second, @@ -166,6 +166,7 @@ downloadQueue: make(chan *DownloadRequest, downloadQueueSize), isClosed: new(int32), client: httpClient, + configEtag:configEtag, } bundleMan.initializeBundleDownloading() @@ -179,6 +180,7 @@ apiMan: apiMan, bundleMan: bundleMan, closed: false, + configCache: configEtag, } eventHandler.initListener(services)
diff --git a/listener.go b/listener.go index 2769568..ee555ea 100644 --- a/listener.go +++ b/listener.go
@@ -47,6 +47,7 @@ dbMan dbManagerInterface apiMan apiManagerInterface bundleMan bundleManagerInterface + configCache *ConfigurationsEtagCache closed bool } @@ -70,7 +71,7 @@ log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) h.dbMan.setDbVersion(snapshot.SnapshotInfo) - + h.apiMan.InitDistributeEvents() h.startupOnExistingDatabase() h.apiMan.InitAPI() log.Debug("Snapshot processed") @@ -82,15 +83,15 @@ go func() { // create apid_blob_available table h.dbMan.initDb() - blobIds, err := h.dbMan.getUnreadyBlobs() + configs, err := h.dbMan.getUnreadyConfigs() if err != nil { log.Panicf("unable to query database for unready deployments: %v", err) } - log.Debugf("Queuing %d blob downloads", len(blobIds)) - for _, id := range blobIds { - go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id)) + log.Debugf("Queuing %d unready config downloads", len(configs)) + for _, c := range configs { + go h.bundleMan.queueDownloadRequest(c) } }() } @@ -98,67 +99,66 @@ func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) { log.Debugf("Processing changes") - // changes have been applied to DB - var insertedDeployments, deletedDeployments []DataDeployment - var updatedNewBlobs, updatedOldBlobs []string + // changes have been applied to DB by apidApigeeSync + var insertedConfigs, updatedNewConfigs []*pendingConfiguration + var updatedConfigOldIds, deletedConfigIds []string + var deletedBlobIds, updatedOldBlobIds []string for _, change := range changes.Changes { switch change.Table { case CONFIG_METADATA_TABLE: switch change.Operation { case common.Insert: dep := dataDeploymentFromRow(change.NewRow) - insertedDeployments = append(insertedDeployments, dep) + insertedConfigs = append(insertedConfigs, &pendingConfiguration{ + dataDeployment: &dep, + }) case common.Delete: dep := dataDeploymentFromRow(change.OldRow) - deletedDeployments = append(deletedDeployments, dep) + deletedConfigIds = append(deletedConfigIds, dep.ID) + + 
deletedBlobIds = append(deletedBlobIds, dep.BlobResourceID) + deletedBlobIds = append(deletedBlobIds, dep.BlobID) case common.Update: depNew := dataDeploymentFromRow(change.NewRow) depOld := dataDeploymentFromRow(change.OldRow) - if depOld.BlobID != depNew.BlobID { - updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID) - updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID) - } + updatedConfigOldIds = append(updatedConfigOldIds, depOld.ID) + updatedNewConfigs = append(updatedNewConfigs, &pendingConfiguration{ + dataDeployment: &depNew, + }) - if depOld.BlobResourceID != depNew.BlobResourceID { - updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID) - updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID) - } + updatedOldBlobIds = append(updatedOldBlobIds, depOld.BlobResourceID) + updatedOldBlobIds = append(updatedOldBlobIds, depOld.BlobID) default: log.Errorf("unexpected operation: %s", change.Operation) } } } - /* - for _, d := range deletedDeployments { - h.apiMan.addChangedDeployment(d.ID) - } - */ + + + // update cache with deleted/updated configs + log.Debugf("will delete %d configs from cache", len(deletedConfigIds)) + h.configCache.DeleteBunch(deletedConfigIds) + log.Debugf("will delete %d updated old configs from cache", len(updatedConfigOldIds)) + h.configCache.DeleteBunch(updatedConfigOldIds) + + // TODO clean the old blobs + h.bundleMan.deleteBlobs(deletedBlobIds) + h.bundleMan.deleteBlobs(updatedOldBlobIds) // insert - for i := range insertedDeployments { - go h.bundleMan.queueDownloadRequest(&insertedDeployments[i]) + for _, c := range insertedConfigs { + go h.bundleMan.queueDownloadRequest(c) } - // update - for i := range updatedNewBlobs { - go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i])) + for _, c := range updatedNewConfigs { + go h.bundleMan.queueDownloadRequest(c) } - for i := range updatedOldBlobs { - go h.bundleMan.deleteBundleById(updatedOldBlobs[i]) - } - - // delete - if 
len(deletedDeployments) > 0 { - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - //TODO delete bundles for deleted deployments - h.bundleMan.deleteBundlesFromDeployments(deletedDeployments) - } } -func dataDeploymentFromRow(row common.Row) (d DataDeployment) { +func dataDeploymentFromRow(row common.Row) (d Configuration) { row.Get("id", &d.ID) row.Get("organization_id", &d.OrgID)
diff --git a/listener_test.go b/listener_test.go index d270be0..e9e46c9 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -38,8 +38,8 @@ dummyDbMan = &dummyDbManager{} dummyBundleMan = &dummyBundleManager{ requestChan: make(chan *DownloadRequest), - depChan: make(chan *DataDeployment), - delChan: make(chan *DataDeployment), + depChan: make(chan *Configuration), + delChan: make(chan *Configuration), delBlobChan: make(chan string), } testHandler = &apigeeSyncHandler{ @@ -108,7 +108,7 @@ It("Insert event should enqueue download requests for all inserted deployments", func() { // emit change event changes := make([]common.Change, 0) - deployments := make(map[string]DataDeployment) + deployments := make(map[string]Configuration) for i := 0; i < 1+rand.Intn(10); i++ { dep := makeTestDeployment() change := common.Change{ @@ -279,8 +279,8 @@ type dummyBundleManager struct { requestChan chan *DownloadRequest - depChan chan *DataDeployment - delChan chan *DataDeployment + depChan chan *Configuration + delChan chan *Configuration delBlobChan chan string } @@ -288,8 +288,8 @@ } -func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) { - bm.depChan <- dep +func (bm *dummyBundleManager) queueDownloadRequest(conf *pendingConfiguration) { + bm.depChan <- conf.dataDeployment } func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) { @@ -302,21 +302,24 @@ } } -func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) { +func (bm *dummyBundleManager) deleteBlobFromDeployments(deployments []Configuration) { for i := range deployments { bm.delChan <- &deployments[i] } } -func (bm *dummyBundleManager) deleteBundleById(blobId string) { - bm.delBlobChan <- blobId +func (bm *dummyBundleManager) deleteBlobs(blobIds []string) { + for _, id := range blobIds { + bm.delBlobChan <- id + } + } func (bm *dummyBundleManager) Close() { } -func rowFromDeployment(dep *DataDeployment) common.Row { +func rowFromDeployment(dep *Configuration) common.Row { row := common.Row{} row["id"] = &common.ColumnVal{Value: dep.ID} row["organization_id"] = 
&common.ColumnVal{Value: dep.OrgID}
diff --git a/longPoll.go b/longPoll.go new file mode 100644 index 0000000..6d276e9 --- /dev/null +++ b/longPoll.go
@@ -0,0 +1,62 @@
+package apiGatewayConfDeploy
+
+import "time"
+
+// distributeEvents hands every value received on deliverChan to each channel
+// registered through addSubscriber since the previous delivery. Every send
+// runs in its own goroutine, and the registrations are dropped once a value
+// has been dispatched to them. Channels passed to addSubscriber should be
+// buffered. The loop returns when either input channel is closed.
+func distributeEvents(deliverChan <-chan interface{}, addSubscriber chan chan interface{}) {
+	waiting := make([]chan interface{}, 0)
+	for {
+		select {
+		case event, open := <-deliverChan:
+			if !open {
+				return
+			}
+			batch := waiting
+			waiting = make([]chan interface{}, 0)
+			for i := range batch {
+				go func(target chan interface{}) {
+					log.Debugf("delivering to: %v", target)
+					target <- event
+				}(batch[i])
+			}
+		case newSub, open := <-addSubscriber:
+			if !open {
+				return
+			}
+			log.Debugf("Add subscriber: %v", newSub)
+			waiting = append(waiting, newSub)
+		}
+	}
+}
+
+// debounce forwards the latest value read from in to out once window passes
+// without another value arriving. Nil values are never forwarded. When in is
+// closed, a still-pending value is sent and out is closed.
+func debounce(in chan interface{}, out chan interface{}, window time.Duration) {
+	var pending interface{}
+	flush := func() {
+		if pending != nil {
+			out <- pending
+		}
+	}
+	for {
+		select {
+		case value, open := <-in:
+			if !open {
+				flush()
+				log.Debugf("closing debouncer")
+				close(out)
+				return
+			}
+			log.Debugf("debouncing %v", value)
+			pending = value
+		case <-time.After(window):
+			flush()
+			pending = nil
+		}
+	}
+}
\ No newline at end of file