WIP: [ISSUE-66918282] support long poll for "/configurations" (#23)
* [ISSUE-66918282] support long poll for "/configurations"
* [ISSUE-66918282] add tests
* [ISSUE-66918282] fix bugs for long-polling
* [ISSUE-66918282] Read DB only once for all subscribers
* [ISSUE-66918282] use long-poll in apid-core, improve style
* [ISSUE-66918282]
* [ISSUE-66918282] update tests, improve style
* [ISSUE-66918282] refactor tests
* [ISSUE-66918282] update tests
* [ISSUE-66918282] update README and swagger
diff --git a/README.md b/README.md
index dd683e9..f9c5381 100644
--- a/README.md
+++ b/README.md
@@ -11,5 +11,15 @@
## Functional description
-see the file [swagger.yaml](swagger.yaml).
+### Configurations
+* Gateway can call "/configurations" to fetch configurations.
+* "type" filter is supported.
+* Long-polling is supported.
+* A configuration can be fetched by id "/configurations/{configId}"
+
+### Blobs
+* A blob can be downloaded by id "/blobs/{blobId}"
+
+
+For details, check the file [apidGatewayConfDeploy-api.yaml](apidGatewayConfDeploy-api.yaml).
diff --git a/api.go b/api.go
index 4ffc12e..5f5ab0e 100644
--- a/api.go
+++ b/api.go
@@ -17,14 +17,16 @@
"bytes"
"database/sql"
"encoding/json"
+ "errors"
"fmt"
+ "github.com/apid/apid-core/util"
+ "github.com/apigee-labs/transicator/common"
"github.com/gorilla/mux"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
- "sync/atomic"
"time"
)
@@ -39,10 +41,10 @@
)
const (
- deploymentsEndpoint = "/configurations"
- blobEndpointPath = "/blobs"
- blobEndpoint = blobEndpointPath + "/{blobId}"
- deploymentIdEndpoint = deploymentsEndpoint + "/{configId}"
+ configEndpoint = "/configurations"
+ blobEndpointPath = "/blobs"
+ blobEndpoint = blobEndpointPath + "/{blobId}"
+ configIdEndpoint = configEndpoint + "/{configId}"
)
const (
@@ -64,11 +66,19 @@
)
const (
- headerSteam = "application/octet-stream"
+ headerSteam = "application/octet-stream"
+ headerJson = "application/json"
+ apidConfigIndexPar = "apid-config-index"
+ apidConfigIndexHeader = "x-apid-config-index"
+)
+
+var (
+ ErrNoLSN = errors.New("No last sequence in DB")
+ ErrInvalidLSN = errors.New(apidConfigIndexPar + " is invalid")
)
type deploymentsResult struct {
- deployments []DataDeployment
+ deployments []Configuration
err error
eTag string
}
@@ -78,7 +88,7 @@
Reason string `json:"reason"`
}
-type ApiDeploymentDetails struct {
+type ApiConfigurationDetails struct {
Self string `json:"self"`
Name string `json:"name"`
Type string `json:"type"`
@@ -92,44 +102,60 @@
Updated string `json:"updated"`
}
-type ApiDeploymentResponse struct {
- Kind string `json:"kind"`
- Self string `json:"self"`
- ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"`
+type ApiConfigurationResponse struct {
+ Kind string `json:"kind"`
+ Self string `json:"self"`
+ ApiConfigurationsResponse []ApiConfigurationDetails `json:"contents"`
}
-//TODO add support for block and subscriber
+type confChangeNotification struct {
+ LSN string
+ confs []Configuration
+ err error
+}
+
type apiManagerInterface interface {
+ // an idempotent method to initialize api endpoints
InitAPI()
- //addChangedDeployment(string)
- //distributeEvents()
+ notifyNewChange()
}
type apiManager struct {
- dbMan dbManagerInterface
- deploymentsEndpoint string
- blobEndpoint string
- deploymentIdEndpoint string
- eTag int64
- deploymentsChanged chan interface{}
- addSubscriber chan chan deploymentsResult
- removeSubscriber chan chan deploymentsResult
- apiInitialized bool
+ dbMan dbManagerInterface
+ configurationEndpoint string
+ blobEndpoint string
+ configurationIdEndpoint string
+ addSubscriber chan chan interface{}
+ newChangeListChan chan interface{}
+ apiInitialized bool
}
func (a *apiManager) InitAPI() {
if a.apiInitialized {
return
}
- services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET")
+ services.API().HandleFunc(a.configurationEndpoint, a.apiGetCurrentConfigs).Methods("GET")
services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET")
- services.API().HandleFunc(a.deploymentIdEndpoint, a.apiHandleConfigId).Methods("GET")
+ services.API().HandleFunc(a.configurationIdEndpoint, a.apiHandleConfigId).Methods("GET")
+ a.initDistributeEvents()
a.apiInitialized = true
log.Debug("API endpoints initialized")
}
-func (a *apiManager) addChangedDeployment(id string) {
- a.deploymentsChanged <- id
+func (a *apiManager) initDistributeEvents() {
+ go util.DistributeEvents(a.newChangeListChan, a.addSubscriber)
+}
+
+func (a *apiManager) notifyNewChange() {
+ confs, err := a.dbMan.getAllConfigurations("")
+ if err != nil {
+ log.Errorf("Database error in getReadyConfigurations: %v", err)
+ }
+ a.newChangeListChan <- &confChangeNotification{
+ LSN: a.dbMan.getLSN(),
+ confs: confs,
+ err: err,
+ }
}
func (a *apiManager) writeError(w http.ResponseWriter, status int, code int, reason string) {
@@ -151,70 +177,6 @@
a.writeError(w, http.StatusInternalServerError, API_ERR_INTERNAL, err)
}
-func (a *apiManager) debounce(in chan interface{}, out chan []interface{}, window time.Duration) {
- send := func(toSend []interface{}) {
- if toSend != nil {
- log.Debugf("debouncer sending: %v", toSend)
- out <- toSend
- }
- }
- var toSend []interface{}
- for {
- select {
- case incoming, ok := <-in:
- if ok {
- log.Debugf("debouncing %v", incoming)
- toSend = append(toSend, incoming)
- } else {
- send(toSend)
- log.Debugf("closing debouncer")
- close(out)
- return
- }
- case <-time.After(window):
- send(toSend)
- toSend = nil
- }
- }
-}
-
-//TODO get notified when deployments ready
-/*
-func (a *apiManager) distributeEvents() {
- subscribers := make(map[chan deploymentsResult]bool)
- deliverDeployments := make(chan []interface{}, 1)
-
- go a.debounce(a.deploymentsChanged, deliverDeployments, debounceDuration)
-
- for {
- select {
- case _, ok := <-deliverDeployments:
- if !ok {
- return // todo: using this?
- }
- subs := subscribers
- subscribers = make(map[chan deploymentsResult]bool)
- go func() {
- eTag := a.incrementETag()
- deployments, err := a.dbMan.getUnreadyDeployments()
- log.Debugf("delivering deployments to %d subscribers", len(subs))
- for subscriber := range subs {
- log.Debugf("delivering to: %v", subscriber)
- subscriber <- deploymentsResult{deployments, err, eTag}
- }
- }()
- case subscriber := <-a.addSubscriber:
- log.Debugf("Add subscriber: %v", subscriber)
- subscribers[subscriber] = true
- case subscriber := <-a.removeSubscriber:
- log.Debugf("Remove subscriber: %v", subscriber)
- delete(subscribers, subscriber)
- }
- }
-}
-*/
-
-// TODO use If-None-Match and ETag
func (a *apiManager) apiReturnBlobData(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
@@ -250,8 +212,8 @@
}
return
}
- configDetail := ApiDeploymentDetails{
- Self: getHttpHost() + a.deploymentsEndpoint + "/" + config.ID,
+ configDetail := ApiConfigurationDetails{
+ Self: getHttpHost() + a.configurationEndpoint + "/" + config.ID,
Name: config.Name,
Type: config.Type,
Revision: config.Revision,
@@ -271,99 +233,99 @@
return
}
log.Debugf("sending configuration %s", b)
+ w.Header().Set("Content-Type", headerJson)
w.Write(b)
}
-func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) {
-
- // If returning without a bundle (immediately or after timeout), status = 404
- // If returning If-None-Match value is equal to current deployment, status = 304
- // If returning a new value, status = 200
-
- // If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then
- // block for up to the specified number of seconds until a new deployment becomes available.
- b := r.URL.Query().Get("block")
+// If not long-polling, return configurations, status = 200
+// If "apid-config-index" is given in request parameters, return immediately with status = 200/304
+// If both "block" and "apid-config-index" are given:
+// if apid's LSN > apid-config-index in header, return immediately with status = 200
+// if apid's LSN <= apid-config-index, long polling for timeout=block secs
+func (a *apiManager) apiGetCurrentConfigs(w http.ResponseWriter, r *http.Request) {
+ blockSec := r.URL.Query().Get("block")
typeFilter := r.URL.Query().Get("type")
+ headerLSN := r.URL.Query().Get(apidConfigIndexPar)
var timeout int
- if b != "" {
- var err error
- timeout, err = strconv.Atoi(b)
- if err != nil {
+ var err error
+ if blockSec != "" {
+ timeout, err = strconv.Atoi(blockSec)
+ if err != nil || timeout < 0 {
a.writeError(w, http.StatusBadRequest, API_ERR_BAD_BLOCK, "bad block value, must be number of seconds")
return
}
}
- log.Debugf("api timeout: %d", timeout)
+ log.Debugf("/configurations long-poll timeout: %d", timeout)
- // If If-None-Match header matches the ETag of current bundle list AND if the request does NOT have a 'block'
- // query param > 0, the server returns a 304 Not Modified response indicating that the client already has the
- // most recent bundle list.
- ifNoneMatch := r.Header.Get("If-None-Match")
- log.Debugf("if-none-match: %s", ifNoneMatch)
+ log.Debugf("Long-Poll-Index: %s", headerLSN)
- // send unmodified if matches prior eTag and no timeout
- eTag := a.getETag()
- if eTag == ifNoneMatch && timeout == 0 {
- w.WriteHeader(http.StatusNotModified)
+ // if filter by "type"
+ if typeFilter != "" {
+ a.sendReadyConfigurations(typeFilter, w, "")
return
}
- // send results if different eTag
- if eTag != ifNoneMatch {
- a.sendReadyDeployments(typeFilter, w)
- return
- }
-
- // otherwise, subscribe to any new deployment changes
- var newDeploymentsChannel chan deploymentsResult
- if timeout > 0 && ifNoneMatch != "" {
- //TODO handle block
- //newDeploymentsChannel = make(chan deploymentsResult, 1)
- //a.addSubscriber <- newDeploymentsChannel
- }
-
- log.Debug("Blocking request... Waiting for new Deployments.")
-
- select {
- case result := <-newDeploymentsChannel:
- if result.err != nil {
- a.writeInternalError(w, "Database error")
- } else {
- a.sendDeployments(w, result.deployments, result.eTag, typeFilter)
+ // if no filter, check for long polling
+ cmpRes, apidLSN, err := a.compareLSN(headerLSN)
+ switch {
+ case err != nil:
+ if err == ErrInvalidLSN {
+ a.writeError(w, http.StatusBadRequest, http.StatusBadRequest, err.Error())
+ return
}
-
- case <-time.After(time.Duration(timeout) * time.Second):
- a.removeSubscriber <- newDeploymentsChannel
- log.Debug("Blocking deployment request timed out.")
- if ifNoneMatch != "" {
+ log.Errorf("Error in compareLSN: %v", err)
+ a.writeInternalError(w, err.Error())
+ return
+ case cmpRes <= 0: //APID_LSN <= Header_LSN
+ if timeout == 0 { // no long polling
w.WriteHeader(http.StatusNotModified)
- } else {
- a.sendReadyDeployments(typeFilter, w)
+ } else { // long polling
+ util.LongPolling(w, time.Duration(timeout)*time.Second, a.addSubscriber, a.LongPollSuccessHandler, a.LongPollTimeoutHandler)
}
+ return
+ case cmpRes > 0: //APID_LSN > Header_LSN
+ a.sendReadyConfigurations("", w, apidLSN)
+ return
}
}
-func (a *apiManager) sendReadyDeployments(typeFilter string, w http.ResponseWriter) {
- eTag := a.getETag()
- deployments, err := a.dbMan.getReadyDeployments(typeFilter)
+func (a *apiManager) LongPollSuccessHandler(c interface{}, w http.ResponseWriter) {
+ // send configs and LSN
+ confChange, ok := c.(*confChangeNotification)
+ if !ok || confChange.err != nil {
+ log.Errorf("Wrong confChangeNotification: %v, %v", ok, confChange)
+ a.writeInternalError(w, "Error getting configurations with long-polling")
+ return
+ }
+ a.sendDeployments(w, confChange.confs, confChange.LSN, "")
+}
+
+func (a *apiManager) LongPollTimeoutHandler(w http.ResponseWriter) {
+ log.Debug("long-polling configuration request timed out.")
+ w.WriteHeader(http.StatusNotModified)
+}
+
+func (a *apiManager) sendReadyConfigurations(typeFilter string, w http.ResponseWriter, apidLSN string) {
+ configurations, err := a.dbMan.getAllConfigurations(typeFilter)
if err != nil {
+ log.Errorf("Database error: %v", err)
a.writeInternalError(w, fmt.Sprintf("Database error: %s", err.Error()))
return
}
- a.sendDeployments(w, deployments, eTag, typeFilter)
+ a.sendDeployments(w, configurations, apidLSN, typeFilter)
}
-func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []DataDeployment, eTag string, typeFilter string) {
+func (a *apiManager) sendDeployments(w http.ResponseWriter, dataConfs []Configuration, apidLSN string, typeFilter string) {
- apiDeps := ApiDeploymentResponse{}
- apiDepDetails := make([]ApiDeploymentDetails, 0)
+ apiConfs := ApiConfigurationResponse{}
+ apiConfDetails := make([]ApiConfigurationDetails, 0)
- apiDeps.Kind = kindCollection
- apiDeps.Self = getHttpHost() + a.deploymentsEndpoint
+ apiConfs.Kind = kindCollection
+ apiConfs.Self = getHttpHost() + a.configurationEndpoint
- for _, d := range dataDeps {
- apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{
- Self: apiDeps.Self + "/" + d.ID,
+ for _, d := range dataConfs {
+ apiConfDetails = append(apiConfDetails, ApiConfigurationDetails{
+ Self: apiConfs.Self + "/" + d.ID,
Name: d.Name,
Type: d.Type,
Revision: d.Revision,
@@ -376,33 +338,49 @@
Updated: convertTime(d.Updated),
})
}
- apiDeps.ApiDeploymentsResponse = apiDepDetails
+ apiConfs.ApiConfigurationsResponse = apiConfDetails
if typeFilter != "" {
- apiDeps.Self += "?type=" + typeFilter
+ apiConfs.Self += "?type=" + typeFilter
}
- b, err := json.Marshal(apiDeps)
+ b, err := json.Marshal(apiConfs)
if err != nil {
log.Errorf("unable to marshal deployments: %v", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
- log.Debugf("sending deployments %s: %s", eTag, b)
- w.Header().Set("ETag", eTag)
+ if apidLSN != "" {
+ w.Header().Set(apidConfigIndexHeader, apidLSN)
+ }
+ w.Header().Set("Content-Type", headerJson)
+ log.Debugf("sending deployments %s", apidLSN)
w.Write(b)
}
-// call whenever the list of deployments changes
-func (a *apiManager) incrementETag() string {
- e := atomic.AddInt64(&a.eTag, 1)
- return strconv.FormatInt(e, 10)
-}
+func (a *apiManager) compareLSN(headerLSN string) (res int, apidLSN string, err error) {
+ apidLSN = a.dbMan.getLSN()
+ log.Debugf("apidLSN: %v", apidLSN)
-func (a *apiManager) getETag() string {
- e := atomic.LoadInt64(&a.eTag)
- return strconv.FormatInt(e, 10)
+ // if no Long Poll Index
+ if headerLSN == "" {
+ return 1, apidLSN, nil
+ }
+
+ headerSeq, err := common.ParseSequence(headerLSN)
+ if err != nil {
+ log.Debugf("Error when Parse headerLSN Sequence: %v", err)
+ return 0, "", ErrInvalidLSN
+ }
+
+ apidSeq, err := common.ParseSequence(apidLSN)
+ if err != nil {
+ log.Errorf("Error when Parse apidLSN Sequence: %v", err)
+ return 0, "", err
+ }
+
+ return apidSeq.Compare(headerSeq), apidLSN, nil
}
// escape the blobId into url
@@ -430,7 +408,7 @@
func getHttpHost() string {
- configuredEndpoint := config.GetString(configBundleBlobDownloadEndpoint)
+ configuredEndpoint := config.GetString(configBlobDownloadEndpoint)
if configuredEndpoint != "" {
return configuredEndpoint
}
diff --git a/api_test.go b/api_test.go
index 84c1cb5..630d993 100644
--- a/api_test.go
+++ b/api_test.go
@@ -24,7 +24,6 @@
mathrand "math/rand"
"net/http"
"net/url"
- "os"
"strconv"
"strings"
"time"
@@ -42,16 +41,16 @@
var _ = BeforeEach(func() {
testCount += 1
- dummyDbMan = &dummyDbManager{}
+ dummyDbMan = &dummyDbManager{
+ lsn: "0.1.1",
+ }
testApiMan = &apiManager{
- dbMan: dummyDbMan,
- deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount),
- blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}",
- deploymentIdEndpoint: deploymentsEndpoint + strconv.Itoa(testCount) + "/{configId}",
- eTag: int64(testCount * 10),
- deploymentsChanged: make(chan interface{}, 5),
- addSubscriber: make(chan chan deploymentsResult),
- removeSubscriber: make(chan chan deploymentsResult),
+ dbMan: dummyDbMan,
+ configurationEndpoint: configEndpoint + strconv.Itoa(testCount),
+ blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}",
+ configurationIdEndpoint: configEndpoint + strconv.Itoa(testCount) + "/{configId}",
+ newChangeListChan: make(chan interface{}, 5),
+ addSubscriber: make(chan chan interface{}),
}
testApiMan.InitAPI()
time.Sleep(100 * time.Millisecond)
@@ -65,7 +64,7 @@
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
// http get
res, err := http.Get(uri.String())
@@ -74,16 +73,16 @@
Expect(res.StatusCode).Should(Equal(http.StatusOK))
// parse response
- var depRes ApiDeploymentResponse
+ var depRes ApiConfigurationResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).Should(Succeed())
err = json.Unmarshal(body, &depRes)
Expect(err).Should(Succeed())
// verify response
- Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0))
+ Expect(len(depRes.ApiConfigurationsResponse)).To(Equal(0))
Expect(depRes.Kind).Should(Equal(kindCollection))
- Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
+ Expect(depRes.Self).Should(Equal(apiTestUrl + configEndpoint + strconv.Itoa(testCount)))
})
@@ -91,7 +90,7 @@
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
// set test data
details := setTestDeployments(dummyDbMan, uri.String())
@@ -103,7 +102,7 @@
Expect(res.StatusCode).Should(Equal(http.StatusOK))
// parse response
- var depRes ApiDeploymentResponse
+ var depRes ApiConfigurationResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).Should(Succeed())
err = json.Unmarshal(body, &depRes)
@@ -112,7 +111,7 @@
// verify response
Expect(depRes.Kind).Should(Equal(kindCollection))
Expect(depRes.Self).Should(Equal(uri.String()))
- Expect(depRes.ApiDeploymentsResponse).Should(Equal(details))
+ Expect(depRes.ApiConfigurationsResponse).Should(Equal(details))
})
@@ -121,12 +120,15 @@
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
- uri.RawQuery = "type=" + typeFilter
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
+
+ query := uri.Query()
+ query.Add("type", typeFilter)
+ uri.RawQuery = query.Encode()
// set test data
dep := makeTestDeployment()
- dummyDbMan.configurations = make(map[string]*DataDeployment)
+ dummyDbMan.configurations = make(map[string]*Configuration)
dummyDbMan.configurations[typeFilter] = dep
detail := makeExpectedDetail(dep, strings.Split(uri.String(), "?")[0])
@@ -137,7 +139,7 @@
Expect(res.StatusCode).Should(Equal(http.StatusOK))
// parse response
- var depRes ApiDeploymentResponse
+ var depRes ApiConfigurationResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).Should(Succeed())
err = json.Unmarshal(body, &depRes)
@@ -146,32 +148,73 @@
// verify response
Expect(depRes.Kind).Should(Equal(kindCollection))
Expect(depRes.Self).Should(Equal(uri.String()))
- Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail}))
+ Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail}))
})
- It("should get 304 for no change", func() {
-
+ It("should not long poll if using filter", func() {
+ typeFilter := "ORGANIZATION"
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
+ query := uri.Query()
+ query.Add("type", typeFilter)
+ query.Add("block", "3")
+ query.Add(apidConfigIndexPar, dummyDbMan.lsn)
+ uri.RawQuery = query.Encode()
// set test data
- setTestDeployments(dummyDbMan, uri.String())
+ dep := makeTestDeployment()
+
+ dummyDbMan.configurations = make(map[string]*Configuration)
+ dummyDbMan.configurations[typeFilter] = dep
+ detail := makeExpectedDetail(dep, strings.Split(uri.String(), "?")[0])
// http get
res, err := http.Get(uri.String())
Expect(err).Should(Succeed())
defer res.Body.Close()
Expect(res.StatusCode).Should(Equal(http.StatusOK))
- etag := res.Header.Get("etag")
- Expect(etag).ShouldNot(BeEmpty())
+
+ // parse response
+ var depRes ApiConfigurationResponse
+ body, err := ioutil.ReadAll(res.Body)
+ Expect(err).Should(Succeed())
+ err = json.Unmarshal(body, &depRes)
+ Expect(err).Should(Succeed())
+
+ // verify response
+ Expect(depRes.Kind).Should(Equal(kindCollection))
+ Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0] + "?type=" + typeFilter))
+ Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail}))
+
+ }, 1)
+
+ It("should get 304 for no change", func() {
+
+ // setup http client
+ uri, err := url.Parse(apiTestUrl)
+ Expect(err).Should(Succeed())
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
+
+ // set test data
+ setTestDeployments(dummyDbMan, uri.String())
+ // http get
+ res, err := http.Get(uri.String())
+ Expect(err).Should(Succeed())
+ defer res.Body.Close()
+ Expect(res.StatusCode).Should(Equal(http.StatusOK))
+ lsn := res.Header.Get(apidConfigIndexHeader)
+ Expect(lsn).ShouldNot(BeEmpty())
// send second request
+ query := uri.Query()
+ query.Add(apidConfigIndexPar, lsn)
+ uri.RawQuery = query.Encode()
+ log.Debug(uri.String())
req, err := http.NewRequest("GET", uri.String(), nil)
req.Header.Add("Content-Type", "application/json")
- req.Header.Add("If-None-Match", etag)
// get response
res, err = http.DefaultClient.Do(req)
@@ -181,42 +224,111 @@
})
// block is not enabled now
- XIt("should get empty set after blocking if no deployments", func() {
+ It("should do long-polling if Gateway_LSN>=APID_LSN, should get 304 for timeout", func() {
start := time.Now()
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
query := uri.Query()
query.Add("block", "1")
+ query.Add(apidConfigIndexPar, "1.0.0")
uri.RawQuery = query.Encode()
// http get
res, err := http.Get(uri.String())
Expect(err).Should(Succeed())
defer res.Body.Close()
- Expect(res.StatusCode).Should(Equal(http.StatusOK))
+ Expect(res.StatusCode).Should(Equal(http.StatusNotModified))
//verify blocking time
blockingTime := time.Since(start)
- log.Warnf("time used: %v", blockingTime.Seconds())
Expect(blockingTime.Seconds() > 0.9).Should(BeTrue())
+ }, 2)
+
+ It("should do long-polling if Gateway_LSN>=APID_LSN, should get 200 if not timeout", func() {
+
+ testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ // setup http client
+ uri, err := url.Parse(apiTestUrl)
+ Expect(err).Should(Succeed())
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
+ query := uri.Query()
+ query.Add("block", "2")
+ query.Add(apidConfigIndexPar, "1.0.0")
+ uri.RawQuery = query.Encode()
+ // set test data
+ details := setTestDeployments(dummyDbMan, strings.Split(uri.String(), "?")[0])
+
+ // notify change
+ go func() {
+ time.Sleep(time.Second)
+ dummyDbMan.lsn = testLSN
+ testApiMan.notifyNewChange()
+ }()
+
+ // http get
+ res, err := http.Get(uri.String())
+ Expect(err).Should(Succeed())
+ defer res.Body.Close()
+ Expect(res.StatusCode).Should(Equal(http.StatusOK))
+ Expect(res.Header.Get(apidConfigIndexHeader)).Should(Equal(testLSN))
// parse response
- var depRes ApiDeploymentResponse
+ var depRes ApiConfigurationResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).Should(Succeed())
err = json.Unmarshal(body, &depRes)
Expect(err).Should(Succeed())
// verify response
- Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0))
Expect(depRes.Kind).Should(Equal(kindCollection))
- Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount)))
+ Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0]))
+ Expect(depRes.ApiConfigurationsResponse).Should(Equal(details))
+ }, 3)
- }, 2)
+ It("should support long-polling for multiple subscribers", func() {
+
+ testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ // setup http client
+ uri, err := url.Parse(apiTestUrl)
+ Expect(err).Should(Succeed())
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
+ query := uri.Query()
+ query.Add("block", "3")
+ query.Add(apidConfigIndexPar, dummyDbMan.lsn)
+ uri.RawQuery = query.Encode()
+
+ // set test data
+ setTestDeployments(dummyDbMan, strings.Split(uri.String(), "?")[0])
+
+ // http get
+ count := mathrand.Intn(20) + 5
+ finishChan := make(chan int)
+ for i := 0; i < count; i++ {
+ go func() {
+ defer GinkgoRecover()
+ res, err := http.Get(uri.String())
+ Expect(err).Should(Succeed())
+ defer res.Body.Close()
+ finishChan <- res.StatusCode
+ }()
+ }
+
+ // notify change
+ go func() {
+ time.Sleep(1500 * time.Millisecond)
+ dummyDbMan.lsn = testLSN
+ testApiMan.notifyNewChange()
+ }()
+
+ for i := 0; i < count; i++ {
+ Expect(<-finishChan).Should(Equal(http.StatusOK))
+ }
+
+ }, 5)
It("should get iso8601 time", func() {
testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00", "2017-06-22 16:41:02.334"}
@@ -225,7 +337,7 @@
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount)
+ uri.Path = configEndpoint + strconv.Itoa(testCount)
for i, t := range testTimes {
log.Debug("insert deployment with timestamp: " + t)
@@ -233,7 +345,7 @@
dep := makeTestDeployment()
dep.Created = t
dep.Updated = t
- dummyDbMan.readyDeployments = []DataDeployment{*dep}
+ dummyDbMan.readyDeployments = []Configuration{*dep}
detail := makeExpectedDetail(dep, uri.String())
detail.Created = isoTime[i]
detail.Updated = isoTime[i]
@@ -243,41 +355,17 @@
defer res.Body.Close()
Expect(res.StatusCode).Should(Equal(http.StatusOK))
// parse response
- var depRes ApiDeploymentResponse
+ var depRes ApiConfigurationResponse
body, err := ioutil.ReadAll(res.Body)
Expect(err).Should(Succeed())
err = json.Unmarshal(body, &depRes)
Expect(err).Should(Succeed())
// verify response
- Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail}))
+ Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail}))
}
})
- It("should debounce requests", func(done Done) {
- var in = make(chan interface{})
- var out = make(chan []interface{})
-
- go testApiMan.debounce(in, out, 3*time.Millisecond)
-
- go func() {
- defer GinkgoRecover()
-
- received, ok := <-out
- Expect(ok).To(BeTrue())
- Expect(len(received)).To(Equal(2))
-
- close(in)
- received, ok = <-out
- Expect(ok).To(BeFalse())
-
- close(done)
- }()
-
- in <- "x"
- in <- "y"
- })
-
})
Context("GET /blobs", func() {
@@ -315,12 +403,12 @@
// setup http client
uri, err := url.Parse(apiTestUrl)
Expect(err).Should(Succeed())
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/3ecd351c-1173-40bf-b830-c194e5ef9038"
+ uri.Path = configEndpoint + strconv.Itoa(testCount) + "/3ecd351c-1173-40bf-b830-c194e5ef9038"
//setup test data
dummyDbMan.err = nil
- dummyDbMan.configurations = make(map[string]*DataDeployment)
- expectedConfig := &DataDeployment{
+ dummyDbMan.configurations = make(map[string]*Configuration)
+ expectedConfig := &Configuration{
ID: "3ecd351c-1173-40bf-b830-c194e5ef9038",
OrgID: "73fcac6c-5d9f-44c1-8db0-333efda3e6e8",
EnvID: "ada76573-68e3-4f1a-a0f9-cbc201a97e80",
@@ -343,7 +431,7 @@
Expect(res.StatusCode).Should(Equal(http.StatusOK))
// parse response
- var depRes ApiDeploymentDetails
+ var depRes ApiConfigurationDetails
body, err := ioutil.ReadAll(res.Body)
Expect(err).Should(Succeed())
err = json.Unmarshal(body, &depRes)
@@ -381,10 +469,10 @@
if data[1] != nil {
dummyDbMan.err = data[1].(error)
}
- dummyDbMan.configurations = make(map[string]*DataDeployment)
- dummyDbMan.configurations[data[0].(string)] = &DataDeployment{}
+ dummyDbMan.configurations = make(map[string]*Configuration)
+ dummyDbMan.configurations[data[0].(string)] = &Configuration{}
// http get
- uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string)
+ uri.Path = configEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string)
res, err := http.Get(uri.String())
Expect(err).Should(Succeed())
Expect(res.StatusCode).Should(Equal(expectedCode[i]))
@@ -395,12 +483,12 @@
})
-func setTestDeployments(dummyDbMan *dummyDbManager, self string) []ApiDeploymentDetails {
+func setTestDeployments(dummyDbMan *dummyDbManager, self string) []ApiConfigurationDetails {
mathrand.Seed(time.Now().UnixNano())
count := mathrand.Intn(5) + 1
- deployments := make([]DataDeployment, count)
- details := make([]ApiDeploymentDetails, count)
+ deployments := make([]Configuration, count)
+ details := make([]ApiConfigurationDetails, count)
for i := 0; i < count; i++ {
dep := makeTestDeployment()
@@ -415,13 +503,13 @@
return details
}
-func makeTestDeployment() *DataDeployment {
- dep := &DataDeployment{
+func makeTestDeployment() *Configuration {
+ dep := &Configuration{
ID: util.GenerateUUID(),
OrgID: util.GenerateUUID(),
EnvID: util.GenerateUUID(),
- BlobID: testBlobId,
- BlobResourceID: "",
+ BlobID: util.GenerateUUID(), //testBlobId,
+ BlobResourceID: util.GenerateUUID(), //"",
Type: "virtual-host",
Name: "vh-secure",
Revision: "1",
@@ -434,8 +522,8 @@
return dep
}
-func makeExpectedDetail(dep *DataDeployment, self string) *ApiDeploymentDetails {
- detail := &ApiDeploymentDetails{
+func makeExpectedDetail(dep *Configuration, self string) *ApiConfigurationDetails {
+ detail := &ApiConfigurationDetails{
Self: self + "/" + dep.ID,
Name: dep.Name,
Type: dep.Type,
@@ -443,61 +531,10 @@
BeanBlobUrl: getBlobUrl(dep.BlobID),
Org: dep.OrgID,
Env: dep.EnvID,
- ResourceBlobUrl: "",
+ ResourceBlobUrl: getBlobUrl(dep.BlobResourceID),
Path: dep.Path,
Created: dep.Created,
Updated: dep.Updated,
}
return detail
}
-
-type dummyDbManager struct {
- unreadyBlobIds []string
- readyDeployments []DataDeployment
- localFSLocation string
- fileResponse chan string
- version string
- configurations map[string]*DataDeployment
- err error
-}
-
-func (d *dummyDbManager) setDbVersion(version string) {
- d.version = version
-}
-
-func (d *dummyDbManager) initDb() error {
- return nil
-}
-
-func (d *dummyDbManager) getUnreadyBlobs() ([]string, error) {
- return d.unreadyBlobIds, nil
-}
-
-func (d *dummyDbManager) getReadyDeployments(typeFilter string) ([]DataDeployment, error) {
- if typeFilter == "" {
- return d.readyDeployments, nil
- }
- return []DataDeployment{*(d.configurations[typeFilter])}, nil
-}
-
-func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
- file, err := os.Open(localFsLocation)
- if err != nil {
- return err
- }
- buff := make([]byte, 36)
- _, err = file.Read(buff)
- if err != nil {
- return err
- }
- d.fileResponse <- string(buff)
- return nil
-}
-
-func (d *dummyDbManager) getLocalFSLocation(string) (string, error) {
- return d.localFSLocation, nil
-}
-
-func (d *dummyDbManager) getConfigById(id string) (*DataDeployment, error) {
- return d.configurations[id], d.err
-}
diff --git a/apidGatewayConfDeploy-api.yaml b/apidGatewayConfDeploy-api.yaml
index cc1011f..c00d79a 100644
--- a/apidGatewayConfDeploy-api.yaml
+++ b/apidGatewayConfDeploy-api.yaml
@@ -48,30 +48,54 @@
in: "query"
type: string
description: "Long poll block duration in seconds"
- - name: "If-None-Match"
- in: "header"
+ - name: "apid-config-index"
+ in: "query"
type: string
- description: "ETag value from request in previous request"
+ description: "x-apid-config-index header value from the response of a previous request"
+ - name: "type"
+ in: "query"
+ type: string
+ description: "filter configurations by type. When type filter is given, long-polling is not supported"
responses:
200:
description: Successful response
headers:
- ETag:
+ x-apid-config-index:
type: "string"
description: "client can use this for response caching"
schema:
$ref: '#/definitions/ConfigurationsResponse'
304:
description: Not Modified, No change in response based on If-None-Match header value. Cache representation.
- headers:
- ETag:
- type: "string"
- description: "client can use this for response caching"
default:
description: Error response
schema:
$ref: '#/definitions/ErrorResponse'
-
+
+ /configurations/{configId}:
+ get:
+ tags:
+ - "configurations/{configId}"
+ description: |
+ Get a configuration by id
+ parameters:
+ - name: configId
+ in: path
+ required: true
+ type: string
+ description: configId
+ responses:
+ 200:
+ description: Successful response
+ schema:
+ $ref: '#/definitions/Configuration'
+ 304:
+ description: Not Modified, no change in the configuration since it was last fetched. Cached representation can be used.
+ default:
+ description: Error response
+ schema:
+ $ref: '#/definitions/ErrorResponse'
+
/blobs/{blobId}:
get:
tags:
@@ -123,9 +147,9 @@
contents:
type: array
items:
- $ref: '#/definitions/Configurations'
+ $ref: '#/definitions/Configuration'
- Configurations:
+ Configuration:
properties:
self:
type: string
diff --git a/apidGatewayConfDeploy_suite_test.go b/apidGatewayConfDeploy_suite_test.go
index 0c859f7..a85014a 100644
--- a/apidGatewayConfDeploy_suite_test.go
+++ b/apidGatewayConfDeploy_suite_test.go
@@ -50,7 +50,7 @@
config.Set(configApiServerBaseURI, "http://localhost")
config.Set(configDebounceDuration, "1ms")
config.Set(configDownloadQueueSize, 1)
- config.Set(configBundleCleanupDelay, time.Millisecond)
+ config.Set(configBlobCleanupDelay, time.Millisecond)
apid.InitializePlugins("0.0.0")
go apid.API().Listen()
time.Sleep(1 * time.Second)
diff --git a/bundle.go b/bundle.go
index b4729cf..c9d7d80 100644
--- a/bundle.go
+++ b/bundle.go
@@ -34,26 +34,23 @@
type bundleManagerInterface interface {
initializeBundleDownloading()
- queueDownloadRequest(*DataDeployment)
- enqueueRequest(*DownloadRequest)
- makeDownloadRequest(string) *DownloadRequest
- deleteBundlesFromDeployments([]DataDeployment)
- deleteBundleById(string)
+ downloadBlobsWithCallback(blobs []string, callback func())
+ deleteBlobs(blobIds []string)
Close()
}
type bundleManager struct {
- blobServerUrl string
- dbMan dbManagerInterface
- apiMan apiManagerInterface
- concurrentDownloads int
- markDeploymentFailedAfter time.Duration
- bundleRetryDelay time.Duration
- bundleCleanupDelay time.Duration
- downloadQueue chan *DownloadRequest
- isClosed *int32
- workers []*BundleDownloader
- client *http.Client
+ blobServerUrl string
+ dbMan dbManagerInterface
+ apiMan apiManagerInterface
+ concurrentDownloads int
+ markConfigFailedAfter time.Duration
+ bundleRetryDelay time.Duration
+ bundleCleanupDelay time.Duration
+ downloadQueue chan *DownloadRequest
+ isClosed *int32
+ workers []*BundleDownloader
+ client *http.Client
}
type blobServerResponse struct {
@@ -80,30 +77,22 @@
}
}
-// download bundle blob and resource blob
-// TODO do not download duplicate blobs
-func (bm *bundleManager) queueDownloadRequest(dep *DataDeployment) {
- blobReq := bm.makeDownloadRequest(dep.BlobID)
- resourceReq := bm.makeDownloadRequest(dep.BlobResourceID)
-
- go func() {
- bm.enqueueRequest(blobReq)
- bm.enqueueRequest(resourceReq)
- }()
-}
-
-func (bm *bundleManager) makeDownloadRequest(id string) *DownloadRequest {
- markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter)
+func (bm *bundleManager) makeDownloadRequest(blobId string, b *BunchDownloadRequest) *DownloadRequest {
+ if blobId == "" {
+ return nil
+ }
+ markFailedAt := time.Now().Add(bm.markConfigFailedAfter)
retryIn := bm.bundleRetryDelay
maxBackOff := 5 * time.Minute
return &DownloadRequest{
blobServerURL: bm.blobServerUrl,
bm: bm,
- blobId: id,
+ blobId: blobId,
backoffFunc: createBackoff(retryIn, maxBackOff),
markFailedAt: markFailedAt,
client: bm.client,
+ bunchRequest: b,
}
}
@@ -112,14 +101,20 @@
if atomic.LoadInt32(bm.isClosed) == 1 {
return
}
- /*
- defer func() {
- if r := recover(); r != nil {
- log.Warn("trying to enque requests to closed bundleManager")
- }
- }()
- */
- bm.downloadQueue <- r
+ if r != nil {
+ bm.downloadQueue <- r
+ }
+}
+
+func (bm *bundleManager) downloadBlobsWithCallback(blobs []string, callback func()) {
+
+ c := &BunchDownloadRequest{
+ bm: bm,
+ blobs: blobs,
+ attemptCounter: new(int32),
+ callback: callback,
+ }
+ c.download()
}
func (bm *bundleManager) Close() {
@@ -127,32 +122,53 @@
close(bm.downloadQueue)
}
-func (bm *bundleManager) deleteBundlesFromDeployments(deletedDeployments []DataDeployment) {
- for _, dep := range deletedDeployments {
- go bm.deleteBundleById(dep.BlobID)
- go bm.deleteBundleById(dep.BlobResourceID)
+func (bm *bundleManager) deleteBlobs(blobs []string) {
+ for _, id := range blobs {
+ go bm.deleteBlobById(id)
}
-
- /*
- log.Debugf("will delete %d old bundles", len(deletedDeployments))
- go func() {
- // give clients a minute to avoid conflicts
- time.Sleep(bm.bundleCleanupDelay)
- for _, dep := range deletedDeployments {
- bundleFile := getBlobFilePath(dep.BlobID)
- log.Debugf("removing old bundle: %v", bundleFile)
- // TODO Remove from the Database table apid_blob_available
- safeDelete(bundleFile)
- }
- }()
- */
}
// TODO add delete support
-func (bm *bundleManager) deleteBundleById(blobId string) {
+func (bm *bundleManager) deleteBlobById(blobId string) {
}
+type BunchDownloadRequest struct {
+ bm *bundleManager
+ blobs []string
+ attemptCounter *int32
+ callback func()
+}
+
+func (b *BunchDownloadRequest) download() {
+ //remove empty Ids
+ var ids []string
+ for _, id := range b.blobs {
+ if id != "" {
+ ids = append(ids, id)
+ }
+ }
+ b.blobs = ids
+ log.Debugf("Attempt to download blobs, len: %v", len(b.blobs))
+
+ if len(b.blobs) == 0 && b.callback != nil {
+ go b.callback()
+ return
+ }
+
+ *b.attemptCounter = int32(len(b.blobs))
+ for _, id := range b.blobs {
+ req := b.bm.makeDownloadRequest(id, b)
+ go b.bm.enqueueRequest(req)
+ }
+}
+
+func (b *BunchDownloadRequest) downloadAttempted() {
+ if atomic.AddInt32(b.attemptCounter, -1) == 0 && b.callback != nil {
+ go b.callback()
+ }
+}
+
type DownloadRequest struct {
bm *bundleManager
blobId string
@@ -160,12 +176,15 @@
markFailedAt time.Time
blobServerURL string
client *http.Client
+ bunchRequest *BunchDownloadRequest
+ attempted bool
}
-func (r *DownloadRequest) downloadBundle() error {
+func (r *DownloadRequest) downloadBlob() error {
log.Debugf("starting bundle download attempt for blobId=%s", r.blobId)
-
+ var err error
+ defer r.markAttempted(&err)
if r.checkTimeout() {
return &timeoutError{
markFailedAt: r.markFailedAt,
@@ -188,8 +207,6 @@
return err
}
- log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile)
-
err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile)
if err != nil {
log.Errorf("updateLocalFsLocation failed: blobId=%s", r.blobId)
@@ -199,10 +216,7 @@
return err
}
- log.Debugf("bundle downloaded: blobId=%s filename=%s", r.blobId, downloadedFile)
-
- // TODO send changed deployments to subscribers (API call with "block")
- //r.bm.apiMan.addChangedDeployment(dep.ID)
+ log.Debugf("blob downloaded and inserted: blobId=%s filename=%s", r.blobId, downloadedFile)
return nil
}
@@ -218,6 +232,19 @@
return false
}
+func (r *DownloadRequest) markAttempted(errp *error) {
+ if !r.attempted {
+ r.attempted = true
+ err := *errp
+ if r.bunchRequest != nil {
+ r.bunchRequest.downloadAttempted()
+ }
+ if err != nil {
+ //TODO: insert to DB as "attempted but unsuccessful"
+ }
+ }
+}
+
func getBlobFilePath(blobId string) string {
return path.Join(bundlePath, base64.StdEncoding.EncodeToString([]byte(blobId)))
}
@@ -263,7 +290,6 @@
func downloadFromURI(client *http.Client, blobServerURL string, blobId string) (tempFileName string, err error) {
var tempFile *os.File
- log.Debugf("Downloading bundle: %s", blobId)
uri, err := getSignedURL(client, blobServerURL, blobId)
if err != nil {
@@ -282,18 +308,18 @@
var confReader io.ReadCloser
confReader, err = getUriReaderWithAuth(client, uri)
if err != nil {
- log.Errorf("Unable to retrieve bundle %s: %v", uri, err)
+ log.Errorf("Unable to retrieve Blob %s: %v", uri, err)
return
}
defer confReader.Close()
_, err = io.Copy(tempFile, confReader)
if err != nil {
- log.Errorf("Unable to write bundle %s: %v", tempFileName, err)
+ log.Errorf("Unable to write Blob %s: %v", tempFileName, err)
return
}
- log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName)
+ log.Debugf("Blob %s downloaded to: %s", uri, tempFileName)
return
}
@@ -328,7 +354,7 @@
for req := range w.bm.downloadQueue {
log.Debugf("starting download blobId=%s", req.blobId)
- err := req.downloadBundle()
+ err := req.downloadBlob()
if err != nil {
// timeout
if _, ok := err.(*timeoutError); ok {
diff --git a/bundle_test.go b/bundle_test.go
index 30760cc..c77114f 100644
--- a/bundle_test.go
+++ b/bundle_test.go
@@ -17,14 +17,11 @@
import (
"net/http"
- "bytes"
- "encoding/json"
"github.com/apid/apid-core/util"
- "github.com/gorilla/mux"
+
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
- "io"
- "strings"
+ mathrand "math/rand"
"sync/atomic"
"time"
)
@@ -64,19 +61,22 @@
}
// init dummy api manager
- dummyApiMan = &dummyApiManager{}
+ dummyApiMan = &dummyApiManager{
+ notifyChan: make(chan bool, 1),
+ initCalled: make(chan bool),
+ }
// init bundle manager
testBundleMan = &bundleManager{
- blobServerUrl: bundleTestUrl,
- dbMan: dummyDbMan,
- apiMan: dummyApiMan,
- concurrentDownloads: concurrentDownloads,
- markDeploymentFailedAfter: 5 * time.Second,
- bundleRetryDelay: time.Second,
- bundleCleanupDelay: 5 * time.Second,
- downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
- isClosed: new(int32),
+ blobServerUrl: bundleTestUrl,
+ dbMan: dummyDbMan,
+ apiMan: dummyApiMan,
+ concurrentDownloads: concurrentDownloads,
+ markConfigFailedAfter: 5 * time.Second,
+ bundleRetryDelay: time.Second,
+ bundleCleanupDelay: 5 * time.Second,
+ downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
+ isClosed: new(int32),
client: &http.Client{
Timeout: time.Second,
Transport: &http.Transport{
@@ -95,113 +95,124 @@
dummyApiMan = nil
})
- It("should download blob according to id", func() {
- // download blob
- id := util.GenerateUUID()
- testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id))
- received := <-dummyDbMan.fileResponse
- Expect(received).Should(Equal(id))
+ Context("download blobs", func() {
+
+ It("should download blob according to id", func() {
+ // download blob
+ id := util.GenerateUUID()
+ testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil))
+ received := <-dummyDbMan.fileResponse
+ Expect(received).Should(Equal(id))
+ })
+
+ It("should timeout connection and retry", func() {
+ // setup timeout
+ atomic.StoreInt32(blobServer.signedTimeout, 1)
+ atomic.StoreInt32(blobServer.blobTimeout, 1)
+ testBundleMan.client.Timeout = 500 * time.Millisecond
+ testBundleMan.bundleRetryDelay = 50 * time.Millisecond
+
+ // download blobs
+ id := util.GenerateUUID()
+ testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil))
+ received := <-dummyDbMan.fileResponse
+ Expect(received).Should(Equal(id))
+
+ }, 4)
+
+ It("should mark as failure according to markConfigFailedAfter", func() {
+ // setup timeout
+ atomic.StoreInt32(blobServer.signedTimeout, 1)
+ atomic.StoreInt32(blobServer.blobTimeout, 1)
+ testBundleMan.client.Timeout = 100 * time.Millisecond
+ testBundleMan.bundleRetryDelay = 100 * time.Millisecond
+ testBundleMan.markConfigFailedAfter = 200 * time.Millisecond
+
+ // download blobs
+ id := util.GenerateUUID()
+ req := testBundleMan.makeDownloadRequest(id, nil)
+ Expect(req.markFailedAt.After(time.Now())).Should(BeTrue())
+ testBundleMan.enqueueRequest(req)
+
+ // should fail
+ time.Sleep(time.Second)
+ Expect(req.markFailedAt.IsZero()).Should(BeTrue())
+ }, 4)
+
+ It("should call callback func after a round of download attempts", func() {
+ // download blobs
+ var ids []string
+ num := 1 + mathrand.Intn(5)
+ for i := 0; i < num; i++ {
+ ids = append(ids, util.GenerateUUID())
+ }
+ finishChan := make(chan int)
+ testBundleMan.downloadBlobsWithCallback(ids, func() {
+ finishChan <- 1
+ })
+ for i := 0; i < num; i++ {
+ <-dummyDbMan.fileResponse
+ }
+ <-finishChan
+ // if there's no blob
+ testBundleMan.downloadBlobsWithCallback(nil, func() {
+ finishChan <- 1
+ })
+ <-finishChan
+ }, 1)
})
- It("should timeout connection and retry", func() {
- // setup timeout
- atomic.StoreInt32(blobServer.signedTimeout, 1)
- atomic.StoreInt32(blobServer.blobTimeout, 1)
- testBundleMan.client.Timeout = 500 * time.Millisecond
- testBundleMan.bundleRetryDelay = 50 * time.Millisecond
+ Context("download blobs for changelist", func() {
+ It("should download blobs for changelist", func() {
+ //setup test data
+ count := mathrand.Intn(10) + 1
+ configs := make([]*Configuration, count)
+ for i := 0; i < count; i++ {
+ conf := makeTestDeployment()
+ conf.BlobID = util.GenerateUUID()
+ conf.BlobResourceID = util.GenerateUUID()
+ configs[i] = conf
+ }
- // download blobs
- id := util.GenerateUUID()
- testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id))
- received := <-dummyDbMan.fileResponse
- Expect(received).Should(Equal(id))
+ // should download blobs for changelist
+ testBundleMan.downloadBlobsWithCallback(extractBlobsToDownload(configs), dummyApiMan.notifyNewChange)
+ for i := 0; i < 2*count; i++ {
+ <-dummyDbMan.fileResponse
+ }
- }, 4)
+ // should notify after 1st download attempt
+ <-dummyApiMan.notifyChan
+ })
- It("should mark as failure according to markDeploymentFailedAfter", func() {
- // setup timeout
- atomic.StoreInt32(blobServer.signedTimeout, 1)
- atomic.StoreInt32(blobServer.blobTimeout, 1)
- testBundleMan.client.Timeout = 100 * time.Millisecond
- testBundleMan.bundleRetryDelay = 100 * time.Millisecond
- testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond
+ It("should notify after 1st download attempt unless failure", func() {
+ //setup test data
+ count := mathrand.Intn(10) + 1
+ configs := make([]*Configuration, count)
+ for i := 0; i < count; i++ {
+ conf := makeTestDeployment()
+ conf.BlobID = util.GenerateUUID()
+ conf.BlobResourceID = util.GenerateUUID()
+ configs[i] = conf
+ }
- // download blobs
- id := util.GenerateUUID()
- req := testBundleMan.makeDownloadRequest(id)
- Expect(req.markFailedAt.After(time.Now())).Should(BeTrue())
- testBundleMan.enqueueRequest(req)
+ // setup timeout
+ atomic.StoreInt32(blobServer.signedTimeout, 1)
+ atomic.StoreInt32(blobServer.blobTimeout, 1)
+ testBundleMan.client.Timeout = 500 * time.Millisecond
+ testBundleMan.bundleRetryDelay = 50 * time.Millisecond
- // should fail
- time.Sleep(time.Second)
- Expect(req.markFailedAt.IsZero()).Should(BeTrue())
- }, 4)
+ // should download blobs for changelist
+ testBundleMan.downloadBlobsWithCallback(extractBlobsToDownload(configs), dummyApiMan.notifyNewChange)
+
+ // should notify after 1st download attempt
+ <-dummyApiMan.notifyChan
+
+ //should retry download
+ for i := 0; i < 2*count; i++ {
+ <-dummyDbMan.fileResponse
+ }
+ })
+
+ })
+
})
-
-type dummyApiManager struct {
- initCalled bool
-}
-
-func (a *dummyApiManager) InitAPI() {
- a.initCalled = true
-}
-
-type dummyBlobServer struct {
- serverEndpoint string
- signedEndpoint string
- signedTimeout *int32
- blobTimeout *int32
- resetTimeout bool
-}
-
-func (b *dummyBlobServer) start() {
- services.API().HandleFunc(b.serverEndpoint, b.returnSigned).Methods("GET")
- services.API().HandleFunc(b.signedEndpoint, b.returnBlob).Methods("GET")
-}
-
-// send a dummy uri as response
-func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) {
- defer GinkgoRecover()
- if atomic.LoadInt32(b.signedTimeout) == int32(1) {
- if b.resetTimeout {
- atomic.StoreInt32(b.signedTimeout, 0)
- }
- time.Sleep(time.Second)
- }
- vars := mux.Vars(r)
- blobId := vars["blobId"]
-
- uriString := strings.Replace(bundleTestUrl+b.signedEndpoint, "{blobId}", blobId, 1)
- log.Debug("dummyBlobServer returnSigned: " + uriString)
-
- res := blobServerResponse{
- Id: blobId,
- Kind: "Blob",
- Self: r.RequestURI,
- SignedUrl: uriString,
- SignedUrlExpiryTimestamp: time.Now().Add(3 * time.Hour).Format(time.RFC3339),
- }
-
- resBytes, err := json.Marshal(res)
- Expect(err).Should(Succeed())
- _, err = io.Copy(w, bytes.NewReader(resBytes))
- Expect(err).Should(Succeed())
- w.Header().Set("Content-Type", headerSteam)
-}
-
-// send blobId back as response
-func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) {
- defer GinkgoRecover()
- if atomic.LoadInt32(b.blobTimeout) == int32(1) {
- if b.resetTimeout {
- atomic.StoreInt32(b.blobTimeout, 0)
- }
- time.Sleep(time.Second)
- }
- vars := mux.Vars(r)
- blobId := vars["blobId"]
- log.Debug("dummyBlobServer returnBlob id=" + blobId)
- _, err := io.Copy(w, bytes.NewReader([]byte(blobId)))
- Expect(err).Should(Succeed())
- w.Header().Set("Content-Type", headerSteam)
-}
diff --git a/cover.sh b/cover.sh
new file mode 100755
index 0000000..85bbc35
--- /dev/null
+++ b/cover.sh
@@ -0,0 +1,27 @@
+#!/bin/bash -eu
+#
+# Copyright 2017 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#!/usr/bin/env bash
+
+set -e
+echo "mode: atomic" > coverage.txt
+
+go test -coverprofile=profile.out -covermode=atomic github.com/apid/apidGatewayConfDeploy
+if [ -f profile.out ]; then
+ tail -n +2 profile.out >> coverage.txt
+ rm profile.out
+fi
+go tool cover -html=coverage.txt -o cover.html
diff --git a/data.go b/data.go
index d424e2b..e99cba7 100644
--- a/data.go
+++ b/data.go
@@ -21,11 +21,15 @@
"reflect"
)
+const (
+ InitLSN = "0.0.0"
+)
+
var (
gwBlobId int64
)
-type DataDeployment struct {
+type Configuration struct {
ID string
OrgID string
EnvID string
@@ -49,16 +53,21 @@
setDbVersion(string)
initDb() error
getUnreadyBlobs() ([]string, error)
- getReadyDeployments(typeFilter string) ([]DataDeployment, error)
+ getAllConfigurations(typeFilter string) ([]Configuration, error)
updateLocalFsLocation(string, string) error
getLocalFSLocation(string) (string, error)
- getConfigById(string) (*DataDeployment, error)
+ getConfigById(string) (*Configuration, error)
+ loadLsnFromDb() error
+ updateLSN(LSN string) error
+ getLSN() string
}
type dbManager struct {
- data apid.DataService
- db apid.DB
- dbMux sync.RWMutex
+ data apid.DataService
+ db apid.DB
+ dbMux sync.RWMutex
+ apidLSN string
+ lsnMutex sync.RWMutex
}
func (dbc *dbManager) setDbVersion(version string) {
@@ -84,7 +93,7 @@
}
defer tx.Rollback()
_, err = tx.Exec(`
- CREATE TABLE IF NOT EXISTS apid_blob_available (
+ CREATE TABLE IF NOT EXISTS APID_BLOB_AVAILABLE (
id text primary key,
local_fs_location text NOT NULL
);
@@ -92,15 +101,33 @@
if err != nil {
return err
}
- err = tx.Commit()
+ _, err = tx.Exec(`
+ CREATE TABLE IF NOT EXISTS APID_CONFIGURATION_LSN (
+ lsn text primary key
+ );
+ `)
if err != nil {
return err
}
- log.Debug("Database table apid_blob_available created.")
+
+ // insert a row if APID_CONFIGURATION_LSN is empty
+ _, err = tx.Exec(`
+ INSERT INTO APID_CONFIGURATION_LSN (lsn)
+ SELECT '0.0.0'
+ WHERE NOT EXISTS (SELECT * FROM APID_CONFIGURATION_LSN)
+ `)
+ if err != nil {
+ return err
+ }
+
+ if err = tx.Commit(); err != nil {
+ return err
+ }
+ log.Debug("Database table APID_BLOB_AVAILABLE, APID_CONFIGURATION_LSN created.")
return nil
}
-func (dbc *dbManager) getConfigById(id string) (config *DataDeployment, err error) {
+func (dbc *dbManager) getConfigById(id string) (config *Configuration, err error) {
row := dbc.getDb().QueryRow(`
SELECT a.id,
a.organization_id,
@@ -115,30 +142,29 @@
a.created_by,
a.updated_at,
a.updated_by
- FROM metadata_runtime_entity_metadata as a
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
WHERE a.id = ?;
`, id)
- config, err = dataDeploymentsFromRow(row)
+ config, err = configurationFromDbRow(row)
if err != nil {
return nil, err
}
return config, nil
}
-// getUnreadyDeployments() returns array of resources that are not yet to be processed
func (dbc *dbManager) getUnreadyBlobs() (ids []string, err error) {
rows, err := dbc.getDb().Query(`
SELECT id FROM (
SELECT a.bean_blob_id as id
- FROM metadata_runtime_entity_metadata as a
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
WHERE a.bean_blob_id NOT IN
- (SELECT b.id FROM apid_blob_available as b)
+ (SELECT b.id FROM APID_BLOB_AVAILABLE as b)
UNION
SELECT a.resource_blob_id as id
- FROM metadata_runtime_entity_metadata as a
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
WHERE a.resource_blob_id NOT IN
- (SELECT b.id FROM apid_blob_available as b)
+ (SELECT b.id FROM APID_BLOB_AVAILABLE as b)
)
WHERE id IS NOT NULL AND id != ''
;
@@ -161,7 +187,8 @@
return
}
-func (dbc *dbManager) getReadyDeployments(typeFilter string) ([]DataDeployment, error) {
+/*
+func (dbc *dbManager) getReadyConfigurations(typeFilter string) ([]Configuration, error) {
// An alternative statement is in get_ready_deployments.sql
// Need testing with large data volume to determine which is better
@@ -183,27 +210,27 @@
a.created_by,
a.updated_at,
a.updated_by
- FROM metadata_runtime_entity_metadata as a
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
WHERE a.id IN (
SELECT
a.id
- FROM metadata_runtime_entity_metadata as a
- INNER JOIN apid_blob_available as b
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ INNER JOIN APID_BLOB_AVAILABLE as b
ON a.resource_blob_id = b.id
WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != ""
INTERSECT
SELECT
a.id
- FROM metadata_runtime_entity_metadata as a
- INNER JOIN apid_blob_available as b
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ INNER JOIN APID_BLOB_AVAILABLE as b
ON a.bean_blob_id = b.id
WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != ""
UNION
SELECT
a.id
- FROM metadata_runtime_entity_metadata as a
- INNER JOIN apid_blob_available as b
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ INNER JOIN APID_BLOB_AVAILABLE as b
ON a.bean_blob_id = b.id
WHERE a.resource_blob_id IS NULL OR a.resource_blob_id = ""
)
@@ -224,28 +251,28 @@
a.created_by,
a.updated_at,
a.updated_by
- FROM metadata_runtime_entity_metadata as a
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
WHERE a.type = ?
AND a.id IN (
SELECT
a.id
- FROM metadata_runtime_entity_metadata as a
- INNER JOIN apid_blob_available as b
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ INNER JOIN APID_BLOB_AVAILABLE as b
ON a.resource_blob_id = b.id
WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != ""
INTERSECT
SELECT
a.id
- FROM metadata_runtime_entity_metadata as a
- INNER JOIN apid_blob_available as b
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ INNER JOIN APID_BLOB_AVAILABLE as b
ON a.bean_blob_id = b.id
WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != ""
UNION
SELECT
a.id
- FROM metadata_runtime_entity_metadata as a
- INNER JOIN apid_blob_available as b
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ INNER JOIN APID_BLOB_AVAILABLE as b
ON a.bean_blob_id = b.id
WHERE a.resource_blob_id IS NULL OR a.resource_blob_id = ""
)
@@ -259,14 +286,72 @@
}
defer rows.Close()
- deployments, err := dataDeploymentsFromRows(rows)
+ confs, err := configurationsFromDbRows(rows)
if err != nil {
return nil, err
}
- log.Debugf("Configurations ready: %v", deployments)
+ //log.Debugf("Configurations ready: %v", confs)
- return deployments, nil
+ return confs, nil
+
+}
+*/
+func (dbc *dbManager) getAllConfigurations(typeFilter string) ([]Configuration, error) {
+
+ // An alternative statement is in get_ready_deployments.sql
+ // Need testing with large data volume to determine which is better
+
+ var rows *sql.Rows
+ var err error
+ if typeFilter == "" {
+ rows, err = dbc.getDb().Query(`
+ SELECT a.id,
+ a.organization_id,
+ a.environment_id,
+ a.bean_blob_id,
+ a.resource_blob_id,
+ a.type,
+ a.name,
+ a.revision,
+ a.path,
+ a.created_at,
+ a.created_by,
+ a.updated_at,
+ a.updated_by
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ ;`)
+ } else {
+ rows, err = dbc.getDb().Query(`
+ SELECT a.id,
+ a.organization_id,
+ a.environment_id,
+ a.bean_blob_id,
+ a.resource_blob_id,
+ a.type,
+ a.name,
+ a.revision,
+ a.path,
+ a.created_at,
+ a.created_by,
+ a.updated_at,
+ a.updated_by
+ FROM METADATA_RUNTIME_ENTITY_METADATA as a
+ WHERE a.type = ?
+ ;`, typeFilter)
+ }
+
+ if err != nil {
+ log.Errorf("DB Query for METADATA_RUNTIME_ENTITY_METADATA failed %v", err)
+ return nil, err
+ }
+ defer rows.Close()
+
+ confs, err := configurationsFromDbRows(rows)
+ if err != nil {
+ return nil, err
+ }
+ return confs, nil
}
@@ -277,21 +362,21 @@
}
defer txn.Rollback()
_, err = txn.Exec(`
- INSERT OR IGNORE INTO apid_blob_available (
+ INSERT OR IGNORE INTO APID_BLOB_AVAILABLE (
id,
local_fs_location
) VALUES (?, ?);`, blobId, localFsLocation)
if err != nil {
- log.Errorf("INSERT apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
+ log.Errorf("INSERT APID_BLOB_AVAILABLE id {%s} local_fs_location {%s} failed: %v", blobId, localFsLocation, err)
return err
}
err = txn.Commit()
if err != nil {
- log.Errorf("UPDATE apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err)
+ log.Errorf("UPDATE APID_BLOB_AVAILABLE id {%s} local_fs_location {%s} failed: %v", blobId, localFsLocation, err)
return err
}
- log.Debugf("INSERT apid_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
+ log.Debugf("INSERT APID_BLOB_AVAILABLE {%s} local_fs_location {%s} succeeded", blobId, localFsLocation)
return nil
}
@@ -299,7 +384,7 @@
func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) {
log.Debugf("Getting the blob file for blobId {%s}", blobId)
- rows, err := dbc.getDb().Query("SELECT local_fs_location FROM apid_blob_available WHERE id = '" + blobId + "'")
+ rows, err := dbc.getDb().Query("SELECT local_fs_location FROM APID_BLOB_AVAILABLE WHERE id = '" + blobId + "'")
if err != nil {
log.Errorf("SELECT local_fs_location failed %v", err)
return "", err
@@ -317,12 +402,62 @@
return
}
-func dataDeploymentsFromRows(rows *sql.Rows) ([]DataDeployment, error) {
- tmp, err := structFromRows(reflect.TypeOf((*DataDeployment)(nil)).Elem(), rows)
+func (dbc *dbManager) loadLsnFromDb() error {
+ var LSN sql.NullString
+ ret := InitLSN
+
+ // If there's LSN for configuration
+ err := dbc.getDb().QueryRow("select lsn from APID_CONFIGURATION_LSN LIMIT 1").Scan(&LSN)
+ if err != nil && err != sql.ErrNoRows {
+ log.Errorf("Failed to select lsn from APID_CONFIGURATION_LSN: %v", err)
+ return err
+ }
+ if LSN.Valid {
+ ret = LSN.String
+ log.Debugf("LSN from APID_CONFIGURATION_LSN: %s", LSN.String)
+ }
+ dbc.lsnMutex.Lock()
+ defer dbc.lsnMutex.Unlock()
+ dbc.apidLSN = ret
+ return nil
+}
+
+func (dbc *dbManager) getLSN() string {
+ dbc.lsnMutex.RLock()
+ defer dbc.lsnMutex.RUnlock()
+ return dbc.apidLSN
+}
+
+func (dbc *dbManager) updateLSN(LSN string) (err error) {
+
+ tx, err := dbc.getDb().Begin()
+ if err != nil {
+ log.Errorf("updateLSN: Unable to get DB tx Err: {%v}", err)
+ return
+ }
+ defer tx.Rollback()
+ _, err = tx.Exec("UPDATE APID_CONFIGURATION_LSN SET lsn=?;", LSN)
+ if err != nil {
+ log.Errorf("UPDATE APID_CONFIGURATION_LSN Failed: %v", err)
+ return
+ }
+ log.Debugf("UPDATE APID_CONFIGURATION_LSN Success: %s", LSN)
+ if err = tx.Commit(); err != nil {
+ log.Errorf("Commit error in updateLSN: %v", err)
+ return
+ }
+ dbc.lsnMutex.Lock()
+ defer dbc.lsnMutex.Unlock()
+ dbc.apidLSN = LSN
+ return
+}
+
+func configurationsFromDbRows(rows *sql.Rows) ([]Configuration, error) {
+ tmp, err := structFromRows(reflect.TypeOf((*Configuration)(nil)).Elem(), rows)
if err != nil {
return nil, err
}
- return tmp.([]DataDeployment), nil
+ return tmp.([]Configuration), nil
}
func structFromRows(t reflect.Type, rows *sql.Rows) (interface{}, error) {
@@ -349,15 +484,15 @@
return slice.Interface(), nil
}
-func dataDeploymentsFromRow(row *sql.Row) (*DataDeployment, error) {
- tmp, err := structFromRow(reflect.TypeOf((*DataDeployment)(nil)).Elem(), row)
+func configurationFromDbRow(row *sql.Row) (*Configuration, error) {
+ tmp, err := structFromRow(reflect.TypeOf((*Configuration)(nil)).Elem(), row)
if err != nil {
if err != sql.ErrNoRows {
- log.Errorf("Error in dataDeploymentsFromRow: %v", err)
+ log.Errorf("Error in configurationFromDbRow: %v", err)
}
return nil, err
}
- config := tmp.(DataDeployment)
+ config := tmp.(Configuration)
return &config, nil
}
diff --git a/data_test.go b/data_test.go
index fbbb0a7..65aace8 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,6 +15,8 @@
package apiGatewayConfDeploy
import (
+ "database/sql"
+ "fmt"
"github.com/apid/apid-core"
"github.com/apid/apid-core/data"
. "github.com/onsi/ginkgo"
@@ -36,6 +38,8 @@
"gcs:SHA-512:8fcc902465ccb32ceff25fa9f6fb28e3b314dbc2874c0f8add02f4e29c9e2798d344c51807aa1af56035cf09d39c800cf605d627ba65723f26d8b9c83c82d2f2": true,
"gcs:SHA-512:0c648779da035bfe0ac21f6268049aa0ae74d9d6411dadefaec33991e55c2d66c807e06f7ef84e0947f7c7d63b8c9e97cf0684cbef9e0a86b947d73c74ae7455": true,
}
+
+ allConfigs map[string]bool
)
var _ = Describe("data", func() {
@@ -44,13 +48,22 @@
var _ = BeforeEach(func() {
testCount += 1
testDbMan = &dbManager{
- data: services.Data(),
- dbMux: sync.RWMutex{},
+ data: services.Data(),
+ dbMux: sync.RWMutex{},
+ lsnMutex: sync.RWMutex{},
}
testDbMan.setDbVersion("test" + strconv.Itoa(testCount))
initTestDb(testDbMan.getDb())
err := testDbMan.initDb()
Expect(err).Should(Succeed())
+ allConfigs = map[string]bool{
+ "1dc4895e-6494-4b59-979f-5f4c89c073b4": true,
+ "319963ff-217e-4ecc-8d6e-c3665e962d1e": true,
+ "3af44bb7-0a74-4283-860c-3561e6c19132": true,
+ "d5ffd9db-4795-43eb-b645-d2a0b6c8ac6a": true,
+ "84ac8d68-b3d1-4bcc-ad0d-c6a0ed67e16c": true,
+ "3ecd351c-1173-40bf-b830-c194e5ef9038": true,
+ }
time.Sleep(100 * time.Millisecond)
})
@@ -59,7 +72,7 @@
data.Delete(data.VersionedDBID("common", "test"+strconv.Itoa(testCount)))
})
- Context("db tests", func() {
+ Context("basic db tests", func() {
It("initDb() should be idempotent", func() {
err := testDbMan.initDb()
Expect(err).Should(Succeed())
@@ -90,12 +103,79 @@
Expect(count).Should(Equal(6))
})
- It("should get empty slice if no deployments are ready", func() {
- deps, err := testDbMan.getReadyDeployments("")
+ It("should initialize support for long-polling", func() {
+ // verify the APID_CONFIGURATION_LSN table exists with exactly one row seeded to InitLSN
+ rows, err := testDbMan.getDb().Query(`
+ SELECT lsn from APID_CONFIGURATION_LSN;
+ `)
Expect(err).Should(Succeed())
- Expect(len(deps)).Should(BeZero())
+ defer rows.Close()
+ count := 0
+ var lsn sql.NullString
+ for rows.Next() {
+ count++
+ rows.Scan(&lsn)
+ }
+ Expect(count).Should(Equal(1))
+ Expect(lsn.Valid).Should(BeTrue())
+ Expect(lsn.String).Should(Equal(InitLSN))
})
+ It("should maintain LSN", func() {
+ testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ // write
+ err := testDbMan.updateLSN(testLSN)
+ Expect(err).Should(Succeed())
+ rows, err := testDbMan.getDb().Query(`
+ SELECT lsn from APID_CONFIGURATION_LSN;
+ `)
+ defer rows.Close()
+ count := 0
+ var lsn sql.NullString
+ for rows.Next() {
+ count++
+ rows.Scan(&lsn)
+ }
+ Expect(count).Should(Equal(1))
+ Expect(lsn.Valid).Should(BeTrue())
+ Expect(lsn.String).Should(Equal(testLSN))
+
+ // read
+ Expect(testDbMan.getLSN()).Should(Equal(testLSN))
+
+ // load: read the LSN back from the DB into the in-memory apidLSN field
+ Expect(testDbMan.loadLsnFromDb()).Should(Succeed())
+ Expect(testDbMan.apidLSN).Should(Equal(testLSN))
+ })
+ })
+
+ Context("configuration tests", func() {
+
+ It("should get all configs", func() {
+ confs, err := testDbMan.getAllConfigurations("")
+ Expect(err).Should(Succeed())
+ Expect(len(confs)).Should(Equal(6))
+ for _, conf := range confs {
+ Expect(allConfigs[conf.ID]).Should(BeTrue())
+ allConfigs[conf.ID] = false
+ }
+ })
+
+ It("should get empty slice if no configurations", func() {
+ trancateTestMetadataTable(testDbMan.getDb())
+ confs, err := testDbMan.getAllConfigurations("")
+ Expect(err).Should(Succeed())
+ Expect(len(confs)).Should(BeZero())
+ })
+
+ /*
+ XIt("should get empty slice if no configurations are ready", func() {
+ confs, err := testDbMan.getReadyConfigurations("")
+ Expect(err).Should(Succeed())
+ Expect(len(confs)).Should(BeZero())
+ })
+ */
+
It("should succefully update local FS location", func() {
err := testDbMan.updateLocalFsLocation(testBlobId, testBlobLocalFsPrefix+testBlobId)
@@ -127,7 +207,7 @@
It("should get configuration by Id", func() {
config, err := testDbMan.getConfigById("3ecd351c-1173-40bf-b830-c194e5ef9038")
Expect(err).Should(Succeed())
- expectedResponse := &DataDeployment{
+ expectedResponse := &Configuration{
ID: "3ecd351c-1173-40bf-b830-c194e5ef9038",
OrgID: "73fcac6c-5d9f-44c1-8db0-333efda3e6e8",
EnvID: "ada76573-68e3-4f1a-a0f9-cbc201a97e80",
@@ -151,44 +231,43 @@
Expect(err).ShouldNot(Succeed())
})
- It("should successfully get all ready configurations", func() {
+ /*
+ XIt("should successfully get all ready configurations", func() {
- err := testDbMan.updateLocalFsLocation(readyBlobId, testBlobLocalFsPrefix+readyBlobId)
- Expect(err).Should(Succeed())
- err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId)
- Expect(err).Should(Succeed())
+ err := testDbMan.updateLocalFsLocation(readyBlobId, testBlobLocalFsPrefix+readyBlobId)
+ Expect(err).Should(Succeed())
+ err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId)
+ Expect(err).Should(Succeed())
- deps, err := testDbMan.getReadyDeployments("")
- Expect(err).Should(Succeed())
- Expect(len(deps)).Should(Equal(2))
- for _, dep := range deps {
- Expect(dep.BlobID).Should(Equal(readyBlobId))
- if dep.BlobResourceID != "" {
- Expect(dep.BlobResourceID).Should(Equal(readyResourceId))
+ confs, err := testDbMan.getReadyConfigurations("")
+ Expect(err).Should(Succeed())
+ Expect(len(confs)).Should(Equal(2))
+ for _, conf := range confs {
+ Expect(conf.BlobID).Should(Equal(readyBlobId))
+ if conf.BlobResourceID != "" {
+ Expect(conf.BlobResourceID).Should(Equal(readyResourceId))
+ }
}
- }
- })
-
- It("should get ready configurations by type filter", func() {
+ })
+ */
+ It("should get all configurations by type filter", func() {
err := testDbMan.updateLocalFsLocation(readyBlobId, testBlobLocalFsPrefix+readyBlobId)
Expect(err).Should(Succeed())
err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId)
Expect(err).Should(Succeed())
- deps, err := testDbMan.getReadyDeployments("ORGANIZATION")
+ confs, err := testDbMan.getAllConfigurations("ORGANIZATION")
Expect(err).Should(Succeed())
- Expect(len(deps)).Should(Equal(1))
- Expect(deps[0].ID).Should(Equal("319963ff-217e-4ecc-8d6e-c3665e962d1e"))
+ Expect(len(confs)).Should(Equal(2))
- deps, err = testDbMan.getReadyDeployments("ENVIRONMENT")
+ confs, err = testDbMan.getAllConfigurations("ENVIRONMENT")
Expect(err).Should(Succeed())
- Expect(len(deps)).Should(Equal(1))
- Expect(deps[0].ID).Should(Equal("1dc4895e-6494-4b59-979f-5f4c89c073b4"))
+ Expect(len(confs)).Should(Equal(4))
- deps, err = testDbMan.getReadyDeployments("INVALID-TYPE")
+ confs, err = testDbMan.getAllConfigurations("INVALID-TYPE")
Expect(err).Should(Succeed())
- Expect(len(deps)).Should(Equal(0))
+ Expect(len(confs)).Should(Equal(0))
})
It("should succefully get all unready blob ids", func() {
@@ -362,3 +441,14 @@
Expect(err).Should(Succeed())
Expect(tx.Commit()).Should(Succeed())
}
+
+func trancateTestMetadataTable(db apid.DB) {
+ tx, err := db.Begin()
+ Expect(err).Should(Succeed())
+ defer tx.Rollback()
+ _, err = tx.Exec(`
+ DELETE FROM metadata_runtime_entity_metadata;
+ `)
+ Expect(err).Should(Succeed())
+ Expect(tx.Commit()).Should(Succeed())
+}
diff --git a/init.go b/init.go
index 93e67ce..41ee499 100644
--- a/init.go
+++ b/init.go
@@ -26,24 +26,24 @@
)
const (
- configProtocol = "protocol_type"
- configAPIListen = "api_listen"
- configBundleBlobDownloadEndpoint = "gatewaydeploy_bundle_download_endpoint"
- configBundleDirKey = "gatewaydeploy_bundle_dir"
- configDebounceDuration = "gatewaydeploy_debounce_duration"
- configBundleCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
- configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout"
- configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout"
- configApiServerBaseURI = "apigeesync_proxy_server_base"
- configApidInstanceID = "apigeesync_apid_instance_id"
- configApidClusterID = "apigeesync_cluster_id"
- configConcurrentDownloads = "apigeesync_concurrent_downloads"
- configDownloadQueueSize = "apigeesync_download_queue_size"
- configBlobServerBaseURI = "apigeesync_blob_server_base"
- configStoragePath = "local_storage_path"
- maxIdleConnsPerHost = 50
- httpTimeout = time.Minute
- configBearerToken = "apigeesync_bearer_token"
+ configProtocol = "protocol_type"
+ configAPIListen = "api_listen"
+ configBlobDownloadEndpoint = "gatewaydeploy_bundle_download_endpoint"
+ configBlobDirKey = "gatewaydeploy_bundle_dir"
+ configDebounceDuration = "gatewaydeploy_debounce_duration"
+ configBlobCleanupDelay = "gatewaydeploy_bundle_cleanup_delay"
+ configMarkDeployFailedAfter = "gatewaydeploy_deployment_timeout"
+ configDownloadConnTimeout = "gatewaydeploy_download_connection_timeout"
+ configApiServerBaseURI = "apigeesync_proxy_server_base"
+ configApidInstanceID = "apigeesync_apid_instance_id"
+ configApidClusterID = "apigeesync_cluster_id"
+ configConcurrentDownloads = "apigeesync_concurrent_downloads"
+ configDownloadQueueSize = "apigeesync_download_queue_size"
+ configBlobServerBaseURI = "apigeesync_blob_server_base"
+ configStoragePath = "local_storage_path"
+ maxIdleConnsPerHost = 50
+ httpTimeout = time.Minute
+ configBearerToken = "apigeesync_bearer_token"
)
var (
@@ -81,9 +81,9 @@
return pluginData, fmt.Errorf("%s value %s parse err: %v", configApiServerBaseURI, apiServerBaseURI, err)
}
- config.SetDefault(configBundleDirKey, "bundles")
+ config.SetDefault(configBlobDirKey, "bundles")
config.SetDefault(configDebounceDuration, time.Second)
- config.SetDefault(configBundleCleanupDelay, time.Minute)
+ config.SetDefault(configBlobCleanupDelay, time.Minute)
config.SetDefault(configMarkDeployFailedAfter, 5*time.Minute)
config.SetDefault(configDownloadConnTimeout, 5*time.Minute)
config.SetDefault(configConcurrentDownloads, 15)
@@ -94,9 +94,9 @@
return pluginData, fmt.Errorf("%s must be a positive duration", configDebounceDuration)
}
- bundleCleanupDelay := config.GetDuration(configBundleCleanupDelay)
+ bundleCleanupDelay := config.GetDuration(configBlobCleanupDelay)
if bundleCleanupDelay < time.Millisecond {
- return pluginData, fmt.Errorf("%s must be a positive duration", configBundleCleanupDelay)
+ return pluginData, fmt.Errorf("%s must be a positive duration", configBlobCleanupDelay)
}
markDeploymentFailedAfter := config.GetDuration(configMarkDeployFailedAfter)
@@ -133,21 +133,19 @@
// initialize api manager
apiMan := &apiManager{
- dbMan: dbMan,
- deploymentsEndpoint: deploymentsEndpoint,
- blobEndpoint: blobEndpoint,
- deploymentIdEndpoint: deploymentIdEndpoint,
- eTag: 0,
- deploymentsChanged: make(chan interface{}, 5),
- addSubscriber: make(chan chan deploymentsResult),
- removeSubscriber: make(chan chan deploymentsResult),
- apiInitialized: false,
+ dbMan: dbMan,
+ configurationEndpoint: configEndpoint,
+ blobEndpoint: blobEndpoint,
+ configurationIdEndpoint: configIdEndpoint,
+ newChangeListChan: make(chan interface{}, 5),
+ addSubscriber: make(chan chan interface{}, 100),
+ apiInitialized: false,
}
// initialize bundle manager
blobServerURL := config.GetString(configBlobServerBaseURI)
- relativeBundlePath := config.GetString(configBundleDirKey)
+ relativeBundlePath := config.GetString(configBlobDirKey)
storagePath := config.GetString(configStoragePath)
bundlePath = path.Join(storagePath, relativeBundlePath)
if err := os.MkdirAll(bundlePath, 0700); err != nil {
@@ -157,23 +155,20 @@
concurrentDownloads := config.GetInt(configConcurrentDownloads)
downloadQueueSize := config.GetInt(configDownloadQueueSize)
bundleMan := &bundleManager{
- blobServerUrl: blobServerURL,
- dbMan: dbMan,
- apiMan: apiMan,
- concurrentDownloads: concurrentDownloads,
- markDeploymentFailedAfter: markDeploymentFailedAfter,
- bundleRetryDelay: time.Second,
- bundleCleanupDelay: bundleCleanupDelay,
- downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
- isClosed: new(int32),
- client: httpClient,
+ blobServerUrl: blobServerURL,
+ dbMan: dbMan,
+ apiMan: apiMan,
+ concurrentDownloads: concurrentDownloads,
+ markConfigFailedAfter: markDeploymentFailedAfter,
+ bundleRetryDelay: time.Second,
+ bundleCleanupDelay: bundleCleanupDelay,
+ downloadQueue: make(chan *DownloadRequest, downloadQueueSize),
+ isClosed: new(int32),
+ client: httpClient,
}
bundleMan.initializeBundleDownloading()
- //TODO initialize apiMan.distributeEvents() for api call with "block"
- //go apiMan.distributeEvents()
-
// initialize event handler
eventHandler = &apigeeSyncHandler{
dbMan: dbMan,
diff --git a/listener.go b/listener.go
index 2769568..b203187 100644
--- a/listener.go
+++ b/listener.go
@@ -36,13 +36,6 @@
}
}
-type bundleConfigJson struct {
- Name string `json:"name"`
- URI string `json:"uri"`
- ChecksumType string `json:"checksumType"`
- Checksum string `json:"checksum"`
-}
-
type apigeeSyncHandler struct {
dbMan dbManagerInterface
apiMan apiManagerInterface
@@ -70,109 +63,136 @@
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
h.dbMan.setDbVersion(snapshot.SnapshotInfo)
+ err := h.dbMan.initDb()
+ if err != nil {
+ log.Panicf("unable to init DB: %v", err)
+ }
+ if lsn := h.dbMan.getLSN(); lsn != "" {
+ // receive a new snapshot at runtime
+ if err = h.dbMan.updateLSN(lsn); err != nil {
+ log.Errorf("Unable to update LSN: %v", err)
+ }
+ } else { //apid just started
+ if err = h.dbMan.loadLsnFromDb(); err != nil {
+ log.Errorf("Unable to load LSN From Db: %v", err)
+ }
+ }
h.startupOnExistingDatabase()
- h.apiMan.InitAPI()
+ //h.apiMan.InitAPI()
log.Debug("Snapshot processed")
}
-// TODO make it work with new schema
func (h *apigeeSyncHandler) startupOnExistingDatabase() {
// start bundle downloads that didn't finish
+
go func() {
// create apid_blob_available table
- h.dbMan.initDb()
+
blobIds, err := h.dbMan.getUnreadyBlobs()
if err != nil {
- log.Panicf("unable to query database for unready deployments: %v", err)
+ log.Panicf("unable to query database for unready configurations: %v", err)
}
log.Debugf("Queuing %d blob downloads", len(blobIds))
- for _, id := range blobIds {
- go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(id))
- }
+
+ // initialize API endpoints only after 1 round of download attempts is made
+ h.bundleMan.downloadBlobsWithCallback(blobIds, func() {
+ h.apiMan.InitAPI()
+ h.apiMan.notifyNewChange()
+ })
+
}()
}
func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) {
log.Debugf("Processing changes")
- // changes have been applied to DB
- var insertedDeployments, deletedDeployments []DataDeployment
- var updatedNewBlobs, updatedOldBlobs []string
+ // changes have been applied to DB by apidApigeeSync
+ var insertedConfigs, updatedNewConfigs, updatedOldConfigs, deletedConfigs []*Configuration
+ isConfigChanged := false
for _, change := range changes.Changes {
switch change.Table {
case CONFIG_METADATA_TABLE:
+ isConfigChanged = true
switch change.Operation {
case common.Insert:
- dep := dataDeploymentFromRow(change.NewRow)
- insertedDeployments = append(insertedDeployments, dep)
+ conf := configurationFromRow(change.NewRow)
+ insertedConfigs = append(insertedConfigs, &conf)
case common.Delete:
- dep := dataDeploymentFromRow(change.OldRow)
- deletedDeployments = append(deletedDeployments, dep)
+ conf := configurationFromRow(change.OldRow)
+ deletedConfigs = append(deletedConfigs, &conf)
case common.Update:
- depNew := dataDeploymentFromRow(change.NewRow)
- depOld := dataDeploymentFromRow(change.OldRow)
-
- if depOld.BlobID != depNew.BlobID {
- updatedNewBlobs = append(updatedNewBlobs, depNew.BlobID)
- updatedOldBlobs = append(updatedOldBlobs, depOld.BlobID)
- }
-
- if depOld.BlobResourceID != depNew.BlobResourceID {
- updatedNewBlobs = append(updatedNewBlobs, depNew.BlobResourceID)
- updatedOldBlobs = append(updatedOldBlobs, depOld.BlobResourceID)
- }
+ confNew := configurationFromRow(change.NewRow)
+ confOld := configurationFromRow(change.OldRow)
+ updatedNewConfigs = append(updatedNewConfigs, &confNew)
+ updatedOldConfigs = append(updatedOldConfigs, &confOld)
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
}
}
-
- /*
- for _, d := range deletedDeployments {
- h.apiMan.addChangedDeployment(d.ID)
- }
- */
-
- // insert
- for i := range insertedDeployments {
- go h.bundleMan.queueDownloadRequest(&insertedDeployments[i])
+ // delete old configs from FS
+ if len(deletedConfigs)+len(updatedOldConfigs) > 0 {
+ log.Debugf("will delete %d old blobs", len(deletedConfigs)+len(updatedOldConfigs))
+ //TODO delete blobs for deleted configs
+ blobIds := extractBlobsToDelete(append(deletedConfigs, updatedOldConfigs...))
+ go h.bundleMan.deleteBlobs(blobIds)
}
- // update
- for i := range updatedNewBlobs {
- go h.bundleMan.enqueueRequest(h.bundleMan.makeDownloadRequest(updatedNewBlobs[i]))
+ // download and expose new configs
+ if isConfigChanged {
+ h.dbMan.updateLSN(changes.LastSequence)
+ blobs := extractBlobsToDownload(append(insertedConfigs, updatedNewConfigs...))
+ h.bundleMan.downloadBlobsWithCallback(blobs, h.apiMan.notifyNewChange)
+ } else if h.dbMan.getLSN() == InitLSN {
+ h.dbMan.updateLSN(changes.LastSequence)
}
- for i := range updatedOldBlobs {
- go h.bundleMan.deleteBundleById(updatedOldBlobs[i])
- }
-
- // delete
- if len(deletedDeployments) > 0 {
- log.Debugf("will delete %d old bundles", len(deletedDeployments))
- //TODO delete bundles for deleted deployments
- h.bundleMan.deleteBundlesFromDeployments(deletedDeployments)
- }
}
-func dataDeploymentFromRow(row common.Row) (d DataDeployment) {
+func extractBlobsToDownload(confs []*Configuration) (blobs []string) {
+ //TODO: do not include already-downloaded blobs
+ for _, conf := range confs {
+ if conf.BlobID != "" {
+ blobs = append(blobs, conf.BlobID)
+ }
+ if conf.BlobResourceID != "" {
+ blobs = append(blobs, conf.BlobResourceID)
+ }
+ }
+ return
+}
- row.Get("id", &d.ID)
- row.Get("organization_id", &d.OrgID)
- row.Get("environment_id", &d.EnvID)
- row.Get("bean_blob_id", &d.BlobID)
- row.Get("resource_blob_id", &d.BlobResourceID)
- row.Get("type", &d.Type)
- row.Get("name", &d.Name)
- row.Get("revision", &d.Revision)
- row.Get("path", &d.Path)
- row.Get("created_at", &d.Created)
- row.Get("created_by", &d.CreatedBy)
- row.Get("updated_at", &d.Updated)
- row.Get("updated_by", &d.UpdatedBy)
+func extractBlobsToDelete(confs []*Configuration) (blobs []string) {
+ //TODO: do not delete blobs that are still referenced by other configurations
+ for _, conf := range confs {
+ if conf.BlobID != "" {
+ blobs = append(blobs, conf.BlobID)
+ }
+ if conf.BlobResourceID != "" {
+ blobs = append(blobs, conf.BlobResourceID)
+ }
+ }
+ return
+}
+
+func configurationFromRow(row common.Row) (c Configuration) {
+
+ row.Get("id", &c.ID)
+ row.Get("organization_id", &c.OrgID)
+ row.Get("environment_id", &c.EnvID)
+ row.Get("bean_blob_id", &c.BlobID)
+ row.Get("resource_blob_id", &c.BlobResourceID)
+ row.Get("type", &c.Type)
+ row.Get("name", &c.Name)
+ row.Get("revision", &c.Revision)
+ row.Get("path", &c.Path)
+ row.Get("created_at", &c.Created)
+ row.Get("created_by", &c.CreatedBy)
+ row.Get("updated_at", &c.Updated)
+ row.Get("updated_by", &c.UpdatedBy)
return
}
diff --git a/listener_test.go b/listener_test.go
index 55a71c5..2971a08 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -22,7 +22,6 @@
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"math/rand"
- "reflect"
"time"
)
@@ -31,17 +30,19 @@
var dummyApiMan *dummyApiManager
var dummyBundleMan *dummyBundleManager
var testHandler *apigeeSyncHandler
+ var testCount int
var _ = BeforeEach(func() {
+ testCount += 1
// stop handler created by initPlugin()
eventHandler.stopListener(services)
- dummyApiMan = &dummyApiManager{}
+ dummyApiMan = &dummyApiManager{
+ notifyChan: make(chan bool, 1),
+ initCalled: make(chan bool),
+ }
dummyDbMan = &dummyDbManager{}
dummyBundleMan = &dummyBundleManager{
- requestChan: make(chan *DownloadRequest),
- depChan: make(chan *DataDeployment),
- delChan: make(chan *DataDeployment),
- delBlobChan: make(chan string),
+ blobChan: make(chan string),
}
testHandler = &apigeeSyncHandler{
dbMan: dummyDbMan,
@@ -73,11 +74,11 @@
SnapshotInfo: fmt.Sprint(rand.Uint32()),
}
- apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
for i := 0; i < len(unreadyBlobIds); i++ {
- req := <-dummyBundleMan.requestChan
- blobMap[req.blobId]++
+ id := <-dummyBundleMan.blobChan
+ blobMap[id]++
}
// verify all unready blobids are enqueued
@@ -86,7 +87,7 @@
}
})
- It("Snapshot events should set db version, and should only init API endpoint once", func() {
+ It("Snapshot events should set db version", func() {
// emit snapshot
for i := 0; i < 2+rand.Intn(5); i++ {
@@ -97,11 +98,42 @@
<-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
Expect(dummyDbMan.version).Should(Equal(version))
}
-
- // verify init API called
- Expect(dummyApiMan.initCalled).Should(BeTrue())
})
+ It("Snapshot event should init API endpoint and notify long-polling", func() {
+
+ // emit snapshot
+ version := fmt.Sprint(rand.Uint32())
+ snapshot := &common.Snapshot{
+ SnapshotInfo: version,
+ }
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+ Expect(dummyDbMan.version).Should(Equal(version))
+ Expect(<-dummyApiMan.initCalled).Should(BeTrue())
+ Expect(<-dummyApiMan.notifyChan).Should(BeTrue())
+ })
+
+ It("Should load LSN when apid starts", func() {
+ dummyDbMan.dbLSN = fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ // emit snapshot
+ version := fmt.Sprint(rand.Uint32())
+ snapshot := &common.Snapshot{
+ SnapshotInfo: version,
+ }
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+ Expect(dummyDbMan.getLSN()).Should(Equal(dummyDbMan.dbLSN))
+ })
+
+ It("Should store LSN when receiving snapshot at runtime", func() {
+ dummyDbMan.lsn = fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ // emit snapshot
+ version := fmt.Sprint(rand.Uint32())
+ snapshot := &common.Snapshot{
+ SnapshotInfo: version,
+ }
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot)
+ Expect(dummyDbMan.getLSN()).Should(Equal(dummyDbMan.dbLSN))
+ })
})
Context("Change list", func() {
@@ -109,7 +141,7 @@
It("Insert event should enqueue download requests for all inserted deployments", func() {
// emit change event
changes := make([]common.Change, 0)
- deployments := make(map[string]DataDeployment)
+ blobs := make(map[string]int)
for i := 0; i < 1+rand.Intn(10); i++ {
dep := makeTestDeployment()
change := common.Change{
@@ -118,25 +150,25 @@
NewRow: rowFromDeployment(dep),
}
changes = append(changes, change)
- deployments[dep.ID] = *dep
+ blobs[dep.BlobID]++
+ blobs[dep.BlobResourceID]++
}
changeList := &common.ChangeList{
Changes: changes,
}
- apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
// verify
- for i := 0; i < len(changes); i++ {
- dep := <-dummyBundleMan.depChan
- Expect(reflect.DeepEqual(deployments[dep.ID], *dep)).Should(BeTrue())
- delete(deployments, dep.ID)
+ for i := 0; i < 2*len(changes); i++ {
+ blobId := <-dummyBundleMan.blobChan
+ blobs[blobId]++
+ Expect(blobs[blobId]).Should(Equal(2))
}
- Expect(len(deployments)).Should(BeZero())
})
- It("Delete event should deliver to the bundle manager", func() {
+ XIt("Delete event should deliver to the bundle manager", func() {
// emit change event
changes := make([]common.Change, 0)
deployments := make(map[string]bool)
@@ -155,169 +187,137 @@
Changes: changes,
}
- apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
// verify
for i := 0; i < len(changes); i++ {
- dep := <-dummyBundleMan.delChan
- Expect(deployments[dep.ID]).Should(BeTrue())
- delete(deployments, dep.ID)
}
Expect(len(deployments)).Should(BeZero())
})
- It("Update event should enqueue download requests and delete old blobs", func() {
+ It("Update event should enqueue download requests", func() {
changes := make([]common.Change, 0)
- blobIdNew := make(map[string]int)
- blobIdOld := make(map[string]int)
+ blobsNew := make(map[string]int)
for i := 0; i < 1+rand.Intn(10); i++ {
- depNew := makeTestDeployment()
- depNew.BlobID = util.GenerateUUID()
- depNew.BlobResourceID = util.GenerateUUID()
+ confNew := makeTestDeployment()
+ confNew.BlobID = util.GenerateUUID()
+ confNew.BlobResourceID = util.GenerateUUID()
- depOld := makeTestDeployment()
- depOld.BlobID = util.GenerateUUID()
- depOld.BlobResourceID = util.GenerateUUID()
+ confOld := makeTestDeployment()
+ confOld.BlobID = util.GenerateUUID()
+ confOld.BlobResourceID = util.GenerateUUID()
change := common.Change{
Operation: common.Update,
Table: CONFIG_METADATA_TABLE,
- NewRow: rowFromDeployment(depNew),
- OldRow: rowFromDeployment(depOld),
+ NewRow: rowFromDeployment(confNew),
+ OldRow: rowFromDeployment(confOld),
}
changes = append(changes, change)
- blobIdNew[depNew.BlobID]++
- blobIdNew[depNew.BlobResourceID]++
- blobIdOld[depOld.BlobID]++
- blobIdOld[depOld.BlobResourceID]++
+ blobsNew[confNew.BlobID]++
+ blobsNew[confNew.BlobResourceID]++
}
+ testLSN := "1.1.1"
// emit change event
changeList := &common.ChangeList{
- Changes: changes,
+ Changes: changes,
+ LastSequence: testLSN,
}
- apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
// verify
- for i := 0; i < len(blobIdNew); i++ {
- req := <-dummyBundleMan.requestChan
- blobIdNew[req.blobId]++
- Expect(blobIdNew[req.blobId]).Should(Equal(2))
- }
- for i := 0; i < len(blobIdOld); i++ {
- blobId := <-dummyBundleMan.delBlobChan
- blobIdOld[blobId]++
- Expect(blobIdOld[blobId]).Should(Equal(2))
+ for i := 0; i < 2*len(changes); i++ {
+ blobId := <-dummyBundleMan.blobChan
+ blobsNew[blobId]++
+ Expect(blobsNew[blobId]).Should(Equal(2))
}
})
+ })
- It("Update event should only download/delete changed blobs", func() {
+ Context("LSN", func() {
+
+ var _ = BeforeEach(func() {
+ dummyDbMan.lsn = "0.0.1"
+ })
+
+ It("changelist with CONFIG_METADATA_TABLE should update apidLSN", func() {
+ // emit change event
changes := make([]common.Change, 0)
- blobIdChangedNew := make(map[string]int)
- blobIdChangedOld := make(map[string]int)
-
+ deployments := make(map[string]Configuration)
+ testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
for i := 0; i < 1+rand.Intn(10); i++ {
- depNew := makeTestDeployment()
- depNew.BlobID = util.GenerateUUID()
- depNew.BlobResourceID = util.GenerateUUID()
-
- depOld := makeTestDeployment()
-
- if rand.Intn(2) == 0 {
- // blob id changed
- depOld.BlobID = util.GenerateUUID()
- blobIdChangedNew[depNew.BlobID]++
- blobIdChangedOld[depOld.BlobID]++
- } else {
- // blob id unchanged
- depOld.BlobID = depNew.BlobID
- }
-
- if rand.Intn(2) == 0 {
- // blob id changed
- depOld.BlobResourceID = util.GenerateUUID()
- blobIdChangedNew[depNew.BlobResourceID]++
- blobIdChangedOld[depOld.BlobResourceID]++
- } else {
- // blob id unchanged
- depOld.BlobResourceID = depNew.BlobResourceID
- }
-
+ dep := makeTestDeployment()
change := common.Change{
- Operation: common.Update,
+ Operation: common.Insert,
Table: CONFIG_METADATA_TABLE,
- NewRow: rowFromDeployment(depNew),
- OldRow: rowFromDeployment(depOld),
+ NewRow: rowFromDeployment(dep),
}
changes = append(changes, change)
+ deployments[dep.ID] = *dep
}
+ changeList := &common.ChangeList{
+ Changes: changes,
+ LastSequence: testLSN,
+ }
+
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ for i := 0; i < 2*len(changes); i++ {
+ <-dummyBundleMan.blobChan
+ }
+ Expect(dummyDbMan.getLSN()).Should(Equal(testLSN))
+
+ })
+
+ It("changelist without CONFIG_METADATA_TABLE shouldn't update apidLSN", func() {
+ testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ dummyDbMan.lsn = testLSN
+ // emit change event
+ changes := make([]common.Change, 0)
+ deployments := make(map[string]Configuration)
+ for i := 0; i < 1+rand.Intn(10); i++ {
+ dep := makeTestDeployment()
+ change := common.Change{
+ Operation: common.Insert,
+ Table: "somewhat-table",
+ NewRow: rowFromDeployment(dep),
+ }
+ changes = append(changes, change)
+ deployments[dep.ID] = *dep
+ }
+
+ changeList := &common.ChangeList{
+ Changes: changes,
+ LastSequence: "aaa.aaa.aaa",
+ }
+
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ Expect(dummyDbMan.getLSN()).Should(Equal(testLSN))
+
+ })
+
+ It("changelist should always update apidLSN if it has init value", func() {
+ testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount)
+ dummyDbMan.lsn = InitLSN
// emit change event
changeList := &common.ChangeList{
- Changes: changes,
+ Changes: nil,
+ LastSequence: testLSN,
}
- apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList)
+ Expect(dummyDbMan.getLSN()).Should(Equal(testLSN))
- // verify
- for i := 0; i < len(blobIdChangedNew); i++ {
- req := <-dummyBundleMan.requestChan
- blobIdChangedNew[req.blobId]++
- Expect(blobIdChangedNew[req.blobId]).Should(Equal(2))
- }
- for i := 0; i < len(blobIdChangedOld); i++ {
- blobId := <-dummyBundleMan.delBlobChan
- blobIdChangedOld[blobId]++
- Expect(blobIdChangedOld[blobId]).Should(Equal(2))
- }
})
})
})
-type dummyBundleManager struct {
- requestChan chan *DownloadRequest
- depChan chan *DataDeployment
- delChan chan *DataDeployment
- delBlobChan chan string
-}
-
-func (bm *dummyBundleManager) initializeBundleDownloading() {
-
-}
-
-func (bm *dummyBundleManager) queueDownloadRequest(dep *DataDeployment) {
- bm.depChan <- dep
-}
-
-func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
- bm.requestChan <- req
-}
-
-func (bm *dummyBundleManager) makeDownloadRequest(blobId string) *DownloadRequest {
- return &DownloadRequest{
- blobId: blobId,
- }
-}
-
-func (bm *dummyBundleManager) deleteBundlesFromDeployments(deployments []DataDeployment) {
- for i := range deployments {
- bm.delChan <- &deployments[i]
- }
-}
-
-func (bm *dummyBundleManager) deleteBundleById(blobId string) {
- bm.delBlobChan <- blobId
-}
-
-func (bm *dummyBundleManager) Close() {
-
-}
-
-func rowFromDeployment(dep *DataDeployment) common.Row {
+func rowFromDeployment(dep *Configuration) common.Row {
row := common.Row{}
row["id"] = &common.ColumnVal{Value: dep.ID}
row["organization_id"] = &common.ColumnVal{Value: dep.OrgID}
diff --git a/mock_test.go b/mock_test.go
new file mode 100644
index 0000000..f8f804a
--- /dev/null
+++ b/mock_test.go
@@ -0,0 +1,192 @@
+package apiGatewayConfDeploy
+
+import (
+ "bytes"
+ "encoding/json"
+ "github.com/gorilla/mux"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "io"
+ "net/http"
+ "os"
+ "strings"
+ "sync/atomic"
+ "time"
+)
+
// dummyDbManager is a test double for the real DB manager interface.
// Each field holds the canned value (or channel) that the corresponding
// method returns or signals on, so tests can script DB behavior.
type dummyDbManager struct {
	unreadyBlobIds []string                  // returned by getUnreadyBlobs
	readyDeployments []Configuration         // returned by getAllConfigurations("")
	localFSLocation string                   // returned by getLocalFSLocation
	fileResponse chan string                 // receives file contents seen by updateLocalFsLocation
	version string                           // set via setDbVersion
	configurations map[string]*Configuration // keyed lookup for getConfigById / type filter
	lsn string                               // in-memory LSN, read by getLSN
	dbLSN string                             // "persisted" LSN, loaded by loadLsnFromDb
	err error                                // canned error returned by getConfigById
}
+
// setDbVersion records the version; the dummy performs no real DB switch.
func (d *dummyDbManager) setDbVersion(version string) {
	d.version = version
}
+
// initDb is a no-op in the dummy; the real implementation prepares tables.
func (d *dummyDbManager) initDb() error {
	return nil
}
+
// getUnreadyBlobs returns the scripted list of blob ids that are not yet
// downloaded locally.
func (d *dummyDbManager) getUnreadyBlobs() ([]string, error) {
	return d.unreadyBlobIds, nil
}
+
+func (d *dummyDbManager) getAllConfigurations(typeFilter string) ([]Configuration, error) {
+ if typeFilter == "" {
+ return d.readyDeployments, nil
+ }
+ return []Configuration{*(d.configurations[typeFilter])}, nil
+}
+
+func (d *dummyDbManager) updateLocalFsLocation(blobId, localFsLocation string) error {
+ file, err := os.Open(localFsLocation)
+ if err != nil {
+ return err
+ }
+ buff := make([]byte, 36)
+ _, err = file.Read(buff)
+ if err != nil {
+ return err
+ }
+ go func(buff []byte) {
+ d.fileResponse <- string(buff)
+ }(buff)
+
+ return nil
+}
+
// getLocalFSLocation returns the scripted local filesystem path for any
// blob id (the argument is ignored by the dummy).
func (d *dummyDbManager) getLocalFSLocation(string) (string, error) {
	return d.localFSLocation, nil
}
+
// getConfigById looks up a canned configuration by id, paired with the
// scripted error so tests can force failure paths.
func (d *dummyDbManager) getConfigById(id string) (*Configuration, error) {
	return d.configurations[id], d.err
}
// getLSN returns the in-memory last sequence number.
func (d *dummyDbManager) getLSN() string {
	return d.lsn
}
+
// updateLSN stores the LSN both in memory and in the simulated DB copy,
// mirroring the real implementation's write-through behavior.
func (d *dummyDbManager) updateLSN(LSN string) error {
	d.lsn = LSN
	d.dbLSN = LSN
	return nil
}
+
// loadLsnFromDb restores the in-memory LSN from the simulated DB copy.
func (d *dummyDbManager) loadLsnFromDb() error {
	d.lsn = d.dbLSN
	return nil
}
+
// dummyApiManager is a test double for the API manager; its channels let
// tests observe that InitAPI / notifyNewChange were invoked.
type dummyApiManager struct {
	initCalled chan bool // signaled once by InitAPI
	notifyChan chan bool // signaled on each notifyNewChange
}
+
// InitAPI signals initCalled asynchronously so the caller is not blocked
// when no test goroutine is receiving yet.
func (a *dummyApiManager) InitAPI() {
	go func() {
		a.initCalled <- true
	}()
}
+
// notifyNewChange signals notifyChan synchronously; tests must be
// receiving, or the caller blocks (intentional, to assert ordering).
func (a *dummyApiManager) notifyNewChange() {
	a.notifyChan <- true
}
+
// dummyBundleManager is a test double for the bundle manager; downloaded
// blob ids are echoed on blobChan for test assertions.
type dummyBundleManager struct {
	blobChan chan string
}
+
// initializeBundleDownloading is a no-op in the dummy.
func (bm *dummyBundleManager) initializeBundleDownloading() {

}
+
+func (bm *dummyBundleManager) downloadBlobsWithCallback(blobs []string, callback func()) {
+ go func() {
+ for _, id := range blobs {
+ bm.blobChan <- id
+ }
+ }()
+ go callback()
+}
+
// makeDownloadRequest builds a minimal DownloadRequest carrying only the
// blob id and its parent bunch request — enough for test bookkeeping.
func (bm *dummyBundleManager) makeDownloadRequest(blobId string, bunchRequest *BunchDownloadRequest) *DownloadRequest {
	return &DownloadRequest{
		blobId:       blobId,
		bunchRequest: bunchRequest,
	}
}
+
// deleteBlobs is a no-op in the dummy.
func (bm *dummyBundleManager) deleteBlobs(blobIds []string) {

}
+
// Close is a no-op in the dummy.
func (bm *dummyBundleManager) Close() {

}
+
// dummyBlobServer stands in for the remote blob store. It serves a signed-URL
// endpoint and a blob endpoint, with atomically-toggled flags that let tests
// inject one-shot or persistent slow responses (timeouts).
type dummyBlobServer struct {
	serverEndpoint string // route that returns a signed-URL JSON response
	signedEndpoint string // route that returns the blob bytes
	signedTimeout  *int32 // 1 => delay returnSigned by 1s (atomic flag)
	blobTimeout    *int32 // 1 => delay returnBlob by 1s (atomic flag)
	resetTimeout   bool   // true => clear the flag after first delayed response
}
+
// start registers both GET handlers on the shared apid test API service.
func (b *dummyBlobServer) start() {
	services.API().HandleFunc(b.serverEndpoint, b.returnSigned).Methods("GET")
	services.API().HandleFunc(b.signedEndpoint, b.returnBlob).Methods("GET")
}
+
+// send a dummy uri as response
+func (b *dummyBlobServer) returnSigned(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+ if atomic.LoadInt32(b.signedTimeout) == int32(1) {
+ if b.resetTimeout {
+ atomic.StoreInt32(b.signedTimeout, 0)
+ }
+ time.Sleep(time.Second)
+ }
+ vars := mux.Vars(r)
+ blobId := vars["blobId"]
+
+ uriString := strings.Replace(bundleTestUrl+b.signedEndpoint, "{blobId}", blobId, 1)
+ log.Debug("dummyBlobServer returnSigned: " + uriString)
+
+ res := blobServerResponse{
+ Id: blobId,
+ Kind: "Blob",
+ Self: r.RequestURI,
+ SignedUrl: uriString,
+ SignedUrlExpiryTimestamp: time.Now().Add(3 * time.Hour).Format(time.RFC3339),
+ }
+
+ resBytes, err := json.Marshal(res)
+ Expect(err).Should(Succeed())
+ _, err = io.Copy(w, bytes.NewReader(resBytes))
+ Expect(err).Should(Succeed())
+ w.Header().Set("Content-Type", headerSteam)
+}
+
+// send blobId back as response
+func (b *dummyBlobServer) returnBlob(w http.ResponseWriter, r *http.Request) {
+ defer GinkgoRecover()
+ if atomic.LoadInt32(b.blobTimeout) == int32(1) {
+ if b.resetTimeout {
+ atomic.StoreInt32(b.blobTimeout, 0)
+ }
+ time.Sleep(time.Second)
+ }
+ vars := mux.Vars(r)
+ blobId := vars["blobId"]
+ log.Debug("dummyBlobServer returnBlob id=" + blobId)
+ _, err := io.Copy(w, bytes.NewReader([]byte(blobId)))
+ Expect(err).Should(Succeed())
+ w.Header().Set("Content-Type", headerSteam)
+}