[ISSUE-66918282] support long poll for "/configurations"
diff --git a/api.go b/api.go index 4ffc12e..97b9a85 100644 --- a/api.go +++ b/api.go
@@ -17,14 +17,15 @@ "bytes" "database/sql" "encoding/json" + "errors" "fmt" + "github.com/apigee-labs/transicator/common" "github.com/gorilla/mux" "io" "io/ioutil" "net/http" "net/url" "strconv" - "sync/atomic" "time" ) @@ -64,11 +65,14 @@ ) const ( - headerSteam = "application/octet-stream" + headerSteam = "application/octet-stream" + apidConfigIndexHeader = "x-apid-config-index" ) +var ErrNoLSN = errors.New("No last sequence in DB") + type deploymentsResult struct { - deployments []DataDeployment + deployments []Configuration err error eTag string } @@ -101,8 +105,7 @@ //TODO add support for block and subscriber type apiManagerInterface interface { InitAPI() - //addChangedDeployment(string) - //distributeEvents() + notifyNewChangeList(newLSN string) } type apiManager struct { @@ -110,10 +113,8 @@ deploymentsEndpoint string blobEndpoint string deploymentIdEndpoint string - eTag int64 - deploymentsChanged chan interface{} - addSubscriber chan chan deploymentsResult - removeSubscriber chan chan deploymentsResult + addSubscriber chan chan interface{} + newChangeListChan chan interface{} apiInitialized bool } @@ -124,12 +125,17 @@ services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") services.API().HandleFunc(a.deploymentIdEndpoint, a.apiHandleConfigId).Methods("GET") + a.initDistributeEvents() a.apiInitialized = true log.Debug("API endpoints initialized") } -func (a *apiManager) addChangedDeployment(id string) { - a.deploymentsChanged <- id +func (a *apiManager) initDistributeEvents() { + go distributeEvents(a.newChangeListChan, a.addSubscriber) +} + +func (a *apiManager) notifyNewChangeList(newLSN string) { + a.newChangeListChan <- newLSN } func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) { @@ -151,70 +157,6 @@ a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err) } -func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) { - send := func(toSend []interface{}) { - if toSend != nil { - log.Debugf("debouncer sending: %v", toSend) - out <- toSend - } - } - var toSend []interface{} - for { - select { - case incoming, ok := <-in: - if ok { - log.Debugf("debouncing %v", incoming) - toSend = append(toSend, incoming) - } else { - send(toSend) - log.Debugf("closing debouncer") - close(out) - return - } - case <-time.After(window): - send(toSend) - toSend = nil - } - } -} - -//TODO get notified when deployments ready -/* -func (a *apiManager) distributeEvents() { - subscribers := make(map[chan deploymentsResult]bool) - deliverDeployments := make(chan []interface{}, 1) - - go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration) - - for { - select { - case _, ok := <-deliverDeployments: - if !ok { - return // todo: using this? - } - subs := subscribers - subscribers = make(map[chan deploymentsResult]bool) - go func() { - eTag := a.incrementETag() - deployments, err := a.dbMan.getUnreadyDeployments() - log.Debugf("delivering deployments to %d subscribers", len(subs)) - for subscriber := range subs { - log.Debugf("delivering to: %v", subscriber) - subscriber <- deploymentsResult{deployments, err, eTag} - } - }() - case subscriber := <-a.addSubscriber: - log.Debugf("Add subscriber: %v", subscriber) - subscribers[subscriber] = true - case subscriber := <-a.removeSubscriber: - log.Debugf("Remove subscriber: %v", subscriber) - delete(subscribers, subscriber) - } - } -} -*/ - -// TODO use If-None-Match and ETag func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -282,78 +224,90 @@ // If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then // block for up to the specified number of seconds until a new deployment becomes available. - b := r.URL.Query().Get("block") + blockSec := r.URL.Query().Get("block") typeFilter := r.URL.Query().Get("type") + headerLSN := r.URL.Query().Get("apid-config-index") var timeout int - if b != "" { - var err error - timeout, err = strconv.Atoi(b) - if err != nil { + var err error + if blockSec != "" { + timeout, err = strconv.Atoi(blockSec) + if err != nil || timeout < 0 { a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds") return } } - log.Debugf("api timeout: %d", timeout) + log.Debugf("/configurations long-poll timeout: %d", timeout) - // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block' - // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the - // most recent bundle list. - ifNoneMatch := r.Header.Get("If-None-Match") - log.Debugf("if-none-match: %s", ifNoneMatch) + log.Debugf("Long-Poll-Index: %s", headerLSN) - // send unmodified if matches prior eTag and no timeout - eTag := a.getETag() - if eTag == ifNoneMatch && timeout == 0 { - w.WriteHeader(http.StatusNotModified) + // if filter by "type" + if typeFilter != "" { + a.sendReadyDeployments(typeFilter, w, "") return } - // send results if different eTag - if eTag != ifNoneMatch { - a.sendReadyDeployments(typeFilter, w) - return + // if no Long Poll Index + if headerLSN == "" { + headerLSN = "0.0.0" } - // otherwise, subscribe to any new deployment changes - var newDeploymentsChannel chan deploymentsResult - if timeout > 0 && ifNoneMatch != "" { - //TODO handle block - //newDeploymentsChannel = make(chan deploymentsResult, 1) - //a.addSubscriber <- newDeploymentsChannel - } - - log.Debug("Blocking request... Waiting for new Deployments.") - - select { - case result := <-newDeploymentsChannel: - if result.err != nil { - a.writeInternalError(w, "Database error") - } else { - a.sendDeployments(w, result.deployments, result.eTag, typeFilter) + // if no filter, check for long polling + cmpRes, apidLSN, err := a.compareLSN(headerLSN) + switch { + case err != nil: + if err == ErrNoLSN { // apid hasn't got any LSN from Change Server + // This may happen during apid bootstrap + a.waitForNewCL(w, time.Duration(timeout)) } - - case <-time.After(time.Duration(timeout) * time.Second): - a.removeSubscriber <- newDeploymentsChannel - log.Debug("Blocking deployment request timed out.") - if ifNoneMatch != "" { + log.Errorf("Error in compareLSN: %v", err) + a.writeInternalError(w, err.Error()) + return + case cmpRes <= 0: //APID_LSN <= Header_LSN + if timeout == 0 { // no long polling w.WriteHeader(http.StatusNotModified) - } else { - a.sendReadyDeployments(typeFilter, w) + } else { // long polling + a.waitForNewCL(w, time.Duration(timeout)) } + return + case cmpRes > 0: //APID_LSN > Header_LSN + a.sendReadyDeployments("", w, apidLSN) + return } } -func (a *apiManager) sendReadyDeployments(typeFilter string, w http.ResponseWriter) { - eTag := a.getETag() +func (a *apiManager) waitForNewCL(w http.ResponseWriter, timeout time.Duration) { + ConfigChangeChan := make(chan interface{}, 1) + a.addSubscriber <- ConfigChangeChan + + log.Debug("Long-polling... Waiting for new Deployments.") + + select { + case LSN := <-ConfigChangeChan: + // send configs and LSN + lsn, ok := LSN.(string) + if !ok { + log.Errorf("Wrong LSN type: %v", LSN) + a.writeInternalError(w, "Wrong LSN type") + return + } + a.sendReadyDeployments("", w, lsn) + case <-time.After(timeout * time.Second): + log.Debug("long-polling configuration request timed out.") + w.WriteHeader(http.StatusNotModified) + } +} + +func (a *apiManager) sendReadyDeployments(typeFilter string, w http.ResponseWriter, apidLSN string) { deployments, err := a.dbMan.getReadyDeployments(typeFilter) if err != nil { + log.Errorf("Database error: %v", err) a.writeInternalError(w, fmt.Sprintf("Database error: %s", err.Error())) return } - a.sendDeployments(w, deployments, eTag, typeFilter) + a.sendDeployments(w, deployments, apidLSN, typeFilter) } -func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string, typeFilter string) { +func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []Configuration, apidLSN string, typeFilter string) { apiDeps := ApiDeploymentResponse{} apiDepDetails := make([]ApiDeploymentDetails, 0) @@ -389,20 +343,38 @@ return } - log.Debugf("sending deployments %s: %s", eTag, b) - w.Header().Set("ETag", eTag) + log.Debugf("sending deployments %s: %s", apidLSN, b) + if apidLSN != "" { + w.Header().Set(apidConfigIndexHeader, apidLSN) + } + w.Write(b) } -// call whenever the list of deployments changes -func (a *apiManager) incrementETag() string { - e := atomic.AddInt64(&a.eTag, 1) - return strconv.FormatInt(e, 10) -} +func (a *apiManager) compareLSN(headerLSN string) (res int, apidLSN string, err error) { + apidLSN, err = a.dbMan.getLastSequence() + log.Debugf("apidLSN: %v", apidLSN) + if err != nil { + log.Errorf("Error when getLastSequence: %v", err) + return 0, "", err + } + if apidLSN == "" { + log.Errorf("Error when getLastSequence: %v", ErrNoLSN) + return 0, "", ErrNoLSN + } -func (a *apiManager) getETag() string { - e := atomic.LoadInt64(&a.eTag) - return strconv.FormatInt(e, 10) + apidSeq, err := common.ParseSequence(apidLSN) + if err != nil { + log.Errorf("Error when Parse apidLSN Sequence: %v", err) + return 0, "", err + } + headerSeq, err := common.ParseSequence(headerLSN) + if err != nil { + log.Errorf("Error when Parse headerLSN Sequence: %v", err) + return 0, "", err + } + + return apidSeq.Compare(headerSeq), apidLSN, nil } // escape the blobId into url
diff --git a/api_test.go b/api_test.go index 84c1cb5..24a5e95 100644 --- a/api_test.go +++ b/api_test.go
@@ -42,16 +42,16 @@ var _ = BeforeEach(func() { testCount += 1 - dummyDbMan = &dummyDbManager{} + dummyDbMan = &dummyDbManager{ + lsn: "19.1d3e9368.0", + } testApiMan = &apiManager{ dbMan: dummyDbMan, deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount), blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", deploymentIdEndpoint: deploymentsEndpoint + strconv.Itoa(testCount) + "/{configId}", - eTag: int64(testCount * 10), - deploymentsChanged: make(chan interface{}, 5), - addSubscriber: make(chan chan deploymentsResult), - removeSubscriber: make(chan chan deploymentsResult), + newChangeListChan: make(chan interface{}, 5), + addSubscriber: make(chan chan interface{}), } testApiMan.InitAPI() time.Sleep(100 * time.Millisecond) @@ -126,7 +126,7 @@ // set test data dep := makeTestDeployment() - dummyDbMan.configurations = make(map[string]*DataDeployment) + dummyDbMan.configurations = make(map[string]*Configuration) dummyDbMan.configurations[typeFilter] = dep detail := makeExpectedDetail(dep, strings.Split(uri.String(), "?")[0]) @@ -159,19 +159,19 @@ // set test data setTestDeployments(dummyDbMan, uri.String()) - // http get res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) defer res.Body.Close() Expect(res.StatusCode).Should(Equal(http.StatusOK)) - etag := res.Header.Get("etag") - Expect(etag).ShouldNot(BeEmpty()) + lsn := res.Header.Get("x-apid-config-index") + Expect(lsn).ShouldNot(BeEmpty()) // send second request + uri.RawQuery = "apid-config-index=" + lsn + log.Debug(uri.String()) req, err := http.NewRequest("GET", uri.String(), nil) req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", etag) // get response res, err = http.DefaultClient.Do(req) @@ -233,7 +233,7 @@ dep := makeTestDeployment() dep.Created = t dep.Updated = t - dummyDbMan.readyDeployments = []DataDeployment{*dep} + dummyDbMan.readyDeployments = []Configuration{*dep} detail := makeExpectedDetail(dep, uri.String()) detail.Created = isoTime[i] detail.Updated = isoTime[i] @@ -254,30 +254,6 @@ } }) - It("should debounce requests", func(done Done) { - var in = make(chan interface{}) - var out = make(chan []interface{}) - - go testApiMan.debounce(in, out, 3*time.Millisecond) - - go func() { - defer GinkgoRecover() - - received, ok := <-out - Expect(ok).To(BeTrue()) - Expect(len(received)).To(Equal(2)) - - close(in) - received, ok = <-out - Expect(ok).To(BeFalse()) - - close(done) - }() - - in <- "x" - in <- "y" - }) - }) Context("GET /blobs", func() { @@ -319,8 +295,8 @@ //setup test data dummyDbMan.err = nil - dummyDbMan.configurations = make(map[string]*DataDeployment) - expectedConfig := &DataDeployment{ + dummyDbMan.configurations = make(map[string]*Configuration) + expectedConfig := &Configuration{ ID: "3ecd351c-1173-40bf-b830-c194e5ef9038", OrgID: "73fcac6c-5d9f-44c1-8db0-333efda3e6e8", EnvID: "ada76573-68e3-4f1a-a0f9-cbc201a97e80", @@ -381,8 +357,8 @@ if data[1] != nil { dummyDbMan.err = data[1].(error) } - dummyDbMan.configurations = make(map[string]*DataDeployment) - dummyDbMan.configurations[data[0].(string)] = &DataDeployment{} + dummyDbMan.configurations = make(map[string]*Configuration) + dummyDbMan.configurations[data[0].(string)] = &Configuration{} // http get uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string) res, err := http.Get(uri.String()) @@ -399,7 +375,7 @@ mathrand.Seed(time.Now().UnixNano()) count := mathrand.Intn(5) + 1 - deployments := make([]DataDeployment, count) + deployments := make([]Configuration, count) details := make([]ApiDeploymentDetails, count) for i := 0; i < count; i++ { @@ -415,8 +391,8 @@ return details } -func makeTestDeployment() *DataDeployment { - dep := &DataDeployment{ +func makeTestDeployment() *Configuration { + dep := &Configuration{ ID: util.GenerateUUID(), OrgID: util.GenerateUUID(), EnvID: util.GenerateUUID(), @@ -434,7 +410,7 @@ return dep } -func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails { +func makeExpectedDetail(dep *Configuration, self string) *ApiDeploymentDetails { detail := &ApiDeploymentDetails{ Self: self + "/" + dep.ID, Name: dep.Name, @@ -453,11 +429,12 @@ type dummyDbManager struct { unreadyBlobIds []string - readyDeployments []DataDeployment + readyDeployments []Configuration localFSLocation string fileResponse chan string version string - configurations map[string]*DataDeployment + configurations map[string]*Configuration + lsn string err error } @@ -473,11 +450,11 @@ return d.unreadyBlobIds, nil } -func (d *dummyDbManager) getReadyDeployments(typeFilter string) ([]DataDeployment, error) { +func (d *dummyDbManager) getReadyDeployments(typeFilter string) ([]Configuration, error) { if typeFilter == "" { return d.readyDeployments, nil } - return []DataDeployment{*(d.configurations[typeFilter])}, nil + return []Configuration{*(d.configurations[typeFilter])}, nil } func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error { @@ -498,6 +475,9 @@ return d.localFSLocation, nil } -func (d *dummyDbManager) getConfigById(id string) (*DataDeployment, error) { +func (d *dummyDbManager) getConfigById(id string) (*Configuration, error) { return d.configurations[id], d.err } +func (d *dummyDbManager) getLastSequence() (string, error) { + return d.lsn, nil +}
diff --git a/bundle.go b/bundle.go index b4729cf..d1fb8bd 100644 --- a/bundle.go +++ b/bundle.go
@@ -34,10 +34,10 @@ type bundleManagerInterface interface { initializeBundleDownloading() - queueDownloadRequest(*DataDeployment) + downloadBlobsForChangeList(configs []*Configuration, LSN string) enqueueRequest(*DownloadRequest) - makeDownloadRequest(string) *DownloadRequest - deleteBundlesFromDeployments([]DataDeployment) + makeDownloadRequest(blobId string, changelistRequest *ChangeListDownloadRequest) *DownloadRequest + deleteBlobsFromConfigs([]*Configuration) deleteBundleById(string) Close() } @@ -82,28 +82,34 @@ // download bundle blob and resource blob // TODO do not download duplicate blobs -func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) { - blobReq := bm.makeDownloadRequest(dep.BlobID) - resourceReq := bm.makeDownloadRequest(dep.BlobResourceID) +func (bm *bundleManager) queueDownloadRequest(conf *Configuration, changelistRequest *ChangeListDownloadRequest) { + blobReq := bm.makeDownloadRequest(conf.BlobID, changelistRequest) + resourceReq := bm.makeDownloadRequest(conf.BlobResourceID, changelistRequest) - go func() { - bm.enqueueRequest(blobReq) - bm.enqueueRequest(resourceReq) - }() + if blobReq != nil { + go bm.enqueueRequest(blobReq) + } + if resourceReq != nil { + go bm.enqueueRequest(resourceReq) + } } -func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest { +func (bm *bundleManager) makeDownloadRequest(blobId string, changelistRequest *ChangeListDownloadRequest) *DownloadRequest { + if blobId == "" { + return nil + } markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) retryIn := bm.bundleRetryDelay maxBackOff := 5 * time.Minute return &DownloadRequest{ - blobServerURL: bm.blobServerUrl, - bm: bm, - blobId: id, - backoffFunc: createBackoff(retryIn, maxBackOff), - markFailedAt: markFailedAt, - client: bm.client, + blobServerURL: bm.blobServerUrl, + bm: bm, + blobId: blobId, + backoffFunc: createBackoff(retryIn, maxBackOff), + markFailedAt: markFailedAt, + client: bm.client, + changelistRequest: changelistRequest, } } @@ -112,40 +118,29 @@ if atomic.LoadInt32(bm.isClosed) == 1 { return } - /* - defer func() { - if r := recover(); r != nil { - log.Warn("trying to enque requests to closed bundleManager") - } - }() - */ bm.downloadQueue <- r } +func (bm *bundleManager) downloadBlobsForChangeList(configs []*Configuration, LSN string) { + c := &ChangeListDownloadRequest{ + bm: bm, + configs: configs, + attemptCounter: new(int32), + LSN: LSN, + } + c.download() +} + func (bm *bundleManager) Close() { atomic.StoreInt32(bm.isClosed, 1) close(bm.downloadQueue) } -func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) { - for _, dep := range deletedDeployments { - go bm.deleteBundleById(dep.BlobID) - go bm.deleteBundleById(dep.BlobResourceID) +func (bm *bundleManager) deleteBlobsFromConfigs(deletedConfigs []*Configuration) { + for _, conf := range deletedConfigs { + go bm.deleteBundleById(conf.BlobID) + go bm.deleteBundleById(conf.BlobResourceID) } - - /* - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - go func() { - // give clients a minute to avoid conflicts - time.Sleep(bm.bundleCleanupDelay) - for _, dep := range deletedDeployments { - bundleFile := getBlobFilePath(dep.BlobID) - log.Debugf("removing old bundle: %v", bundleFile) - // TODO Remove from the Database table apid_blob_available - safeDelete(bundleFile) - } - }() - */ } // TODO add delete support @@ -153,19 +148,62 @@ } +type ChangeListDownloadRequest struct { + bm *bundleManager + configs []*Configuration + attemptCounter *int32 + LSN string +} + +func (cldr *ChangeListDownloadRequest) download() { + log.Debug("Attempt to download blobs for change list: %v", cldr.LSN) + + if len(cldr.configs) == 0 { // If there are no new configurations in this CL + log.Debug("No new configs for change list: %v, expose immediately", cldr.LSN) + cldr.exposeChangeList() + return + } + + *cldr.attemptCounter = 0 + for _, c := range cldr.configs { + if c.BlobID != "" { + *cldr.attemptCounter++ + } + if c.BlobResourceID != "" { + *cldr.attemptCounter++ + } + } + for _, c := range cldr.configs { + cldr.bm.queueDownloadRequest(c, cldr) + } +} + +func (cldr *ChangeListDownloadRequest) downloadAttempted() { + if atomic.AddInt32(cldr.attemptCounter, -1) == 0 { + cldr.exposeChangeList() + } +} + +func (cldr *ChangeListDownloadRequest) exposeChangeList() { + go cldr.bm.apiMan.notifyNewChangeList(cldr.LSN) +} + type DownloadRequest struct { - bm *bundleManager - blobId string - backoffFunc func() - markFailedAt time.Time - blobServerURL string - client *http.Client + bm *bundleManager + blobId string + backoffFunc func() + markFailedAt time.Time + blobServerURL string + client *http.Client + changelistRequest *ChangeListDownloadRequest + attempted bool } func (r *DownloadRequest) downloadBundle() error { log.Debugf("starting bundle download attempt for blobId=%s", r.blobId) - + var err error + defer r.markAttempted(&err) if r.checkTimeout() { return &timeoutError{ markFailedAt: r.markFailedAt, @@ -199,10 +237,7 @@ return err } - log.Debugf("bundle downloaded: blobId=%s filename=%s", r.blobId, downloadedFile) - - // TODO send changed deployments to subscribers (API call with "block") - //r.bm.apiMan.addChangedDeployment(dep.ID) + log.Debugf("blod downloaded and inserted: blobId=%s filename=%s", r.blobId, downloadedFile) return nil } @@ -218,6 +253,19 @@ return false } +func (r *DownloadRequest) markAttempted(errp *error) { + if !r.attempted { + r.attempted = true + err := *errp + if r.changelistRequest != nil { + r.changelistRequest.downloadAttempted() + } + if err != nil { + //TODO: insert to DB as "attempted but unsuccessful" + } + } +} + func getBlobFilePath(blobId string) string { return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId))) }
diff --git a/bundle_test.go b/bundle_test.go index 30760cc..8b79358 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -98,7 +98,7 @@ It("should download blob according to id", func() { // download blob id := util.GenerateUUID() - testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) received := <-dummyDbMan.fileResponse Expect(received).Should(Equal(id)) }) @@ -112,7 +112,7 @@ // download blobs id := util.GenerateUUID() - testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) received := <-dummyDbMan.fileResponse Expect(received).Should(Equal(id)) @@ -128,7 +128,7 @@ // download blobs id := util.GenerateUUID() - req := testBundleMan.makeDownloadRequest(id) + req := testBundleMan.makeDownloadRequest(id, nil) Expect(req.markFailedAt.After(time.Now())).Should(BeTrue()) testBundleMan.enqueueRequest(req) @@ -140,12 +140,17 @@ type dummyApiManager struct { initCalled bool + LSN string } func (a *dummyApiManager) InitAPI() { a.initCalled = true } +func (a *dummyApiManager) notifyNewChangeList(newLSN string) { + a.LSN = newLSN +} + type dummyBlobServer struct { serverEndpoint string signedEndpoint string
diff --git a/data.go b/data.go index d424e2b..06fe87c 100644 --- a/data.go +++ b/data.go
@@ -25,7 +25,7 @@ gwBlobId int64 ) -type DataDeployment struct { +type Configuration struct { ID string OrgID string EnvID string @@ -49,10 +49,11 @@ setDbVersion(string) initDb() error getUnreadyBlobs() ([]string, error) - getReadyDeployments(typeFilter string) ([]DataDeployment, error) + getReadyDeployments(typeFilter string) ([]Configuration, error) updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) - getConfigById(string) (*DataDeployment, error) + getConfigById(string) (*Configuration, error) + getLastSequence() (string, error) } type dbManager struct { @@ -100,7 +101,7 @@ return nil } -func (dbc *dbManager) getConfigById(id string) (config *DataDeployment, err error) { +func (dbc *dbManager) getConfigById(id string) (config *Configuration, err error) { row := dbc.getDb().QueryRow(` SELECT a.id, a.organization_id, @@ -161,7 +162,7 @@ return } -func (dbc *dbManager) getReadyDeployments(typeFilter string) ([]DataDeployment, error) { +func (dbc *dbManager) getReadyDeployments(typeFilter string) ([]Configuration, error) { // An alternative statement is in get_ready_deployments.sql // Need testing with large data volume to determine which is better @@ -317,12 +318,27 @@ return } -func dataDeploymentsFromRows(rows *sql.Rows) ([]DataDeployment, error) { - tmp, err := structFromRows(reflect.TypeOf((*DataDeployment)(nil)).Elem(), rows) +func (dbc *dbManager) getLastSequence() (string, error) { + var lastSequence sql.NullString + err := dbc.getDb().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence) + if err != nil && err != sql.ErrNoRows { + log.Errorf("Failed to select last_sequence from EDGEX_APID_CLUSTER: %v", err) + return "", err + } + ret := "" + if lastSequence.Valid { + ret = lastSequence.String + } + log.Debugf("lastSequence: %s", lastSequence) + return ret, nil +} + +func dataDeploymentsFromRows(rows *sql.Rows) ([]Configuration, error) { + tmp, err := structFromRows(reflect.TypeOf((*Configuration)(nil)).Elem(), rows) if err != nil { return nil, err } - return tmp.([]DataDeployment), nil + return tmp.([]Configuration), nil } func structFromRows(t reflect.Type, rows *sql.Rows) (interface{}, error) { @@ -349,15 +365,15 @@ return slice.Interface(), nil } -func dataDeploymentsFromRow(row *sql.Row) (*DataDeployment, error) { - tmp, err := structFromRow(reflect.TypeOf((*DataDeployment)(nil)).Elem(), row) +func dataDeploymentsFromRow(row *sql.Row) (*Configuration, error) { + tmp, err := structFromRow(reflect.TypeOf((*Configuration)(nil)).Elem(), row) if err != nil { if err != sql.ErrNoRows { log.Errorf("Error in dataDeploymentsFromRow: %v", err) } return nil, err } - config := tmp.(DataDeployment) + config := tmp.(Configuration) return &config, nil }
diff --git a/data_test.go b/data_test.go index fbbb0a7..50b2472 100644 --- a/data_test.go +++ b/data_test.go
@@ -127,7 +127,7 @@ It("should get configuration by Id", func() { config, err := testDbMan.getConfigById("3ecd351c-1173-40bf-b830-c194e5ef9038") Expect(err).Should(Succeed()) - expectedResponse := &DataDeployment{ + expectedResponse := &Configuration{ ID: "3ecd351c-1173-40bf-b830-c194e5ef9038", OrgID: "73fcac6c-5d9f-44c1-8db0-333efda3e6e8", EnvID: "ada76573-68e3-4f1a-a0f9-cbc201a97e80",
diff --git a/init.go b/init.go index 93e67ce..d04b74c 100644 --- a/init.go +++ b/init.go
@@ -137,10 +137,8 @@ deploymentsEndpoint: deploymentsEndpoint, blobEndpoint: blobEndpoint, deploymentIdEndpoint: deploymentIdEndpoint, - eTag: 0, - deploymentsChanged: make(chan interface{}, 5), - addSubscriber: make(chan chan deploymentsResult), - removeSubscriber: make(chan chan deploymentsResult), + newChangeListChan: make(chan interface{}, 5), + addSubscriber: make(chan chan interface{}, 100), apiInitialized: false, }
diff --git a/listener.go b/listener.go index 2769568..51304b4 100644 --- a/listener.go +++ b/listener.go
@@ -76,7 +76,6 @@ log.Debug("Snapshot processed") } -// TODO make it work with new schema func (h *apigeeSyncHandler) startupOnExistingDatabase() { // start bundle downloads that didn't finish go func() { @@ -90,7 +89,7 @@ log.Debugf("Queuing %d blob downloads", len(blobIds)) for _, id := range blobIds { - go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id)) + go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id, nil)) } }() } @@ -98,67 +97,41 @@ func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) { log.Debugf("Processing changes") - // changes have been applied to DB - var insertedDeployments, deletedDeployments []DataDeployment - var updatedNewBlobs, updatedOldBlobs []string + // changes have been applied to DB by apidApigeeSync + var insertedConfigs, updatedNewConfigs, updatedOldConfigs, deletedConfigs []*Configuration for _, change := range changes.Changes { switch change.Table { case CONFIG_METADATA_TABLE: switch change.Operation { case common.Insert: dep := dataDeploymentFromRow(change.NewRow) - insertedDeployments = append(insertedDeployments, dep) + insertedConfigs = append(insertedConfigs, &dep) case common.Delete: dep := dataDeploymentFromRow(change.OldRow) - deletedDeployments = append(deletedDeployments, dep) + deletedConfigs = append(deletedConfigs, &dep) case common.Update: depNew := dataDeploymentFromRow(change.NewRow) depOld := dataDeploymentFromRow(change.OldRow) - - if depOld.BlobID != depNew.BlobID { - updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID) - updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID) - } - - if depOld.BlobResourceID != depNew.BlobResourceID { - updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID) - updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID) - } + updatedNewConfigs = append(updatedNewConfigs, &depNew) + updatedOldConfigs = append(updatedOldConfigs, &depOld) default: log.Errorf("unexpected operation: %s", change.Operation) } } } - - /* - for _, d := range deletedDeployments { - h.apiMan.addChangedDeployment(d.ID) - } - */ - - // insert - for i := range insertedDeployments { - go h.bundleMan.queueDownloadRequest(&insertedDeployments[i]) + // deleted old configs + if len(deletedConfigs)+len(updatedOldConfigs) > 0 { + log.Debugf("will delete %d old blobs", len(deletedConfigs)+len(updatedOldConfigs)) + //TODO delete blobs for deleted configs + go h.bundleMan.deleteBlobsFromConfigs(append(deletedConfigs, updatedOldConfigs...)) } - // update - for i := range updatedNewBlobs { - go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i])) - } + // new configs + h.bundleMan.downloadBlobsForChangeList(append(insertedConfigs, updatedNewConfigs...), changes.LastSequence) - for i := range updatedOldBlobs { - go h.bundleMan.deleteBundleById(updatedOldBlobs[i]) - } - - // delete - if len(deletedDeployments) > 0 { - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - //TODO delete bundles for deleted deployments - h.bundleMan.deleteBundlesFromDeployments(deletedDeployments) - } } -func dataDeploymentFromRow(row common.Row) (d DataDeployment) { +func dataDeploymentFromRow(row common.Row) (d Configuration) { row.Get("id", &d.ID) row.Get("organization_id", &d.OrgID)
diff --git a/listener_test.go b/listener_test.go index 55a71c5..41f68d3 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -39,9 +39,8 @@ dummyDbMan = &dummyDbManager{} dummyBundleMan = &dummyBundleManager{ requestChan: make(chan *DownloadRequest), - depChan: make(chan *DataDeployment), - delChan: make(chan *DataDeployment), - delBlobChan: make(chan string), + depChan: make(chan *Configuration), + delChan: make(chan *Configuration), } testHandler = &apigeeSyncHandler{ dbMan: dummyDbMan, @@ -109,7 +108,7 @@ It("Insert event should enqueue download requests for all inserted deployments", func() { // emit change event changes := make([]common.Change, 0) - deployments := make(map[string]DataDeployment) + deployments := make(map[string]Configuration) for i := 0; i < 1+rand.Intn(10); i++ { dep := makeTestDeployment() change := common.Change{ @@ -169,155 +168,100 @@ It("Update event should enqueue download requests and delete old blobs", func() { changes := make([]common.Change, 0) - blobIdNew := make(map[string]int) - blobIdOld := make(map[string]int) + configsNew := make(map[string]Configuration) + configsOld := make(map[string]Configuration) for i := 0; i < 1+rand.Intn(10); i++ { - depNew := makeTestDeployment() - depNew.BlobID = util.GenerateUUID() - depNew.BlobResourceID = util.GenerateUUID() + confNew := makeTestDeployment() + confNew.BlobID = util.GenerateUUID() + confNew.BlobResourceID = util.GenerateUUID() - depOld := makeTestDeployment() - depOld.BlobID = util.GenerateUUID() - depOld.BlobResourceID = util.GenerateUUID() + confOld := makeTestDeployment() + confOld.BlobID = util.GenerateUUID() + confOld.BlobResourceID = util.GenerateUUID() change := common.Change{ Operation: common.Update, Table: CONFIG_METADATA_TABLE, - NewRow: rowFromDeployment(depNew), - OldRow: rowFromDeployment(depOld), + NewRow: rowFromDeployment(confNew), + OldRow: rowFromDeployment(confOld), } changes = append(changes, change) - blobIdNew[depNew.BlobID]++ - blobIdNew[depNew.BlobResourceID]++ - blobIdOld[depOld.BlobID]++ - blobIdOld[depOld.BlobResourceID]++ + configsNew[confNew.ID] = *confNew + configsOld[confOld.ID] = *confOld } + testLSN := "1.1.1" // emit change event changeList := &common.ChangeList{ - Changes: changes, + Changes: changes, + LastSequence: testLSN, } apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify - for i := 0; i < len(blobIdNew); i++ { - req := <-dummyBundleMan.requestChan - blobIdNew[req.blobId]++ - Expect(blobIdNew[req.blobId]).Should(Equal(2)) + for i := 0; i < len(configsNew); i++ { + conf := <-dummyBundleMan.depChan + Expect(reflect.DeepEqual(configsNew[conf.ID], *conf)).Should(BeTrue()) + delete(configsNew, conf.ID) } - for i := 0; i < len(blobIdOld); i++ { - blobId := <-dummyBundleMan.delBlobChan - blobIdOld[blobId]++ - Expect(blobIdOld[blobId]).Should(Equal(2)) + for i := 0; i < len(configsOld); i++ { + conf := <-dummyBundleMan.delChan + Expect(reflect.DeepEqual(configsOld[conf.ID], *conf)).Should(BeTrue()) + delete(configsOld, conf.ID) } }) - It("Update event should only download/delete changed blobs", func() { - changes := make([]common.Change, 0) - blobIdChangedNew := make(map[string]int) - blobIdChangedOld := make(map[string]int) - - for i := 0; i < 1+rand.Intn(10); i++ { - depNew := makeTestDeployment() - depNew.BlobID = util.GenerateUUID() - depNew.BlobResourceID = util.GenerateUUID() - - depOld := makeTestDeployment() - - if rand.Intn(2) == 0 { - // blob id changed - depOld.BlobID = util.GenerateUUID() - blobIdChangedNew[depNew.BlobID]++ - blobIdChangedOld[depOld.BlobID]++ - } else { - // blob id unchanged - depOld.BlobID = depNew.BlobID - } - - if rand.Intn(2) == 0 { - // blob id changed - depOld.BlobResourceID = util.GenerateUUID() - blobIdChangedNew[depNew.BlobResourceID]++ - blobIdChangedOld[depOld.BlobResourceID]++ - } else { - // blob id unchanged - depOld.BlobResourceID = depNew.BlobResourceID - } - - change := common.Change{ - Operation: common.Update, - Table: CONFIG_METADATA_TABLE, - NewRow: rowFromDeployment(depNew), - OldRow: rowFromDeployment(depOld), - } - changes = append(changes, change) - } - - // emit change event - changeList := &common.ChangeList{ - Changes: changes, - } - - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) - - // verify - for i := 0; i < len(blobIdChangedNew); i++ { - req := <-dummyBundleMan.requestChan - blobIdChangedNew[req.blobId]++ - Expect(blobIdChangedNew[req.blobId]).Should(Equal(2)) - } - for i := 0; i < len(blobIdChangedOld); i++ { - blobId := <-dummyBundleMan.delBlobChan - blobIdChangedOld[blobId]++ - Expect(blobIdChangedOld[blobId]).Should(Equal(2)) - } - }) }) }) type dummyBundleManager struct { requestChan chan *DownloadRequest - depChan chan *DataDeployment - delChan chan *DataDeployment - delBlobChan chan string + depChan chan *Configuration + delChan chan *Configuration + LSN string } func (bm *dummyBundleManager) initializeBundleDownloading() { } -func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) { - bm.depChan <- dep +func (bm *dummyBundleManager) downloadBlobsForChangeList(configs []*Configuration, LSN string) { + bm.LSN = LSN + for _, conf := range configs { + bm.depChan <- conf + } + } func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) { bm.requestChan <- req } -func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest { +func (bm *dummyBundleManager) makeDownloadRequest(blobId string, changelistRequest *ChangeListDownloadRequest) *DownloadRequest { return &DownloadRequest{ - blobId: blobId, + blobId: blobId, + changelistRequest: changelistRequest, } } -func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) { +func (bm *dummyBundleManager) deleteBlobsFromConfigs(deployments []*Configuration) { for i := range deployments { - bm.delChan <- &deployments[i] + bm.delChan <- deployments[i] } } func (bm *dummyBundleManager) deleteBundleById(blobId string) { - bm.delBlobChan <- blobId + } func (bm *dummyBundleManager) Close() { } -func rowFromDeployment(dep *DataDeployment) common.Row { +func rowFromDeployment(dep *Configuration) common.Row { row := common.Row{} row["id"] = &common.ColumnVal{Value: dep.ID} row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
diff --git a/longPoll.go b/longPoll.go new file mode 100644 index 0000000..98d61ee --- /dev/null +++ b/longPoll.go
@@ -0,0 +1,59 @@ +package apiGatewayConfDeploy + +import "time" + +// distributeEvents() receives elements from deliverChan, and send them to subscribers +// Sending a `chan interface{}` to addSubscriber adds a new subscriber. +// It closes the subscriber channel after sending the element. +// Any subscriber sent to `addSubscriber` should be buffered chan. +func distributeEvents(deliverChan <-chan interface{}, addSubscriber chan chan interface{}) { + subscribers := make([]chan interface{}, 0) + for { + select { + case element, ok := <-deliverChan: + if !ok { + return + } + for _, subscriber := range subscribers { + go func(sub chan interface{}) { + log.Debugf("delivering to: %v", sub) + sub <- element + close(sub) + }(subscriber) + } + subscribers = make([]chan interface{}, 0) + case sub, ok := <-addSubscriber: + if !ok { + return + } + log.Debugf("Add subscriber: %v", sub) + subscribers = append(subscribers, sub) + } + } +} + +func debounce(in chan interface{}, out chan interface{}, window time.Duration) { + send := func(toSend interface{}) { + if toSend != nil { + out <- toSend + } + } + var toSend interface{} = nil + for { + select { + case incoming, ok := <-in: + if ok { + log.Debugf("debouncing %v", incoming) + toSend = incoming + } else { + send(toSend) + log.Debugf("closing debouncer") + close(out) + return + } + case <-time.After(window): + send(toSend) + toSend = nil + } + } +}