WIP: [ISSUE-66918282] support long poll for "/configurations" (#23) * [ISSUE-66918282] support long poll for "/configurations" * [ISSUE-66918282] add tests * [ISSUE-66918282] fix bugs for long-polling * [ISSUE-66918282] Read DB only once for all subscribers * [ISSUE-66918282] use long-poll in apid-core, improve style * [ISSUE-66918282] * [ISSUE-66918282] update tests, improve style * [ISSUE-66918282] refactor tests * [ISSUE-66918282] update tests * [ISSUE-66918282] update README and swagger
diff --git a/README.md b/README.md index dd683e9..f9c5381 100644 --- a/README.md +++ b/README.md
@@ -11,5 +11,15 @@ ## Functional description -see the file [swagger.yaml](swagger.yaml). +###Configurations +* Gateway cant call "/configurations" to fetch configurations. +* "type" filter is supported. +* Long-polling is supported. +* A configuration can be fetched by id "/configurations/{configId}" + +###Blobs +* A blob can be downloaded by id "/blobs/{blobId}" + + +For details, check the file [apidGatewayConfDeploy-api.yaml](swagger.yaml).
diff --git a/api.go b/api.go index 4ffc12e..5f5ab0e 100644 --- a/api.go +++ b/api.go
@@ -17,14 +17,16 @@ "bytes" "database/sql" "encoding/json" + "errors" "fmt" + "github.com/apid/apid-core/util" + "github.com/apigee-labs/transicator/common" "github.com/gorilla/mux" "io" "io/ioutil" "net/http" "net/url" "strconv" - "sync/atomic" "time" ) @@ -39,10 +41,10 @@ ) const ( - deploymentsEndpoint = "/configurations" - blobEndpointPath = "/blobs" - blobEndpoint = blobEndpointPath + "/{blobId}" - deploymentIdEndpoint = deploymentsEndpoint + "/{configId}" + configEndpoint = "/configurations" + blobEndpointPath = "/blobs" + blobEndpoint = blobEndpointPath + "/{blobId}" + configIdEndpoint = configEndpoint + "/{configId}" ) const ( @@ -64,11 +66,19 @@ ) const ( - headerSteam = "application/octet-stream" + headerSteam = "application/octet-stream" + headerJson = "application/json" + apidConfigIndexPar = "apid-config-index" + apidConfigIndexHeader = "x-apid-config-index" +) + +var ( + ErrNoLSN = errors.New("No last sequence in DB") + ErrInvalidLSN = errors.New(apidConfigIndexPar + " is invalid") ) type deploymentsResult struct { - deployments []DataDeployment + deployments []Configuration err error eTag string } @@ -78,7 +88,7 @@ Reason string `json:"reason"` } -type ApiDeploymentDetails struct { +type ApiConfigurationDetails struct { Self string `json:"self"` Name string `json:"name"` Type string `json:"type"` @@ -92,44 +102,60 @@ Updated string `json:"updated"` } -type ApiDeploymentResponse struct { - Kind string `json:"kind"` - Self string `json:"self"` - ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"` +type ApiConfigurationResponse struct { + Kind string `json:"kind"` + Self string `json:"self"` + ApiConfigurationsResponse []ApiConfigurationDetails `json:"contents"` } -//TODO add support for block and subscriber +type confChangeNotification struct { + LSN string + confs []Configuration + err error +} + type apiManagerInterface interface { + // an idempotent method to initialize api endpoints InitAPI() - //addChangedDeployment(string) - //distributeEvents() + notifyNewChange() } type apiManager struct { - dbMan dbManagerInterface - deploymentsEndpoint string - blobEndpoint string - deploymentIdEndpoint string - eTag int64 - deploymentsChanged chan interface{} - addSubscriber chan chan deploymentsResult - removeSubscriber chan chan deploymentsResult - apiInitialized bool + dbMan dbManagerInterface + configurationEndpoint string + blobEndpoint string + configurationIdEndpoint string + addSubscriber chan chan interface{} + newChangeListChan chan interface{} + apiInitialized bool } func (a *apiManager) InitAPI() { if a.apiInitialized { return } - services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(a.configurationEndpoint, a.apiGetCurrentConfigs).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") - services.API().HandleFunc(a.deploymentIdEndpoint, a.apiHandleConfigId).Methods("GET") + services.API().HandleFunc(a.configurationIdEndpoint, a.apiHandleConfigId).Methods("GET") + a.initDistributeEvents() a.apiInitialized = true log.Debug("API endpoints initialized") } -func (a *apiManager) addChangedDeployment(id string) { - a.deploymentsChanged <- id +func (a *apiManager) initDistributeEvents() { + go util.DistributeEvents(a.newChangeListChan, a.addSubscriber) +} + +func (a *apiManager) notifyNewChange() { + confs, err := a.dbMan.getAllConfigurations("") + if err != nil { + log.Errorf("Database error in getReadyConfigurations: %v", err) + } + a.newChangeListChan <- &confChangeNotification{ + LSN: a.dbMan.getLSN(), + confs: confs, + err: err, + } } func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) { @@ -151,70 +177,6 @@ a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err) } -func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) { - send := func(toSend []interface{}) { - if toSend != nil { - log.Debugf("debouncer sending: %v", toSend) - out <- toSend - } - } - var toSend []interface{} - for { - select { - case incoming, ok := <-in: - if ok { - log.Debugf("debouncing %v", incoming) - toSend = append(toSend, incoming) - } else { - send(toSend) - log.Debugf("closing debouncer") - close(out) - return - } - case <-time.After(window): - send(toSend) - toSend = nil - } - } -} - -//TODO get notified when deployments ready -/* -func (a *apiManager) distributeEvents() { - subscribers := make(map[chan deploymentsResult]bool) - deliverDeployments := make(chan []interface{}, 1) - - go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration) - - for { - select { - case _, ok := <-deliverDeployments: - if !ok { - return // todo: using this? - } - subs := subscribers - subscribers = make(map[chan deploymentsResult]bool) - go func() { - eTag := a.incrementETag() - deployments, err := a.dbMan.getUnreadyDeployments() - log.Debugf("delivering deployments to %d subscribers", len(subs)) - for subscriber := range subs { - log.Debugf("delivering to: %v", subscriber) - subscriber <- deploymentsResult{deployments, err, eTag} - } - }() - case subscriber := <-a.addSubscriber: - log.Debugf("Add subscriber: %v", subscriber) - subscribers[subscriber] = true - case subscriber := <-a.removeSubscriber: - log.Debugf("Remove subscriber: %v", subscriber) - delete(subscribers, subscriber) - } - } -} -*/ - -// TODO use If-None-Match and ETag func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) @@ -250,8 +212,8 @@ } return } - configDetail := ApiDeploymentDetails{ - Self: getHttpHost() + a.deploymentsEndpoint + "/" + config.ID, + configDetail := ApiConfigurationDetails{ + Self: getHttpHost() + a.configurationEndpoint + "/" + config.ID, Name: config.Name, Type: config.Type, Revision: config.Revision, @@ -271,99 +233,99 @@ return } log.Debugf("sending configuration %s", b) + w.Header().Set("Content-Type", headerJson) w.Write(b) } -func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { - - // If returning without a bundle (immediately or after timeout), status = 404 - // If returning If-None-Match value is equal to current deployment, status = 304 - // If returning a new value, status = 200 - - // If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then - // block for up to the specified number of seconds until a new deployment becomes available. - b := r.URL.Query().Get("block") +// If not long-polling, return configurations, status = 200 +// If "apid-config-index" is given in request parameters, return immediately with status = 200/304 +// If both "block" and "apid-config-index" are given: +// if apid's LSN > apid-config-index in header, return immediately with status = 200 +// if apid's LSN <= apid-config-index, long polling for timeout=block secs +func (a *apiManager) apiGetCurrentConfigs(w http.ResponseWriter, r *http.Request) { + blockSec := r.URL.Query().Get("block") typeFilter := r.URL.Query().Get("type") + headerLSN := r.URL.Query().Get(apidConfigIndexPar) var timeout int - if b != "" { - var err error - timeout, err = strconv.Atoi(b) - if err != nil { + var err error + if blockSec != "" { + timeout, err = strconv.Atoi(blockSec) + if err != nil || timeout < 0 { a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds") return } } - log.Debugf("api timeout: %d", timeout) + log.Debugf("/configurations long-poll timeout: %d", timeout) - // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block' - // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the - // most recent bundle list. - ifNoneMatch := r.Header.Get("If-None-Match") - log.Debugf("if-none-match: %s", ifNoneMatch) + log.Debugf("Long-Poll-Index: %s", headerLSN) - // send unmodified if matches prior eTag and no timeout - eTag := a.getETag() - if eTag == ifNoneMatch && timeout == 0 { - w.WriteHeader(http.StatusNotModified) + // if filter by "type" + if typeFilter != "" { + a.sendReadyConfigurations(typeFilter, w, "") return } - // send results if different eTag - if eTag != ifNoneMatch { - a.sendReadyDeployments(typeFilter, w) - return - } - - // otherwise, subscribe to any new deployment changes - var newDeploymentsChannel chan deploymentsResult - if timeout > 0 && ifNoneMatch != "" { - //TODO handle block - //newDeploymentsChannel = make(chan deploymentsResult, 1) - //a.addSubscriber <- newDeploymentsChannel - } - - log.Debug("Blocking request... Waiting for new Deployments.") - - select { - case result := <-newDeploymentsChannel: - if result.err != nil { - a.writeInternalError(w, "Database error") - } else { - a.sendDeployments(w, result.deployments, result.eTag, typeFilter) + // if no filter, check for long polling + cmpRes, apidLSN, err := a.compareLSN(headerLSN) + switch { + case err != nil: + if err == ErrInvalidLSN { + a.writeError(w, http.StatusBadRequest, http.StatusBadRequest, err.Error()) + return } - - case <-time.After(time.Duration(timeout) * time.Second): - a.removeSubscriber <- newDeploymentsChannel - log.Debug("Blocking deployment request timed out.") - if ifNoneMatch != "" { + log.Errorf("Error in compareLSN: %v", err) + a.writeInternalError(w, err.Error()) + return + case cmpRes <= 0: //APID_LSN <= Header_LSN + if timeout == 0 { // no long polling w.WriteHeader(http.StatusNotModified) - } else { - a.sendReadyDeployments(typeFilter, w) + } else { // long polling + util.LongPolling(w, time.Duration(timeout)*time.Second, a.addSubscriber, a.LongPollSuccessHandler, a.LongPollTimeoutHandler) } + return + case cmpRes > 0: //APID_LSN > Header_LSN + a.sendReadyConfigurations("", w, apidLSN) + return } } -func (a *apiManager) sendReadyDeployments(typeFilter string, w http.ResponseWriter) { - eTag := a.getETag() - deployments, err := a.dbMan.getReadyDeployments(typeFilter) +func (a *apiManager) LongPollSuccessHandler(c interface{}, w http.ResponseWriter) { + // send configs and LSN + confChange, ok := c.(*confChangeNotification) + if !ok || confChange.err != nil { + log.Errorf("Wrong confChangeNotification: %v, %v", ok, confChange) + a.writeInternalError(w, "Error getting configurations with long-polling") + return + } + a.sendDeployments(w, confChange.confs, confChange.LSN, "") +} + +func (a *apiManager) LongPollTimeoutHandler(w http.ResponseWriter) { + log.Debug("long-polling configuration request timed out.") + w.WriteHeader(http.StatusNotModified) +} + +func (a *apiManager) sendReadyConfigurations(typeFilter string, w http.ResponseWriter, apidLSN string) { + configurations, err := a.dbMan.getAllConfigurations(typeFilter) if err != nil { + log.Errorf("Database error: %v", err) a.writeInternalError(w, fmt.Sprintf("Database error: %s", err.Error())) return } - a.sendDeployments(w, deployments, eTag, typeFilter) + a.sendDeployments(w, configurations, apidLSN, typeFilter) } -func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string, typeFilter string) { +func (a *apiManager) sendDeployments(w http.ResponseWriter, dataConfs []Configuration, apidLSN string, typeFilter string) { - apiDeps := ApiDeploymentResponse{} - apiDepDetails := make([]ApiDeploymentDetails, 0) + apiConfs := ApiConfigurationResponse{} + apiConfDetails := make([]ApiConfigurationDetails, 0) - apiDeps.Kind = kindCollection - apiDeps.Self = getHttpHost() + a.deploymentsEndpoint + apiConfs.Kind = kindCollection + apiConfs.Self = getHttpHost() + a.configurationEndpoint - for _, d := range dataDeps { - apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{ - Self: apiDeps.Self + "/" + d.ID, + for _, d := range dataConfs { + apiConfDetails = append(apiConfDetails, ApiConfigurationDetails{ + Self: apiConfs.Self + "/" + d.ID, Name: d.Name, Type: d.Type, Revision: d.Revision, @@ -376,33 +338,49 @@ Updated: convertTime(d.Updated), }) } - apiDeps.ApiDeploymentsResponse = apiDepDetails + apiConfs.ApiConfigurationsResponse = apiConfDetails if typeFilter != "" { - apiDeps.Self += "?type=" + typeFilter + apiConfs.Self += "?type=" + typeFilter } - b, err := json.Marshal(apiDeps) + b, err := json.Marshal(apiConfs) if err != nil { log.Errorf("unable to marshal deployments: %v", err) w.WriteHeader(http.StatusInternalServerError) return } - log.Debugf("sending deployments %s: %s", eTag, b) - w.Header().Set("ETag", eTag) + if apidLSN != "" { + w.Header().Set(apidConfigIndexHeader, apidLSN) + } + w.Header().Set("Content-Type", headerJson) + log.Debugf("sending deployments %s", apidLSN) w.Write(b) } -// call whenever the list of deployments changes -func (a *apiManager) incrementETag() string { - e := atomic.AddInt64(&a.eTag, 1) - return strconv.FormatInt(e, 10) -} +func (a *apiManager) compareLSN(headerLSN string) (res int, apidLSN string, err error) { + apidLSN = a.dbMan.getLSN() + log.Debugf("apidLSN: %v", apidLSN) -func (a *apiManager) getETag() string { - e := atomic.LoadInt64(&a.eTag) - return strconv.FormatInt(e, 10) + // if no Long Poll Index + if headerLSN == "" { + return 1, apidLSN, nil + } + + headerSeq, err := common.ParseSequence(headerLSN) + if err != nil { + log.Debugf("Error when Parse headerLSN Sequence: %v", err) + return 0, "", ErrInvalidLSN + } + + apidSeq, err := common.ParseSequence(apidLSN) + if err != nil { + log.Errorf("Error when Parse apidLSN Sequence: %v", err) + return 0, "", err + } + + return apidSeq.Compare(headerSeq), apidLSN, nil } // escape the blobId into url @@ -430,7 +408,7 @@ func getHttpHost() string { - configuredEndpoint := config.GetString(configBundleBlobDownloadEndpoint) + configuredEndpoint := config.GetString(configBlobDownloadEndpoint) if configuredEndpoint != "" { return configuredEndpoint }
diff --git a/api_test.go b/api_test.go index 84c1cb5..630d993 100644 --- a/api_test.go +++ b/api_test.go
@@ -24,7 +24,6 @@ mathrand "math/rand" "net/http" "net/url" - "os" "strconv" "strings" "time" @@ -42,16 +41,16 @@ var _ = BeforeEach(func() { testCount += 1 - dummyDbMan = &dummyDbManager{} + dummyDbMan = &dummyDbManager{ + lsn: "0.1.1", + } testApiMan = &apiManager{ - dbMan: dummyDbMan, - deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount), - blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", - deploymentIdEndpoint: deploymentsEndpoint + strconv.Itoa(testCount) + "/{configId}", - eTag: int64(testCount * 10), - deploymentsChanged: make(chan interface{}, 5), - addSubscriber: make(chan chan deploymentsResult), - removeSubscriber: make(chan chan deploymentsResult), + dbMan: dummyDbMan, + configurationEndpoint: configEndpoint + strconv.Itoa(testCount), + blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", + configurationIdEndpoint: configEndpoint + strconv.Itoa(testCount) + "/{configId}", + newChangeListChan: make(chan interface{}, 5), + addSubscriber: make(chan chan interface{}), } testApiMan.InitAPI() time.Sleep(100 * time.Millisecond) @@ -65,7 +64,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) // http get res, err := http.Get(uri.String()) @@ -74,16 +73,16 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) Expect(err).Should(Succeed()) // verify response - Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) + Expect(len(depRes.ApiConfigurationsResponse)).To(Equal(0)) Expect(depRes.Kind).Should(Equal(kindCollection)) - Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount))) + Expect(depRes.Self).Should(Equal(apiTestUrl + configEndpoint + strconv.Itoa(testCount))) }) @@ -91,7 +90,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) // set test data details := setTestDeployments(dummyDbMan, uri.String()) @@ -103,7 +102,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -112,7 +111,7 @@ // verify response Expect(depRes.Kind).Should(Equal(kindCollection)) Expect(depRes.Self).Should(Equal(uri.String())) - Expect(depRes.ApiDeploymentsResponse).Should(Equal(details)) + Expect(depRes.ApiConfigurationsResponse).Should(Equal(details)) }) @@ -121,12 +120,15 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) - uri.RawQuery = "type=" + typeFilter + uri.Path = configEndpoint + strconv.Itoa(testCount) + + query := uri.Query() + query.Add("type", typeFilter) + uri.RawQuery = query.Encode() // set test data dep := makeTestDeployment() - dummyDbMan.configurations = make(map[string]*DataDeployment) + dummyDbMan.configurations = make(map[string]*Configuration) dummyDbMan.configurations[typeFilter] = dep detail := makeExpectedDetail(dep, strings.Split(uri.String(), "?")[0]) @@ -137,7 +139,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -146,32 +148,73 @@ // verify response Expect(depRes.Kind).Should(Equal(kindCollection)) Expect(depRes.Self).Should(Equal(uri.String())) - Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail})) + Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail})) }) - It("should get 304 for no change", func() { - + It("should not long poll if using filter", func() { + typeFilter := "ORGANIZATION" // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) + query := uri.Query() + query.Add("type", typeFilter) + query.Add("block", "3") + query.Add(apidConfigIndexPar, dummyDbMan.lsn) + uri.RawQuery = query.Encode() // set test data - setTestDeployments(dummyDbMan, uri.String()) + dep := makeTestDeployment() + + dummyDbMan.configurations = make(map[string]*Configuration) + dummyDbMan.configurations[typeFilter] = dep + detail := makeExpectedDetail(dep, strings.Split(uri.String(), "?")[0]) // http get res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) defer res.Body.Close() Expect(res.StatusCode).Should(Equal(http.StatusOK)) - etag := res.Header.Get("etag") - Expect(etag).ShouldNot(BeEmpty()) + + // parse response + var depRes ApiConfigurationResponse + body, err := ioutil.ReadAll(res.Body) + Expect(err).Should(Succeed()) + err = json.Unmarshal(body, &depRes) + Expect(err).Should(Succeed()) + + // verify response + Expect(depRes.Kind).Should(Equal(kindCollection)) + Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0] + "?type=" + typeFilter)) + Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail})) + + }, 1) + + It("should get 304 for no change", func() { + + // setup http client + uri, err := url.Parse(apiTestUrl) + Expect(err).Should(Succeed()) + uri.Path = configEndpoint + strconv.Itoa(testCount) + + // set test data + setTestDeployments(dummyDbMan, uri.String()) + // http get + res, err := http.Get(uri.String()) + Expect(err).Should(Succeed()) + defer res.Body.Close() + Expect(res.StatusCode).Should(Equal(http.StatusOK)) + lsn := res.Header.Get(apidConfigIndexHeader) + Expect(lsn).ShouldNot(BeEmpty()) // send second request + query := uri.Query() + query.Add(apidConfigIndexPar, lsn) + uri.RawQuery = query.Encode() + log.Debug(uri.String()) req, err := http.NewRequest("GET", uri.String(), nil) req.Header.Add("Content-Type", "application/json") - req.Header.Add("If-None-Match", etag) // get response res, err = http.DefaultClient.Do(req) @@ -181,42 +224,111 @@ }) // block is not enabled now - XIt("should get empty set after blocking if no deployments", func() { + It("should do long-polling if Gateway_LSN>=APID_LSN, should get 304 for timeout", func() { start := time.Now() // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("block", "1") + query.Add(apidConfigIndexPar, "1.0.0") uri.RawQuery = query.Encode() // http get res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) defer res.Body.Close() - Expect(res.StatusCode).Should(Equal(http.StatusOK)) + Expect(res.StatusCode).Should(Equal(http.StatusNotModified)) //verify blocking time blockingTime := time.Since(start) - log.Warnf("time used: %v", blockingTime.Seconds()) Expect(blockingTime.Seconds() > 0.9).Should(BeTrue()) + }, 2) + + It("should do long-polling if Gateway_LSN>=APID_LSN, should get 200 if not timeout", func() { + + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // setup http client + uri, err := url.Parse(apiTestUrl) + Expect(err).Should(Succeed()) + uri.Path = configEndpoint + strconv.Itoa(testCount) + query := uri.Query() + query.Add("block", "2") + query.Add(apidConfigIndexPar, "1.0.0") + uri.RawQuery = query.Encode() + // set test data + details := setTestDeployments(dummyDbMan, strings.Split(uri.String(), "?")[0]) + + // notify change + go func() { + time.Sleep(time.Second) + dummyDbMan.lsn = testLSN + testApiMan.notifyNewChange() + }() + + // http get + res, err := http.Get(uri.String()) + Expect(err).Should(Succeed()) + defer res.Body.Close() + Expect(res.StatusCode).Should(Equal(http.StatusOK)) + Expect(res.Header.Get(apidConfigIndexHeader)).Should(Equal(testLSN)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) Expect(err).Should(Succeed()) // verify response - Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) Expect(depRes.Kind).Should(Equal(kindCollection)) - Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount))) + Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0])) + Expect(depRes.ApiConfigurationsResponse).Should(Equal(details)) + }, 3) - }, 2) + It("should support long-polling for multiple subscribers", func() { + + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // setup http client + uri, err := url.Parse(apiTestUrl) + Expect(err).Should(Succeed()) + uri.Path = configEndpoint + strconv.Itoa(testCount) + query := uri.Query() + query.Add("block", "3") + query.Add(apidConfigIndexPar, dummyDbMan.lsn) + uri.RawQuery = query.Encode() + + // set test data + setTestDeployments(dummyDbMan, strings.Split(uri.String(), "?")[0]) + + // http get + count := mathrand.Intn(20) + 5 + finishChan := make(chan int) + for i := 0; i < count; i++ { + go func() { + defer GinkgoRecover() + res, err := http.Get(uri.String()) + Expect(err).Should(Succeed()) + defer res.Body.Close() + finishChan <- res.StatusCode + }() + } + + // notify change + go func() { + time.Sleep(1500 * time.Millisecond) + dummyDbMan.lsn = testLSN + testApiMan.notifyNewChange() + }() + + for i := 0; i < count; i++ { + Expect(<-finishChan).Should(Equal(http.StatusOK)) + } + + }, 5) It("should get iso8601 time", func() { testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00", "2017-06-22 16:41:02.334"} @@ -225,7 +337,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) for i, t := range testTimes { log.Debug("insert deployment with timestamp: " + t) @@ -233,7 +345,7 @@ dep := makeTestDeployment() dep.Created = t dep.Updated = t - dummyDbMan.readyDeployments = []DataDeployment{*dep} + dummyDbMan.readyDeployments = []Configuration{*dep} detail := makeExpectedDetail(dep, uri.String()) detail.Created = isoTime[i] detail.Updated = isoTime[i] @@ -243,41 +355,17 @@ defer res.Body.Close() Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) Expect(err).Should(Succeed()) // verify response - Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail})) + Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail})) } }) - It("should debounce requests", func(done Done) { - var in = make(chan interface{}) - var out = make(chan []interface{}) - - go testApiMan.debounce(in, out, 3*time.Millisecond) - - go func() { - defer GinkgoRecover() - - received, ok := <-out - Expect(ok).To(BeTrue()) - Expect(len(received)).To(Equal(2)) - - close(in) - received, ok = <-out - Expect(ok).To(BeFalse()) - - close(done) - }() - - in <- "x" - in <- "y" - }) - }) Context("GET /blobs", func() { @@ -315,12 +403,12 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/3ecd351c-1173-40bf-b830-c194e5ef9038" + uri.Path = configEndpoint + strconv.Itoa(testCount) + "/3ecd351c-1173-40bf-b830-c194e5ef9038" //setup test data dummyDbMan.err = nil - dummyDbMan.configurations = make(map[string]*DataDeployment) - expectedConfig := &DataDeployment{ + dummyDbMan.configurations = make(map[string]*Configuration) + expectedConfig := &Configuration{ ID: "3ecd351c-1173-40bf-b830-c194e5ef9038", OrgID: "73fcac6c-5d9f-44c1-8db0-333efda3e6e8", EnvID: "ada76573-68e3-4f1a-a0f9-cbc201a97e80", @@ -343,7 +431,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentDetails + var depRes ApiConfigurationDetails body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -381,10 +469,10 @@ if data[1] != nil { dummyDbMan.err = data[1].(error) } - dummyDbMan.configurations = make(map[string]*DataDeployment) - dummyDbMan.configurations[data[0].(string)] = &DataDeployment{} + dummyDbMan.configurations = make(map[string]*Configuration) + dummyDbMan.configurations[data[0].(string)] = &Configuration{} // http get - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string) + uri.Path = configEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string) res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) Expect(res.StatusCode).Should(Equal(expectedCode[i])) @@ -395,12 +483,12 @@ }) -func setTestDeployments(dummyDbMan *dummyDbManager, self string) []ApiDeploymentDetails { +func setTestDeployments(dummyDbMan *dummyDbManager, self string) []ApiConfigurationDetails { mathrand.Seed(time.Now().UnixNano()) count := mathrand.Intn(5) + 1 - deployments := make([]DataDeployment, count) - details := make([]ApiDeploymentDetails, count) + deployments := make([]Configuration, count) + details := make([]ApiConfigurationDetails, count) for i := 0; i < count; i++ { dep := makeTestDeployment() @@ -415,13 +503,13 @@ return details } -func makeTestDeployment() *DataDeployment { - dep := &DataDeployment{ +func makeTestDeployment() *Configuration { + dep := &Configuration{ ID: util.GenerateUUID(), OrgID: util.GenerateUUID(), EnvID: util.GenerateUUID(), - BlobID: testBlobId, - BlobResourceID: "", + BlobID: util.GenerateUUID(), //testBlobId, + BlobResourceID: util.GenerateUUID(), //"", Type: "virtual-host", Name: "vh-secure", Revision: "1", @@ -434,8 +522,8 @@ return dep } -func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails { - detail := &ApiDeploymentDetails{ +func makeExpectedDetail(dep *Configuration, self string) *ApiConfigurationDetails { + detail := &ApiConfigurationDetails{ Self: self + "/" + dep.ID, Name: dep.Name, Type: dep.Type, @@ -443,61 +531,10 @@ BeanBlobUrl: getBlobUrl(dep.BlobID), Org: dep.OrgID, Env: dep.EnvID, - ResourceBlobUrl: "", + ResourceBlobUrl: getBlobUrl(dep.BlobResourceID), Path: dep.Path, Created: dep.Created, Updated: dep.Updated, } return detail } - -type dummyDbManager struct { - unreadyBlobIds []string - readyDeployments []DataDeployment - localFSLocation string - fileResponse chan string - version string - configurations map[string]*DataDeployment - err error -} - -func (d *dummyDbManager) setDbVersion(version string) { - d.version = version -} - -func (d *dummyDbManager) initDb() error { - return nil -} - -func (d *dummyDbManager) getUnreadyBlobs() ([]string, error) { - return d.unreadyBlobIds, nil -} - -func (d *dummyDbManager) getReadyDeployments(typeFilter string) ([]DataDeployment, error) { - if typeFilter == "" { - return d.readyDeployments, nil - } - return []DataDeployment{*(d.configurations[typeFilter])}, nil -} - -func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error { - file, err := os.Open(localFsLocation) - if err != nil { - return err - } - buff := make([]byte, 36) - _, err = file.Read(buff) - if err != nil { - return err - } - d.fileResponse <- string(buff) - return nil -} - -func (d *dummyDbManager) getLocalFSLocation(string) (string, error) { - return d.localFSLocation, nil -} - -func (d *dummyDbManager) getConfigById(id string) (*DataDeployment, error) { - return d.configurations[id], d.err -}
diff --git a/apidGatewayConfDeploy-api.yaml b/apidGatewayConfDeploy-api.yaml index cc1011f..c00d79a 100644 --- a/apidGatewayConfDeploy-api.yaml +++ b/apidGatewayConfDeploy-api.yaml
@@ -48,30 +48,54 @@ in: "query" type: string description: "Long poll block duration in seconds" - - name: "If-None-Match" - in: "header" + - name: "apid-config-index" + in: "query" type: string - description: "ETag value from request in previous request" + description: "x-apid-config-index value from request in previous request" + - name: "type" + in: "query" + type: string + description: "filter configurations by type. When type filter is given, long-polling is not supported" responses: 200: description: Successful response headers: - ETag: + x-apid-config-index: type: "string" description: "client can use this for response caching" schema: $ref: '#/definitions/ConfigurationsResponse' 304: description: Not Modified, No change in response based on If-None-Match header value. Cache representation. - headers: - ETag: - type: "string" - description: "client can use this for response caching" default: description: Error response schema: $ref: '#/definitions/ErrorResponse' - + + /configurations/{configId}: + get: + tags: + - "configurations/{configId}" + description: | + Get a configuration by id + parameters: + - name: configId + in: path + required: true + type: string + description: configId + responses: + 200: + description: Successful response + schema: + $ref: '#/definitions/Configuration' + 304: + description: Not Modified, No change in response based on If-None-Match header value. Cache representation. + default: + description: Error response + schema: + $ref: '#/definitions/ErrorResponse' + /blobs/{blobId}: get: tags: @@ -123,9 +147,9 @@ contents: type: array items: - $ref: '#/definitions/Configurations' + $ref: '#/definitions/Configuration' - Configurations: + Configuration: properties: self: type: string
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go index 0c859f7..a85014a 100644 --- a/apidGatewayConfDeploy_suite_test.go +++ b/apidGatewayConfDeploy_suite_test.go
@@ -50,7 +50,7 @@ config.Set(configApiServerBaseURI, "http://localhost") config.Set(configDebounceDuration, "1ms") config.Set(configDownloadQueueSize, 1) - config.Set(configBundleCleanupDelay, time.Millisecond) + config.Set(configBlobCleanupDelay, time.Millisecond) apid.InitializePlugins("0.0.0") go apid.API().Listen() time.Sleep(1 * time.Second)
diff --git a/bundle.go b/bundle.go index b4729cf..c9d7d80 100644 --- a/bundle.go +++ b/bundle.go
@@ -34,26 +34,23 @@ type bundleManagerInterface interface { initializeBundleDownloading() - queueDownloadRequest(*DataDeployment) - enqueueRequest(*DownloadRequest) - makeDownloadRequest(string) *DownloadRequest - deleteBundlesFromDeployments([]DataDeployment) - deleteBundleById(string) + downloadBlobsWithCallback(blobs []string, callback func()) + deleteBlobs(blobIds []string) Close() } type bundleManager struct { - blobServerUrl string - dbMan dbManagerInterface - apiMan apiManagerInterface - concurrentDownloads int - markDeploymentFailedAfter time.Duration - bundleRetryDelay time.Duration - bundleCleanupDelay time.Duration - downloadQueue chan *DownloadRequest - isClosed *int32 - workers []*BundleDownloader - client *http.Client + blobServerUrl string + dbMan dbManagerInterface + apiMan apiManagerInterface + concurrentDownloads int + markConfigFailedAfter time.Duration + bundleRetryDelay time.Duration + bundleCleanupDelay time.Duration + downloadQueue chan *DownloadRequest + isClosed *int32 + workers []*BundleDownloader + client *http.Client } type blobServerResponse struct { @@ -80,30 +77,22 @@ } } -// download bundle blob and resource blob -// TODO do not download duplicate blobs -func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) { - blobReq := bm.makeDownloadRequest(dep.BlobID) - resourceReq := bm.makeDownloadRequest(dep.BlobResourceID) - - go func() { - bm.enqueueRequest(blobReq) - bm.enqueueRequest(resourceReq) - }() -} - -func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest { - markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) +func (bm *bundleManager) makeDownloadRequest(blobId string, b *BunchDownloadRequest) *DownloadRequest { + if blobId == "" { + return nil + } + markFailedAt := time.Now().Add(bm.markConfigFailedAfter) retryIn := bm.bundleRetryDelay maxBackOff := 5 * time.Minute return &DownloadRequest{ blobServerURL: bm.blobServerUrl, bm: bm, - blobId: id, + blobId: blobId, backoffFunc: createBackoff(retryIn, maxBackOff), markFailedAt: markFailedAt, client: bm.client, + bunchRequest: b, } } @@ -112,14 +101,20 @@ if atomic.LoadInt32(bm.isClosed) == 1 { return } - /* - defer func() { - if r := recover(); r != nil { - log.Warn("trying to enque requests to closed bundleManager") - } - }() - */ - bm.downloadQueue <- r + if r != nil { + bm.downloadQueue <- r + } +} + +func (bm *bundleManager) downloadBlobsWithCallback(blobs []string, callback func()) { + + c := &BunchDownloadRequest{ + bm: bm, + blobs: blobs, + attemptCounter: new(int32), + callback: callback, + } + c.download() } func (bm *bundleManager) Close() { @@ -127,32 +122,53 @@ close(bm.downloadQueue) } -func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) { - for _, dep := range deletedDeployments { - go bm.deleteBundleById(dep.BlobID) - go bm.deleteBundleById(dep.BlobResourceID) +func (bm *bundleManager) deleteBlobs(blobs []string) { + for _, id := range blobs { + go bm.deleteBlobById(id) } - - /* - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - go func() { - // give clients a minute to avoid conflicts - time.Sleep(bm.bundleCleanupDelay) - for _, dep := range deletedDeployments { - bundleFile := getBlobFilePath(dep.BlobID) - log.Debugf("removing old bundle: %v", bundleFile) - // TODO Remove from the Database table apid_blob_available - safeDelete(bundleFile) - } - }() - */ } // TODO add delete support -func (bm *bundleManager) deleteBundleById(blobId string) { +func (bm *bundleManager) deleteBlobById(blobId string) { } +type BunchDownloadRequest struct { + bm *bundleManager + blobs []string + attemptCounter *int32 + callback func() +} + +func (b *BunchDownloadRequest) download() { + //remove empty Ids + var ids []string + for _, id := range b.blobs { + if id != "" { + ids = append(ids, id) + } + } + b.blobs = ids + log.Debugf("Attempt to download blobs, len: %v", len(b.blobs)) + + if len(b.blobs) == 0 && b.callback != nil { + go b.callback() + return + } + + *b.attemptCounter = int32(len(b.blobs)) + for _, id := range b.blobs { + req := b.bm.makeDownloadRequest(id, b) + go b.bm.enqueueRequest(req) + } +} + +func (b *BunchDownloadRequest) downloadAttempted() { + if atomic.AddInt32(b.attemptCounter, -1) == 0 && b.callback != nil { + go b.callback() + } +} + type DownloadRequest struct { bm *bundleManager blobId string @@ -160,12 +176,15 @@ markFailedAt time.Time blobServerURL string client *http.Client + bunchRequest *BunchDownloadRequest + attempted bool } -func (r *DownloadRequest) downloadBundle() error { +func (r *DownloadRequest) downloadBlob() error { log.Debugf("starting bundle download attempt for blobId=%s", r.blobId) - + var err error + defer r.markAttempted(&err) if r.checkTimeout() { return &timeoutError{ markFailedAt: r.markFailedAt, @@ -188,8 +207,6 @@ return err } - log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile) - err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile) if err != nil { log.Errorf("updateLocalFsLocation failed: blobId=%s", r.blobId) @@ -199,10 +216,7 @@ return err } - log.Debugf("bundle downloaded: blobId=%s filename=%s", r.blobId, downloadedFile) - - // TODO send changed deployments to subscribers (API call with "block") - //r.bm.apiMan.addChangedDeployment(dep.ID) + log.Debugf("blod downloaded and inserted: blobId=%s filename=%s", r.blobId, downloadedFile) return nil } @@ -218,6 +232,19 @@ return false } +func (r *DownloadRequest) markAttempted(errp *error) { + if !r.attempted { + r.attempted = true + err := *errp + if r.bunchRequest != nil { + r.bunchRequest.downloadAttempted() + } + if err != nil { + //TODO: insert to DB as "attempted but unsuccessful" + } + } +} + func getBlobFilePath(blobId string) string { return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId))) } @@ -263,7 +290,6 @@ func downloadFromURI(client *http.Client, blobServerURL string, blobId string) (tempFileName string, err error) { var tempFile *os.File - log.Debugf("Downloading bundle: %s", blobId) uri, err := getSignedURL(client, blobServerURL, blobId) if err != nil { @@ -282,18 +308,18 @@ var confReader io.ReadCloser confReader, err = getUriReaderWithAuth(client, uri) if err != nil { - log.Errorf("Unable to retrieve bundle %s: %v", uri, err) + log.Errorf("Unable to retrieve Blob %s: %v", uri, err) return } defer confReader.Close() _, err = io.Copy(tempFile, confReader) if err != nil { - log.Errorf("Unable to write bundle %s: %v", tempFileName, err) + log.Errorf("Unable to write Blob %s: %v", tempFileName, err) return } - log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName) + log.Debugf("Blob %s downloaded to: %s", uri, tempFileName) return } @@ -328,7 +354,7 @@ for req := range w.bm.downloadQueue { log.Debugf("starting download blobId=%s", req.blobId) - err := req.downloadBundle() + err := req.downloadBlob() if err != nil { // timeout if _, ok := err.(*timeoutError); ok {
diff --git a/bundle_test.go b/bundle_test.go index 30760cc..c77114f 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -17,14 +17,11 @@ import ( "net/http" - "bytes" - "encoding/json" "github.com/apid/apid-core/util" - "github.com/gorilla/mux" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "io" - "strings" + mathrand "math/rand" "sync/atomic" "time" ) @@ -64,19 +61,22 @@ } // init dummy api manager - dummyApiMan = &dummyApiManager{} + dummyApiMan = &dummyApiManager{ + notifyChan: make(chan bool, 1), + initCalled: make(chan bool), + } // init bundle manager testBundleMan = &bundleManager{ - blobServerUrl: bundleTestUrl, - dbMan: dummyDbMan, - apiMan: dummyApiMan, - concurrentDownloads: concurrentDownloads, - markDeploymentFailedAfter: 5 * time.Second, - bundleRetryDelay: time.Second, - bundleCleanupDelay: 5 * time.Second, - downloadQueue: make(chan *DownloadRequest, downloadQueueSize), - isClosed: new(int32), + blobServerUrl: bundleTestUrl, + dbMan: dummyDbMan, + apiMan: dummyApiMan, + concurrentDownloads: concurrentDownloads, + markConfigFailedAfter: 5 * time.Second, + bundleRetryDelay: time.Second, + bundleCleanupDelay: 5 * time.Second, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), client: &http.Client{ Timeout: time.Second, Transport: &http.Transport{ @@ -95,113 +95,124 @@ dummyApiMan = nil }) - It("should download blob according to id", func() { - // download blob - id := util.GenerateUUID() - testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) - received := <-dummyDbMan.fileResponse - Expect(received).Should(Equal(id)) + Context("download blobs", func() { + + It("should download blob according to id", func() { + // download blob + id := util.GenerateUUID() + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) + received := <-dummyDbMan.fileResponse + Expect(received).Should(Equal(id)) + }) + + It("should timeout connection and retry", func() { + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.client.Timeout = 500 * time.Millisecond + testBundleMan.bundleRetryDelay = 50 * time.Millisecond + + // download blobs + id := util.GenerateUUID() + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) + received := <-dummyDbMan.fileResponse + Expect(received).Should(Equal(id)) + + }, 4) + + It("should mark as failure according to markConfigFailedAfter", func() { + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.client.Timeout = 100 * time.Millisecond + testBundleMan.bundleRetryDelay = 100 * time.Millisecond + testBundleMan.markConfigFailedAfter = 200 * time.Millisecond + + // download blobs + id := util.GenerateUUID() + req := testBundleMan.makeDownloadRequest(id, nil) + Expect(req.markFailedAt.After(time.Now())).Should(BeTrue()) + testBundleMan.enqueueRequest(req) + + // should fail + time.Sleep(time.Second) + Expect(req.markFailedAt.IsZero()).Should(BeTrue()) + }, 4) + + It("should call callback func after a round of download attempts", func() { + // download blobs + var ids []string + num := 1 + mathrand.Intn(5) + for i := 0; i < num; i++ { + ids = append(ids, util.GenerateUUID()) + } + finishChan := make(chan int) + testBundleMan.downloadBlobsWithCallback(ids, func() { + finishChan <- 1 + }) + for i := 0; i < num; i++ { + <-dummyDbMan.fileResponse + } + <-finishChan + // if there's no blob + testBundleMan.downloadBlobsWithCallback(nil, func() { + finishChan <- 1 + }) + <-finishChan + }, 1) }) - It("should timeout connection and retry", func() { - // setup timeout - atomic.StoreInt32(blobServer.signedTimeout, 1) - atomic.StoreInt32(blobServer.blobTimeout, 1) - testBundleMan.client.Timeout = 500 * time.Millisecond - testBundleMan.bundleRetryDelay = 50 * time.Millisecond + Context("download blobs for changelist", func() { + It("should download blobs for changelist", func() { + //setup test data + count := mathrand.Intn(10) + 1 + configs := make([]*Configuration, count) + for i := 0; i < count; i++ { + conf := makeTestDeployment() + conf.BlobID = util.GenerateUUID() + conf.BlobResourceID = util.GenerateUUID() + configs[i] = conf + } - // download blobs - id := util.GenerateUUID() - testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id)) - received := <-dummyDbMan.fileResponse - Expect(received).Should(Equal(id)) + // should download blobs for changelist + testBundleMan.downloadBlobsWithCallback(extractBlobsToDownload(configs), dummyApiMan.notifyNewChange) + for i := 0; i < 2*count; i++ { + <-dummyDbMan.fileResponse + } - }, 4) + // should notify after 1st download attempt + <-dummyApiMan.notifyChan + }) - It("should mark as failure according to markDeploymentFailedAfter", func() { - // setup timeout - atomic.StoreInt32(blobServer.signedTimeout, 1) - atomic.StoreInt32(blobServer.blobTimeout, 1) - testBundleMan.client.Timeout = 100 * time.Millisecond - testBundleMan.bundleRetryDelay = 100 * time.Millisecond - testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond + It("should notify after 1st download attempt unless failure", func() { + //setup test data + count := mathrand.Intn(10) + 1 + configs := make([]*Configuration, count) + for i := 0; i < count; i++ { + conf := makeTestDeployment() + conf.BlobID = util.GenerateUUID() + conf.BlobResourceID = util.GenerateUUID() + configs[i] = conf + } - // download blobs - id := util.GenerateUUID() - req := testBundleMan.makeDownloadRequest(id) - Expect(req.markFailedAt.After(time.Now())).Should(BeTrue()) - testBundleMan.enqueueRequest(req) + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.client.Timeout = 500 * time.Millisecond + testBundleMan.bundleRetryDelay = 50 * time.Millisecond - // should fail - time.Sleep(time.Second) - Expect(req.markFailedAt.IsZero()).Should(BeTrue()) - }, 4) + // should download blobs for changelist + testBundleMan.downloadBlobsWithCallback(extractBlobsToDownload(configs), dummyApiMan.notifyNewChange) + + // should notify after 1st download attempt + <-dummyApiMan.notifyChan + + //should retry download + for i := 0; i < 2*count; i++ { + <-dummyDbMan.fileResponse + } + }) + + }) + }) - -type dummyApiManager struct { - initCalled bool -} - -func (a *dummyApiManager) InitAPI() { - a.initCalled = true -} - -type dummyBlobServer struct { - serverEndpoint string - signedEndpoint string - signedTimeout *int32 - blobTimeout *int32 - resetTimeout bool -} - -func (b *dummyBlobServer) start() { - services.API().HandleFunc(b.serverEndpoint, b.returnSigned).Methods("GET") - services.API().HandleFunc(b.signedEndpoint, b.returnBlob).Methods("GET") -} - -// send a dummy uri as response -func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) { - defer GinkgoRecover() - if atomic.LoadInt32(b.signedTimeout) == int32(1) { - if b.resetTimeout { - atomic.StoreInt32(b.signedTimeout, 0) - } - time.Sleep(time.Second) - } - vars := mux.Vars(r) - blobId := vars["blobId"] - - uriString := strings.Replace(bundleTestUrl+b.signedEndpoint, "{blobId}", blobId, 1) - log.Debug("dummyBlobServer returnSigned: " + uriString) - - res := blobServerResponse{ - Id: blobId, - Kind: "Blob", - Self: r.RequestURI, - SignedUrl: uriString, - SignedUrlExpiryTimestamp: time.Now().Add(3 * time.Hour).Format(time.RFC3339), - } - - resBytes, err := json.Marshal(res) - Expect(err).Should(Succeed()) - _, err = io.Copy(w, bytes.NewReader(resBytes)) - Expect(err).Should(Succeed()) - w.Header().Set("Content-Type", headerSteam) -} - -// send blobId back as response -func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) { - defer GinkgoRecover() - if atomic.LoadInt32(b.blobTimeout) == int32(1) { - if b.resetTimeout { - atomic.StoreInt32(b.blobTimeout, 0) - } - time.Sleep(time.Second) - } - vars := mux.Vars(r) - blobId := vars["blobId"] - log.Debug("dummyBlobServer returnBlob id=" + blobId) - _, err := io.Copy(w, bytes.NewReader([]byte(blobId))) - Expect(err).Should(Succeed()) - w.Header().Set("Content-Type", headerSteam) -}
diff --git a/cover.sh b/cover.sh new file mode 100755 index 0000000..85bbc35 --- /dev/null +++ b/cover.sh
@@ -0,0 +1,27 @@ +#!/bin/bash -eu +# +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#!/usr/bin/env bash + +set -e +echo "mode: atomic" > coverage.txt + +go test -coverprofile=profile.out -covermode=atomic github.com/apid/apidGatewayConfDeploy +if [ -f profile.out ]; then + tail -n +2 profile.out >> coverage.txt + rm profile.out +fi +go tool cover -html=coverage.txt -o cover.html
diff --git a/data.go b/data.go index d424e2b..e99cba7 100644 --- a/data.go +++ b/data.go
@@ -21,11 +21,15 @@ "reflect" ) +const ( + InitLSN = "0.0.0" +) + var ( gwBlobId int64 ) -type DataDeployment struct { +type Configuration struct { ID string OrgID string EnvID string @@ -49,16 +53,21 @@ setDbVersion(string) initDb() error getUnreadyBlobs() ([]string, error) - getReadyDeployments(typeFilter string) ([]DataDeployment, error) + getAllConfigurations(typeFilter string) ([]Configuration, error) updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) - getConfigById(string) (*DataDeployment, error) + getConfigById(string) (*Configuration, error) + loadLsnFromDb() error + updateLSN(LSN string) error + getLSN() string } type dbManager struct { - data apid.DataService - db apid.DB - dbMux sync.RWMutex + data apid.DataService + db apid.DB + dbMux sync.RWMutex + apidLSN string + lsnMutex sync.RWMutex } func (dbc *dbManager) setDbVersion(version string) { @@ -84,7 +93,7 @@ } defer tx.Rollback() _, err = tx.Exec(` - CREATE TABLE IF NOT EXISTS apid_blob_available ( + CREATE TABLE IF NOT EXISTS APID_BLOB_AVAILABLE ( id text primary key, local_fs_location text NOT NULL ); @@ -92,15 +101,33 @@ if err != nil { return err } - err = tx.Commit() + _, err = tx.Exec(` + CREATE TABLE IF NOT EXISTS APID_CONFIGURATION_LSN ( + lsn text primary key + ); + `) if err != nil { return err } - log.Debug("Database table apid_blob_available created.") + + // insert a row if APID_CONFIGURATION_LSN is empty + _, err = tx.Exec(` + INSERT INTO APID_CONFIGURATION_LSN (lsn) + SELECT '0.0.0' + WHERE NOT EXISTS (SELECT * FROM APID_CONFIGURATION_LSN) + `) + if err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } + log.Debug("Database table APID_BLOB_AVAILABLE, APID_CONFIGURATION_LSN created.") return nil } -func (dbc *dbManager) getConfigById(id string) (config *DataDeployment, err error) { +func (dbc *dbManager) getConfigById(id string) (config *Configuration, err error) { row := dbc.getDb().QueryRow(` SELECT a.id, a.organization_id, @@ -115,30 +142,29 @@ a.created_by, a.updated_at, a.updated_by - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.id = ?; `, id) - config, err = dataDeploymentsFromRow(row) + config, err = configurationFromDbRow(row) if err != nil { return nil, err } return config, nil } -// getUnreadyDeployments() returns array of resources that are not yet to be processed func (dbc *dbManager) getUnreadyBlobs() (ids []string, err error) { rows, err := dbc.getDb().Query(` SELECT id FROM ( SELECT a.bean_blob_id as id - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.bean_blob_id NOT IN - (SELECT b.id FROM apid_blob_available as b) + (SELECT b.id FROM APID_BLOB_AVAILABLE as b) UNION SELECT a.resource_blob_id as id - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.resource_blob_id NOT IN - (SELECT b.id FROM apid_blob_available as b) + (SELECT b.id FROM APID_BLOB_AVAILABLE as b) ) WHERE id IS NOT NULL AND id != '' ; @@ -161,7 +187,8 @@ return } -func (dbc *dbManager) getReadyDeployments(typeFilter string) ([]DataDeployment, error) { +/* +func (dbc *dbManager) getReadyConfigurations(typeFilter string) ([]Configuration, error) { // An alternative statement is in get_ready_deployments.sql // Need testing with large data volume to determine which is better @@ -183,27 +210,27 @@ a.created_by, a.updated_at, a.updated_by - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.id IN ( SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.resource_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" INTERSECT SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" UNION SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NULL OR a.resource_blob_id = "" ) @@ -224,28 +251,28 @@ a.created_by, a.updated_at, a.updated_by - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.type = ? AND a.id IN ( SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.resource_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" INTERSECT SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" UNION SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NULL OR a.resource_blob_id = "" ) @@ -259,14 +286,72 @@ } defer rows.Close() - deployments, err := dataDeploymentsFromRows(rows) + confs, err := configurationsFromDbRows(rows) if err != nil { return nil, err } - log.Debugf("Configurations ready: %v", deployments) + //log.Debugf("Configurations ready: %v", confs) - return deployments, nil + return confs, nil + +} +*/ +func (dbc *dbManager) getAllConfigurations(typeFilter string) ([]Configuration, error) { + + // An alternative statement is in get_ready_deployments.sql + // Need testing with large data volume to determine which is better + + var rows *sql.Rows + var err error + if typeFilter == "" { + rows, err = dbc.getDb().Query(` + SELECT a.id, + a.organization_id, + a.environment_id, + a.bean_blob_id, + a.resource_blob_id, + a.type, + a.name, + a.revision, + a.path, + a.created_at, + a.created_by, + a.updated_at, + a.updated_by + FROM METADATA_RUNTIME_ENTITY_METADATA as a + ;`) + } else { + rows, err = dbc.getDb().Query(` + SELECT a.id, + a.organization_id, + a.environment_id, + a.bean_blob_id, + a.resource_blob_id, + a.type, + a.name, + a.revision, + a.path, + a.created_at, + a.created_by, + a.updated_at, + a.updated_by + FROM METADATA_RUNTIME_ENTITY_METADATA as a + WHERE a.type = ? + ;`, typeFilter) + } + + if err != nil { + log.Errorf("DB Query for project_runtime_blob_metadata failed %v", err) + return nil, err + } + defer rows.Close() + + confs, err := configurationsFromDbRows(rows) + if err != nil { + return nil, err + } + return confs, nil } @@ -277,21 +362,21 @@ } defer txn.Rollback() _, err = txn.Exec(` - INSERT OR IGNORE INTO apid_blob_available ( + INSERT OR IGNORE INTO APID_BLOB_AVAILABLE ( id, local_fs_location ) VALUES (?, ?);`, blobId, localFsLocation) if err != nil { - log.Errorf("INSERT apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err) + log.Errorf("INSERT APID_BLOB_AVAILABLE id {%s} local_fs_location {%s} failed", localFsLocation, err) return err } err = txn.Commit() if err != nil { - log.Errorf("UPDATE apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err) + log.Errorf("UPDATE APID_BLOB_AVAILABLE id {%s} local_fs_location {%s} failed", localFsLocation, err) return err } - log.Debugf("INSERT apid_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation) + log.Debugf("INSERT APID_BLOB_AVAILABLE {%s} local_fs_location {%s} succeeded", blobId, localFsLocation) return nil } @@ -299,7 +384,7 @@ func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) { log.Debugf("Getting the blob file for blobId {%s}", blobId) - rows, err := dbc.getDb().Query("SELECT local_fs_location FROM apid_blob_available WHERE id = '" + blobId + "'") + rows, err := dbc.getDb().Query("SELECT local_fs_location FROM APID_BLOB_AVAILABLE WHERE id = '" + blobId + "'") if err != nil { log.Errorf("SELECT local_fs_location failed %v", err) return "", err @@ -317,12 +402,62 @@ return } -func dataDeploymentsFromRows(rows *sql.Rows) ([]DataDeployment, error) { - tmp, err := structFromRows(reflect.TypeOf((*DataDeployment)(nil)).Elem(), rows) +func (dbc *dbManager) loadLsnFromDb() error { + var LSN sql.NullString + ret := InitLSN + + // If there's LSN for configuration + err := dbc.getDb().QueryRow("select lsn from APID_CONFIGURATION_LSN LIMIT 1").Scan(&LSN) + if err != nil && err != sql.ErrNoRows { + log.Errorf("Failed to select lsn from APID_CONFIGURATION_LSN: %v", err) + return err + } + if LSN.Valid { + ret = LSN.String + log.Debugf("LSN from APID_CONFIGURATION_LSN: %s", LSN.String) + } + dbc.lsnMutex.Lock() + defer dbc.lsnMutex.Unlock() + dbc.apidLSN = ret + return nil +} + +func (dbc *dbManager) getLSN() string { + dbc.lsnMutex.RLock() + defer dbc.lsnMutex.RUnlock() + return dbc.apidLSN +} + +func (dbc *dbManager) updateLSN(LSN string) (err error) { + + tx, err := dbc.getDb().Begin() + if err != nil { + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return + } + defer tx.Rollback() + _, err = tx.Exec("UPDATE APID_CONFIGURATION_LSN SET lsn=?;", LSN) + if err != nil { + log.Errorf("UPDATE APID_CONFIGURATION_LSN Failed: %v", err) + return + } + log.Debugf("UPDATE APID_CONFIGURATION_LSN Success: %s", LSN) + if err = tx.Commit(); err != nil { + log.Errorf("Commit error in updateLSN: %v", err) + return + } + dbc.lsnMutex.Lock() + defer dbc.lsnMutex.Unlock() + dbc.apidLSN = LSN + return +} + +func configurationsFromDbRows(rows *sql.Rows) ([]Configuration, error) { + tmp, err := structFromRows(reflect.TypeOf((*Configuration)(nil)).Elem(), rows) if err != nil { return nil, err } - return tmp.([]DataDeployment), nil + return tmp.([]Configuration), nil } func structFromRows(t reflect.Type, rows *sql.Rows) (interface{}, error) { @@ -349,15 +484,15 @@ return slice.Interface(), nil } -func dataDeploymentsFromRow(row *sql.Row) (*DataDeployment, error) { - tmp, err := structFromRow(reflect.TypeOf((*DataDeployment)(nil)).Elem(), row) +func configurationFromDbRow(row *sql.Row) (*Configuration, error) { + tmp, err := structFromRow(reflect.TypeOf((*Configuration)(nil)).Elem(), row) if err != nil { if err != sql.ErrNoRows { - log.Errorf("Error in dataDeploymentsFromRow: %v", err) + log.Errorf("Error in configurationFromDbRow: %v", err) } return nil, err } - config := tmp.(DataDeployment) + config := tmp.(Configuration) return &config, nil }
diff --git a/data_test.go b/data_test.go index fbbb0a7..65aace8 100644 --- a/data_test.go +++ b/data_test.go
@@ -15,6 +15,8 @@ package apiGatewayConfDeploy import ( + "database/sql" + "fmt" "github.com/apid/apid-core" "github.com/apid/apid-core/data" . "github.com/onsi/ginkgo" @@ -36,6 +38,8 @@ "gcs:SHA-512:8fcc902465ccb32ceff25fa9f6fb28e3b314dbc2874c0f8add02f4e29c9e2798d344c51807aa1af56035cf09d39c800cf605d627ba65723f26d8b9c83c82d2f2": true, "gcs:SHA-512:0c648779da035bfe0ac21f6268049aa0ae74d9d6411dadefaec33991e55c2d66c807e06f7ef84e0947f7c7d63b8c9e97cf0684cbef9e0a86b947d73c74ae7455": true, } + + allConfigs map[string]bool ) var _ = Describe("data", func() { @@ -44,13 +48,22 @@ var _ = BeforeEach(func() { testCount += 1 testDbMan = &dbManager{ - data: services.Data(), - dbMux: sync.RWMutex{}, + data: services.Data(), + dbMux: sync.RWMutex{}, + lsnMutex: sync.RWMutex{}, } testDbMan.setDbVersion("test" + strconv.Itoa(testCount)) initTestDb(testDbMan.getDb()) err := testDbMan.initDb() Expect(err).Should(Succeed()) + allConfigs = map[string]bool{ + "1dc4895e-6494-4b59-979f-5f4c89c073b4": true, + "319963ff-217e-4ecc-8d6e-c3665e962d1e": true, + "3af44bb7-0a74-4283-860c-3561e6c19132": true, + "d5ffd9db-4795-43eb-b645-d2a0b6c8ac6a": true, + "84ac8d68-b3d1-4bcc-ad0d-c6a0ed67e16c": true, + "3ecd351c-1173-40bf-b830-c194e5ef9038": true, + } time.Sleep(100 * time.Millisecond) }) @@ -59,7 +72,7 @@ data.Delete(data.VersionedDBID("common", "test"+strconv.Itoa(testCount))) }) - Context("db tests", func() { + Context("basic db tests", func() { It("initDb() should be idempotent", func() { err := testDbMan.initDb() Expect(err).Should(Succeed()) @@ -90,12 +103,79 @@ Expect(count).Should(Equal(6)) }) - It("should get empty slice if no deployments are ready", func() { - deps, err := testDbMan.getReadyDeployments("") + It("should initialize support for long-polling", func() { + // APID_CONFIGURATION_LSN + rows, err := testDbMan.getDb().Query(` + SELECT lsn from APID_CONFIGURATION_LSN; + `) Expect(err).Should(Succeed()) - Expect(len(deps)).Should(BeZero()) + defer rows.Close() + count := 0 + var lsn sql.NullString + for rows.Next() { + count++ + rows.Scan(&lsn) + } + Expect(count).Should(Equal(1)) + Expect(lsn.Valid).Should(BeTrue()) + Expect(lsn.String).Should(Equal(InitLSN)) }) + It("should maintain LSN", func() { + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // write + err := testDbMan.updateLSN(testLSN) + Expect(err).Should(Succeed()) + rows, err := testDbMan.getDb().Query(` + SELECT lsn from APID_CONFIGURATION_LSN; + `) + defer rows.Close() + count := 0 + var lsn sql.NullString + for rows.Next() { + count++ + rows.Scan(&lsn) + } + Expect(count).Should(Equal(1)) + Expect(lsn.Valid).Should(BeTrue()) + Expect(lsn.String).Should(Equal(testLSN)) + + // read + Expect(testDbMan.getLSN()).Should(Equal(testLSN)) + + //load + Expect(testDbMan.loadLsnFromDb()).Should(Succeed()) + Expect(testDbMan.apidLSN).Should(Equal(testLSN)) + }) + }) + + Context("configuration tests", func() { + + It("should get all configs", func() { + confs, err := testDbMan.getAllConfigurations("") + Expect(err).Should(Succeed()) + Expect(len(confs)).Should(Equal(6)) + for _, conf := range confs { + Expect(allConfigs[conf.ID]).Should(BeTrue()) + allConfigs[conf.ID] = false + } + }) + + It("should get empty slice if no configurations", func() { + trancateTestMetadataTable(testDbMan.getDb()) + confs, err := testDbMan.getAllConfigurations("") + Expect(err).Should(Succeed()) + Expect(len(confs)).Should(BeZero()) + }) + + /* + XIt("should get empty slice if no configurations are ready", func() { + confs, err := testDbMan.getReadyConfigurations("") + Expect(err).Should(Succeed()) + Expect(len(confs)).Should(BeZero()) + }) + */ + It("should succefully update local FS location", func() { err := testDbMan.updateLocalFsLocation(testBlobId, testBlobLocalFsPrefix+testBlobId) @@ -127,7 +207,7 @@ It("should get configuration by Id", func() { config, err := testDbMan.getConfigById("3ecd351c-1173-40bf-b830-c194e5ef9038") Expect(err).Should(Succeed()) - expectedResponse := &DataDeployment{ + expectedResponse := &Configuration{ ID: "3ecd351c-1173-40bf-b830-c194e5ef9038", OrgID: "73fcac6c-5d9f-44c1-8db0-333efda3e6e8", EnvID: "ada76573-68e3-4f1a-a0f9-cbc201a97e80", @@ -151,44 +231,43 @@ Expect(err).ShouldNot(Succeed()) }) - It("should successfully get all ready configurations", func() { + /* + XIt("should successfully get all ready configurations", func() { - err := testDbMan.updateLocalFsLocation(readyBlobId, testBlobLocalFsPrefix+readyBlobId) - Expect(err).Should(Succeed()) - err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId) - Expect(err).Should(Succeed()) + err := testDbMan.updateLocalFsLocation(readyBlobId, testBlobLocalFsPrefix+readyBlobId) + Expect(err).Should(Succeed()) + err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId) + Expect(err).Should(Succeed()) - deps, err := testDbMan.getReadyDeployments("") - Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(2)) - for _, dep := range deps { - Expect(dep.BlobID).Should(Equal(readyBlobId)) - if dep.BlobResourceID != "" { - Expect(dep.BlobResourceID).Should(Equal(readyResourceId)) + confs, err := testDbMan.getReadyConfigurations("") + Expect(err).Should(Succeed()) + Expect(len(confs)).Should(Equal(2)) + for _, conf := range confs { + Expect(conf.BlobID).Should(Equal(readyBlobId)) + if conf.BlobResourceID != "" { + Expect(conf.BlobResourceID).Should(Equal(readyResourceId)) + } } - } - }) - - It("should get ready configurations by type filter", func() { + }) + */ + It("should get all configurations by type filter", func() { err := testDbMan.updateLocalFsLocation(readyBlobId, testBlobLocalFsPrefix+readyBlobId) Expect(err).Should(Succeed()) err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId) Expect(err).Should(Succeed()) - deps, err := testDbMan.getReadyDeployments("ORGANIZATION") + confs, err := testDbMan.getAllConfigurations("ORGANIZATION") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(1)) - Expect(deps[0].ID).Should(Equal("319963ff-217e-4ecc-8d6e-c3665e962d1e")) + Expect(len(confs)).Should(Equal(2)) - deps, err = testDbMan.getReadyDeployments("ENVIRONMENT") + confs, err = testDbMan.getAllConfigurations("ENVIRONMENT") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(1)) - Expect(deps[0].ID).Should(Equal("1dc4895e-6494-4b59-979f-5f4c89c073b4")) + Expect(len(confs)).Should(Equal(4)) - deps, err = testDbMan.getReadyDeployments("INVALID-TYPE") + confs, err = testDbMan.getAllConfigurations("INVALID-TYPE") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(0)) + Expect(len(confs)).Should(Equal(0)) }) It("should succefully get all unready blob ids", func() { @@ -362,3 +441,14 @@ Expect(err).Should(Succeed()) Expect(tx.Commit()).Should(Succeed()) } + +func trancateTestMetadataTable(db apid.DB) { + tx, err := db.Begin() + Expect(err).Should(Succeed()) + defer tx.Rollback() + _, err = tx.Exec(` + DELETE FROM metadata_runtime_entity_metadata; + `) + Expect(err).Should(Succeed()) + Expect(tx.Commit()).Should(Succeed()) +}
diff --git a/init.go b/init.go index 93e67ce..41ee499 100644 --- a/init.go +++ b/init.go
@@ -26,24 +26,24 @@ ) const ( - configProtocol = "protocol_type" - configAPIListen = "api_listen" - configBundleBlobDownloadEndpoint = "gatewaydeploy_bundle_download_endpoint" - configBundleDirKey = "gatewaydeploy_bundle_dir" - configDebounceDuration = "gatewaydeploy_debounce_duration" - configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" - configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout" - configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout" - configApiServerBaseURI = "apigeesync_proxy_server_base" - configApidInstanceID = "apigeesync_apid_instance_id" - configApidClusterID = "apigeesync_cluster_id" - configConcurrentDownloads = "apigeesync_concurrent_downloads" - configDownloadQueueSize = "apigeesync_download_queue_size" - configBlobServerBaseURI = "apigeesync_blob_server_base" - configStoragePath = "local_storage_path" - maxIdleConnsPerHost = 50 - httpTimeout = time.Minute - configBearerToken = "apigeesync_bearer_token" + configProtocol = "protocol_type" + configAPIListen = "api_listen" + configBlobDownloadEndpoint = "gatewaydeploy_bundle_download_endpoint" + configBlobDirKey = "gatewaydeploy_bundle_dir" + configDebounceDuration = "gatewaydeploy_debounce_duration" + configBlobCleanupDelay = "gatewaydeploy_bundle_cleanup_delay" + configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout" + configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout" + configApiServerBaseURI = "apigeesync_proxy_server_base" + configApidInstanceID = "apigeesync_apid_instance_id" + configApidClusterID = "apigeesync_cluster_id" + configConcurrentDownloads = "apigeesync_concurrent_downloads" + configDownloadQueueSize = "apigeesync_download_queue_size" + configBlobServerBaseURI = "apigeesync_blob_server_base" + configStoragePath = "local_storage_path" + maxIdleConnsPerHost = 50 + httpTimeout = time.Minute + configBearerToken = "apigeesync_bearer_token" ) var ( @@ -81,9 +81,9 @@ return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err) } - config.SetDefault(configBundleDirKey, "bundles") + config.SetDefault(configBlobDirKey, "bundles") config.SetDefault(configDebounceDuration, time.Second) - config.SetDefault(configBundleCleanupDelay, time.Minute) + config.SetDefault(configBlobCleanupDelay, time.Minute) config.SetDefault(configMarkDeployFailedAfter, 5*time.Minute) config.SetDefault(configDownloadConnTimeout, 5*time.Minute) config.SetDefault(configConcurrentDownloads, 15) @@ -94,9 +94,9 @@ return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration) } - bundleCleanupDelay := config.GetDuration(configBundleCleanupDelay) + bundleCleanupDelay := config.GetDuration(configBlobCleanupDelay) if bundleCleanupDelay < time.Millisecond { - return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay) + return pluginData, fmt.Errorf("%s must be a positive duration", configBlobCleanupDelay) } markDeploymentFailedAfter := config.GetDuration(configMarkDeployFailedAfter) @@ -133,21 +133,19 @@ // initialize api manager apiMan := &apiManager{ - dbMan: dbMan, - deploymentsEndpoint: deploymentsEndpoint, - blobEndpoint: blobEndpoint, - deploymentIdEndpoint: deploymentIdEndpoint, - eTag: 0, - deploymentsChanged: make(chan interface{}, 5), - addSubscriber: make(chan chan deploymentsResult), - removeSubscriber: make(chan chan deploymentsResult), - apiInitialized: false, + dbMan: dbMan, + configurationEndpoint: configEndpoint, + blobEndpoint: blobEndpoint, + configurationIdEndpoint: configIdEndpoint, + newChangeListChan: make(chan interface{}, 5), + addSubscriber: make(chan chan interface{}, 100), + apiInitialized: false, } // initialize bundle manager blobServerURL := config.GetString(configBlobServerBaseURI) - relativeBundlePath := config.GetString(configBundleDirKey) + relativeBundlePath := config.GetString(configBlobDirKey) storagePath := config.GetString(configStoragePath) bundlePath = path.Join(storagePath, relativeBundlePath) if err := os.MkdirAll(bundlePath, 0700); err != nil { @@ -157,23 +155,20 @@ concurrentDownloads := config.GetInt(configConcurrentDownloads) downloadQueueSize := config.GetInt(configDownloadQueueSize) bundleMan := &bundleManager{ - blobServerUrl: blobServerURL, - dbMan: dbMan, - apiMan: apiMan, - concurrentDownloads: concurrentDownloads, - markDeploymentFailedAfter: markDeploymentFailedAfter, - bundleRetryDelay: time.Second, - bundleCleanupDelay: bundleCleanupDelay, - downloadQueue: make(chan *DownloadRequest, downloadQueueSize), - isClosed: new(int32), - client: httpClient, + blobServerUrl: blobServerURL, + dbMan: dbMan, + apiMan: apiMan, + concurrentDownloads: concurrentDownloads, + markConfigFailedAfter: markDeploymentFailedAfter, + bundleRetryDelay: time.Second, + bundleCleanupDelay: bundleCleanupDelay, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), + client: httpClient, } bundleMan.initializeBundleDownloading() - //TODO initialize apiMan.distributeEvents() for api call with "block" - //go apiMan.distributeEvents() - // initialize event handler eventHandler = &apigeeSyncHandler{ dbMan: dbMan,
diff --git a/listener.go b/listener.go index 2769568..b203187 100644 --- a/listener.go +++ b/listener.go
@@ -36,13 +36,6 @@ } } -type bundleConfigJson struct { - Name string `json:"name"` - URI string `json:"uri"` - ChecksumType string `json:"checksumType"` - Checksum string `json:"checksum"` -} - type apigeeSyncHandler struct { dbMan dbManagerInterface apiMan apiManagerInterface @@ -70,109 +63,136 @@ log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) h.dbMan.setDbVersion(snapshot.SnapshotInfo) + err := h.dbMan.initDb() + if err != nil { + log.Panicf("unable to init DB: %v", err) + } + if lsn := h.dbMan.getLSN(); lsn != "" { + // receive a new snapshot at runtime + if err = h.dbMan.updateLSN(lsn); err != nil { + log.Errorf("Unable to update LSN: %v", err) + } + } else { //apid just started + if err = h.dbMan.loadLsnFromDb(); err != nil { + log.Errorf("Unable to load LSN From Db: %v", err) + } + } h.startupOnExistingDatabase() - h.apiMan.InitAPI() + //h.apiMan.InitAPI() log.Debug("Snapshot processed") } -// TODO make it work with new schema func (h *apigeeSyncHandler) startupOnExistingDatabase() { // start bundle downloads that didn't finish + go func() { // create apid_blob_available table - h.dbMan.initDb() + blobIds, err := h.dbMan.getUnreadyBlobs() if err != nil { - log.Panicf("unable to query database for unready deployments: %v", err) + log.Panicf("unable to query database for unready configurations: %v", err) } log.Debugf("Queuing %d blob downloads", len(blobIds)) - for _, id := range blobIds { - go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id)) - } + + // initialize API endpoints only after 1 round of download attempts is made + h.bundleMan.downloadBlobsWithCallback(blobIds, func() { + h.apiMan.InitAPI() + h.apiMan.notifyNewChange() + }) + }() } func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) { log.Debugf("Processing changes") - // changes have been applied to DB - var insertedDeployments, deletedDeployments []DataDeployment - var updatedNewBlobs, updatedOldBlobs []string + // changes have been applied to DB by apidApigeeSync + var insertedConfigs, updatedNewConfigs, updatedOldConfigs, deletedConfigs []*Configuration + isConfigChanged := false for _, change := range changes.Changes { switch change.Table { case CONFIG_METADATA_TABLE: + isConfigChanged = true switch change.Operation { case common.Insert: - dep := dataDeploymentFromRow(change.NewRow) - insertedDeployments = append(insertedDeployments, dep) + conf := configurationFromRow(change.NewRow) + insertedConfigs = append(insertedConfigs, &conf) case common.Delete: - dep := dataDeploymentFromRow(change.OldRow) - deletedDeployments = append(deletedDeployments, dep) + conf := configurationFromRow(change.OldRow) + deletedConfigs = append(deletedConfigs, &conf) case common.Update: - depNew := dataDeploymentFromRow(change.NewRow) - depOld := dataDeploymentFromRow(change.OldRow) - - if depOld.BlobID != depNew.BlobID { - updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID) - updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID) - } - - if depOld.BlobResourceID != depNew.BlobResourceID { - updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID) - updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID) - } + confNew := configurationFromRow(change.NewRow) + confOld := configurationFromRow(change.OldRow) + updatedNewConfigs = append(updatedNewConfigs, &confNew) + updatedOldConfigs = append(updatedOldConfigs, &confOld) default: log.Errorf("unexpected operation: %s", change.Operation) } } } - - /* - for _, d := range deletedDeployments { - h.apiMan.addChangedDeployment(d.ID) - } - */ - - // insert - for i := range insertedDeployments { - go h.bundleMan.queueDownloadRequest(&insertedDeployments[i]) + // delete old configs from FS + if len(deletedConfigs)+len(updatedOldConfigs) > 0 { + log.Debugf("will delete %d old blobs", len(deletedConfigs)+len(updatedOldConfigs)) + //TODO delete blobs for deleted configs + blobIds := extractBlobsToDelete(append(deletedConfigs, updatedOldConfigs...)) + go h.bundleMan.deleteBlobs(blobIds) } - // update - for i := range updatedNewBlobs { - go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i])) + // download and expose new configs + if isConfigChanged { + h.dbMan.updateLSN(changes.LastSequence) + blobs := extractBlobsToDownload(append(insertedConfigs, updatedNewConfigs...)) + h.bundleMan.downloadBlobsWithCallback(blobs, h.apiMan.notifyNewChange) + } else if h.dbMan.getLSN() == InitLSN { + h.dbMan.updateLSN(changes.LastSequence) } - for i := range updatedOldBlobs { - go h.bundleMan.deleteBundleById(updatedOldBlobs[i]) - } - - // delete - if len(deletedDeployments) > 0 { - log.Debugf("will delete %d old bundles", len(deletedDeployments)) - //TODO delete bundles for deleted deployments - h.bundleMan.deleteBundlesFromDeployments(deletedDeployments) - } } -func dataDeploymentFromRow(row common.Row) (d DataDeployment) { +func extractBlobsToDownload(confs []*Configuration) (blobs []string) { + //TODO: do not include already-downloaded blobs + for _, conf := range confs { + if conf.BlobID != "" { + blobs = append(blobs, conf.BlobID) + } + if conf.BlobResourceID != "" { + blobs = append(blobs, conf.BlobResourceID) + } + } + return +} - row.Get("id", &d.ID) - row.Get("organization_id", &d.OrgID) - row.Get("environment_id", &d.EnvID) - row.Get("bean_blob_id", &d.BlobID) - row.Get("resource_blob_id", &d.BlobResourceID) - row.Get("type", &d.Type) - row.Get("name", &d.Name) - row.Get("revision", &d.Revision) - row.Get("path", &d.Path) - row.Get("created_at", &d.Created) - row.Get("created_by", &d.CreatedBy) - row.Get("updated_at", &d.Updated) - row.Get("updated_by", &d.UpdatedBy) +func extractBlobsToDelete(confs []*Configuration) (blobs []string) { + //TODO: do not include already-downloaded blobs + for _, conf := range confs { + if conf.BlobID != "" { + blobs = append(blobs, conf.BlobID) + } + if conf.BlobResourceID != "" { + blobs = append(blobs, conf.BlobResourceID) + } + } + return +} + +func configurationFromRow(row common.Row) (c Configuration) { + + row.Get("id", &c.ID) + row.Get("organization_id", &c.OrgID) + row.Get("environment_id", &c.EnvID) + row.Get("bean_blob_id", &c.BlobID) + row.Get("resource_blob_id", &c.BlobResourceID) + row.Get("type", &c.Type) + row.Get("name", &c.Name) + row.Get("revision", &c.Revision) + row.Get("path", &c.Path) + row.Get("created_at", &c.Created) + row.Get("created_by", &c.CreatedBy) + row.Get("updated_at", &c.Updated) + row.Get("updated_by", &c.UpdatedBy) return }
diff --git a/listener_test.go b/listener_test.go index 55a71c5..2971a08 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -22,7 +22,6 @@ . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "math/rand" - "reflect" "time" ) @@ -31,17 +30,19 @@ var dummyApiMan *dummyApiManager var dummyBundleMan *dummyBundleManager var testHandler *apigeeSyncHandler + var testCount int var _ = BeforeEach(func() { + testCount += 1 // stop handler created by initPlugin() eventHandler.stopListener(services) - dummyApiMan = &dummyApiManager{} + dummyApiMan = &dummyApiManager{ + notifyChan: make(chan bool, 1), + initCalled: make(chan bool), + } dummyDbMan = &dummyDbManager{} dummyBundleMan = &dummyBundleManager{ - requestChan: make(chan *DownloadRequest), - depChan: make(chan *DataDeployment), - delChan: make(chan *DataDeployment), - delBlobChan: make(chan string), + blobChan: make(chan string), } testHandler = &apigeeSyncHandler{ dbMan: dummyDbMan, @@ -73,11 +74,11 @@ SnapshotInfo: fmt.Sprint(rand.Uint32()), } - apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) for i := 0; i < len(unreadyBlobIds); i++ { - req := <-dummyBundleMan.requestChan - blobMap[req.blobId]++ + id := <-dummyBundleMan.blobChan + blobMap[id]++ } // verify all unready blobids are enqueued @@ -86,7 +87,7 @@ } }) - It("Snapshot events should set db version, and should only init API endpoint once", func() { + It("Snapshot events should set db version", func() { // emit snapshot for i := 0; i < 2+rand.Intn(5); i++ { @@ -97,11 +98,42 @@ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) Expect(dummyDbMan.version).Should(Equal(version)) } - - // verify init API called - Expect(dummyApiMan.initCalled).Should(BeTrue()) }) + It("Snapshot event should init API endpoint and notify long-polling", func() { + + // emit snapshot + version := fmt.Sprint(rand.Uint32()) + snapshot := &common.Snapshot{ + SnapshotInfo: version, + } + <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + Expect(dummyDbMan.version).Should(Equal(version)) + Expect(<-dummyApiMan.initCalled).Should(BeTrue()) + Expect(<-dummyApiMan.notifyChan).Should(BeTrue()) + }) + + It("Should load LSN when apid starts", func() { + dummyDbMan.dbLSN = fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // emit snapshot + version := fmt.Sprint(rand.Uint32()) + snapshot := &common.Snapshot{ + SnapshotInfo: version, + } + <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + Expect(dummyDbMan.getLSN()).Should(Equal(dummyDbMan.dbLSN)) + }) + + It("Should store LSN when receiving snapshot at runtime", func() { + dummyDbMan.lsn = fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // emit snapshot + version := fmt.Sprint(rand.Uint32()) + snapshot := &common.Snapshot{ + SnapshotInfo: version, + } + <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + Expect(dummyDbMan.getLSN()).Should(Equal(dummyDbMan.dbLSN)) + }) }) Context("Change list", func() { @@ -109,7 +141,7 @@ It("Insert event should enqueue download requests for all inserted deployments", func() { // emit change event changes := make([]common.Change, 0) - deployments := make(map[string]DataDeployment) + blobs := make(map[string]int) for i := 0; i < 1+rand.Intn(10); i++ { dep := makeTestDeployment() change := common.Change{ @@ -118,25 +150,25 @@ NewRow: rowFromDeployment(dep), } changes = append(changes, change) - deployments[dep.ID] = *dep + blobs[dep.BlobID]++ + blobs[dep.BlobResourceID]++ } changeList := &common.ChangeList{ Changes: changes, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify - for i := 0; i < len(changes); i++ { - dep := <-dummyBundleMan.depChan - Expect(reflect.DeepEqual(deployments[dep.ID], *dep)).Should(BeTrue()) - delete(deployments, dep.ID) + for i := 0; i < 2*len(changes); i++ { + blobId := <-dummyBundleMan.blobChan + blobs[blobId]++ + Expect(blobs[blobId]).Should(Equal(2)) } - Expect(len(deployments)).Should(BeZero()) }) - It("Delete event should deliver to the bundle manager", func() { + XIt("Delete event should deliver to the bundle manager", func() { // emit change event changes := make([]common.Change, 0) deployments := make(map[string]bool) @@ -155,169 +187,137 @@ Changes: changes, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify for i := 0; i < len(changes); i++ { - dep := <-dummyBundleMan.delChan - Expect(deployments[dep.ID]).Should(BeTrue()) - delete(deployments, dep.ID) } Expect(len(deployments)).Should(BeZero()) }) - It("Update event should enqueue download requests and delete old blobs", func() { + It("Update event should enqueue download requests", func() { changes := make([]common.Change, 0) - blobIdNew := make(map[string]int) - blobIdOld := make(map[string]int) + blobsNew := make(map[string]int) for i := 0; i < 1+rand.Intn(10); i++ { - depNew := makeTestDeployment() - depNew.BlobID = util.GenerateUUID() - depNew.BlobResourceID = util.GenerateUUID() + confNew := makeTestDeployment() + confNew.BlobID = util.GenerateUUID() + confNew.BlobResourceID = util.GenerateUUID() - depOld := makeTestDeployment() - depOld.BlobID = util.GenerateUUID() - depOld.BlobResourceID = util.GenerateUUID() + confOld := makeTestDeployment() + confOld.BlobID = util.GenerateUUID() + confOld.BlobResourceID = util.GenerateUUID() change := common.Change{ Operation: common.Update, Table: CONFIG_METADATA_TABLE, - NewRow: rowFromDeployment(depNew), - OldRow: rowFromDeployment(depOld), + NewRow: rowFromDeployment(confNew), + OldRow: rowFromDeployment(confOld), } changes = append(changes, change) - blobIdNew[depNew.BlobID]++ - blobIdNew[depNew.BlobResourceID]++ - blobIdOld[depOld.BlobID]++ - blobIdOld[depOld.BlobResourceID]++ + blobsNew[confNew.BlobID]++ + blobsNew[confNew.BlobResourceID]++ } + testLSN := "1.1.1" // emit change event changeList := &common.ChangeList{ - Changes: changes, + Changes: changes, + LastSequence: testLSN, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify - for i := 0; i < len(blobIdNew); i++ { - req := <-dummyBundleMan.requestChan - blobIdNew[req.blobId]++ - Expect(blobIdNew[req.blobId]).Should(Equal(2)) - } - for i := 0; i < len(blobIdOld); i++ { - blobId := <-dummyBundleMan.delBlobChan - blobIdOld[blobId]++ - Expect(blobIdOld[blobId]).Should(Equal(2)) + for i := 0; i < 2*len(changes); i++ { + blobId := <-dummyBundleMan.blobChan + blobsNew[blobId]++ + Expect(blobsNew[blobId]).Should(Equal(2)) } }) + }) - It("Update event should only download/delete changed blobs", func() { + Context("LSN", func() { + + var _ = BeforeEach(func() { + dummyDbMan.lsn = "0.0.1" + }) + + It("changelist with CONFIG_METADATA_TABLE should update apidLSN", func() { + // emit change event changes := make([]common.Change, 0) - blobIdChangedNew := make(map[string]int) - blobIdChangedOld := make(map[string]int) - + deployments := make(map[string]Configuration) + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) for i := 0; i < 1+rand.Intn(10); i++ { - depNew := makeTestDeployment() - depNew.BlobID = util.GenerateUUID() - depNew.BlobResourceID = util.GenerateUUID() - - depOld := makeTestDeployment() - - if rand.Intn(2) == 0 { - // blob id changed - depOld.BlobID = util.GenerateUUID() - blobIdChangedNew[depNew.BlobID]++ - blobIdChangedOld[depOld.BlobID]++ - } else { - // blob id unchanged - depOld.BlobID = depNew.BlobID - } - - if rand.Intn(2) == 0 { - // blob id changed - depOld.BlobResourceID = util.GenerateUUID() - blobIdChangedNew[depNew.BlobResourceID]++ - blobIdChangedOld[depOld.BlobResourceID]++ - } else { - // blob id unchanged - depOld.BlobResourceID = depNew.BlobResourceID - } - + dep := makeTestDeployment() change := common.Change{ - Operation: common.Update, + Operation: common.Insert, Table: CONFIG_METADATA_TABLE, - NewRow: rowFromDeployment(depNew), - OldRow: rowFromDeployment(depOld), + NewRow: rowFromDeployment(dep), } changes = append(changes, change) + deployments[dep.ID] = *dep } + changeList := &common.ChangeList{ + Changes: changes, + LastSequence: testLSN, + } + + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + for i := 0; i < 2*len(changes); i++ { + <-dummyBundleMan.blobChan + } + Expect(dummyDbMan.getLSN()).Should(Equal(testLSN)) + + }) + + It("changelist without CONFIG_METADATA_TABLE shouldn't update apidLSN", func() { + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + dummyDbMan.lsn = testLSN + // emit change event + changes := make([]common.Change, 0) + deployments := make(map[string]Configuration) + for i := 0; i < 1+rand.Intn(10); i++ { + dep := makeTestDeployment() + change := common.Change{ + Operation: common.Insert, + Table: "somewhat-table", + NewRow: rowFromDeployment(dep), + } + changes = append(changes, change) + deployments[dep.ID] = *dep + } + + changeList := &common.ChangeList{ + Changes: changes, + LastSequence: "aaa.aaa.aaa", + } + + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + Expect(dummyDbMan.getLSN()).Should(Equal(testLSN)) + + }) + + It("changelist should always update apidLSN if it has init value", func() { + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + dummyDbMan.lsn = InitLSN // emit change event changeList := &common.ChangeList{ - Changes: changes, + Changes: nil, + LastSequence: testLSN, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + Expect(dummyDbMan.getLSN()).Should(Equal(testLSN)) - // verify - for i := 0; i < len(blobIdChangedNew); i++ { - req := <-dummyBundleMan.requestChan - blobIdChangedNew[req.blobId]++ - Expect(blobIdChangedNew[req.blobId]).Should(Equal(2)) - } - for i := 0; i < len(blobIdChangedOld); i++ { - blobId := <-dummyBundleMan.delBlobChan - blobIdChangedOld[blobId]++ - Expect(blobIdChangedOld[blobId]).Should(Equal(2)) - } }) }) }) -type dummyBundleManager struct { - requestChan chan *DownloadRequest - depChan chan *DataDeployment - delChan chan *DataDeployment - delBlobChan chan string -} - -func (bm *dummyBundleManager) initializeBundleDownloading() { - -} - -func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) { - bm.depChan <- dep -} - -func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) { - bm.requestChan <- req -} - -func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest { - return &DownloadRequest{ - blobId: blobId, - } -} - -func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) { - for i := range deployments { - bm.delChan <- &deployments[i] - } -} - -func (bm *dummyBundleManager) deleteBundleById(blobId string) { - bm.delBlobChan <- blobId -} - -func (bm *dummyBundleManager) Close() { - -} - -func rowFromDeployment(dep *DataDeployment) common.Row { +func rowFromDeployment(dep *Configuration) common.Row { row := common.Row{} row["id"] = &common.ColumnVal{Value: dep.ID} row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
diff --git a/mock_test.go b/mock_test.go new file mode 100644 index 0000000..f8f804a --- /dev/null +++ b/mock_test.go
@@ -0,0 +1,192 @@ +package apiGatewayConfDeploy + +import ( + "bytes" + "encoding/json" + "github.com/gorilla/mux" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "io" + "net/http" + "os" + "strings" + "sync/atomic" + "time" +) + +type dummyDbManager struct { + unreadyBlobIds []string + readyDeployments []Configuration + localFSLocation string + fileResponse chan string + version string + configurations map[string]*Configuration + lsn string + dbLSN string + err error +} + +func (d *dummyDbManager) setDbVersion(version string) { + d.version = version +} + +func (d *dummyDbManager) initDb() error { + return nil +} + +func (d *dummyDbManager) getUnreadyBlobs() ([]string, error) { + return d.unreadyBlobIds, nil +} + +func (d *dummyDbManager) getAllConfigurations(typeFilter string) ([]Configuration, error) { + if typeFilter == "" { + return d.readyDeployments, nil + } + return []Configuration{*(d.configurations[typeFilter])}, nil +} + +func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error { + file, err := os.Open(localFsLocation) + if err != nil { + return err + } + buff := make([]byte, 36) + _, err = file.Read(buff) + if err != nil { + return err + } + go func(buff []byte) { + d.fileResponse <- string(buff) + }(buff) + + return nil +} + +func (d *dummyDbManager) getLocalFSLocation(string) (string, error) { + return d.localFSLocation, nil +} + +func (d *dummyDbManager) getConfigById(id string) (*Configuration, error) { + return d.configurations[id], d.err +} +func (d *dummyDbManager) getLSN() string { + return d.lsn +} + +func (d *dummyDbManager) updateLSN(LSN string) error { + d.lsn = LSN + d.dbLSN = LSN + return nil +} + +func (d *dummyDbManager) loadLsnFromDb() error { + d.lsn = d.dbLSN + return nil +} + +type dummyApiManager struct { + initCalled chan bool + notifyChan chan bool +} + +func (a *dummyApiManager) InitAPI() { + go func() { + a.initCalled <- true + }() +} + +func (a *dummyApiManager) notifyNewChange() { + a.notifyChan <- true +} + +type dummyBundleManager struct { + blobChan chan string +} + +func (bm *dummyBundleManager) initializeBundleDownloading() { + +} + +func (bm *dummyBundleManager) downloadBlobsWithCallback(blobs []string, callback func()) { + go func() { + for _, id := range blobs { + bm.blobChan <- id + } + }() + go callback() +} + +func (bm *dummyBundleManager) makeDownloadRequest(blobId string, bunchRequest *BunchDownloadRequest) *DownloadRequest { + return &DownloadRequest{ + blobId: blobId, + bunchRequest: bunchRequest, + } +} + +func (bm *dummyBundleManager) deleteBlobs(blobIds []string) { + +} + +func (bm *dummyBundleManager) Close() { + +} + +type dummyBlobServer struct { + serverEndpoint string + signedEndpoint string + signedTimeout *int32 + blobTimeout *int32 + resetTimeout bool +} + +func (b *dummyBlobServer) start() { + services.API().HandleFunc(b.serverEndpoint, b.returnSigned).Methods("GET") + services.API().HandleFunc(b.signedEndpoint, b.returnBlob).Methods("GET") +} + +// send a dummy uri as response +func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + if atomic.LoadInt32(b.signedTimeout) == int32(1) { + if b.resetTimeout { + atomic.StoreInt32(b.signedTimeout, 0) + } + time.Sleep(time.Second) + } + vars := mux.Vars(r) + blobId := vars["blobId"] + + uriString := strings.Replace(bundleTestUrl+b.signedEndpoint, "{blobId}", blobId, 1) + log.Debug("dummyBlobServer returnSigned: " + uriString) + + res := blobServerResponse{ + Id: blobId, + Kind: "Blob", + Self: r.RequestURI, + SignedUrl: uriString, + SignedUrlExpiryTimestamp: time.Now().Add(3 * time.Hour).Format(time.RFC3339), + } + + resBytes, err := json.Marshal(res) + Expect(err).Should(Succeed()) + _, err = io.Copy(w, bytes.NewReader(resBytes)) + Expect(err).Should(Succeed()) + w.Header().Set("Content-Type", headerSteam) +} + +// send blobId back as response +func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) { + defer GinkgoRecover() + if atomic.LoadInt32(b.blobTimeout) == int32(1) { + if b.resetTimeout { + atomic.StoreInt32(b.blobTimeout, 0) + } + time.Sleep(time.Second) + } + vars := mux.Vars(r) + blobId := vars["blobId"] + log.Debug("dummyBlobServer returnBlob id=" + blobId) + _, err := io.Copy(w, bytes.NewReader([]byte(blobId))) + Expect(err).Should(Succeed()) + w.Header().Set("Content-Type", headerSteam) +}