[ISSUE-66918282] use long-poll in apid-core, improve style
diff --git a/api.go b/api.go index 3e918f5..9f18229 100644 --- a/api.go +++ b/api.go
@@ -19,6 +19,7 @@ "encoding/json" "errors" "fmt" + "github.com/apid/apid-core/util" "github.com/apigee-labs/transicator/common" "github.com/gorilla/mux" "io" @@ -40,10 +41,10 @@ ) const ( - deploymentsEndpoint = "/configurations" - blobEndpointPath = "/blobs" - blobEndpoint = blobEndpointPath + "/{blobId}" - deploymentIdEndpoint = deploymentsEndpoint + "/{configId}" + configEndpoint = "/configurations" + blobEndpointPath = "/blobs" + blobEndpoint = blobEndpointPath + "/{blobId}" + configIdEndpoint = configEndpoint + "/{configId}" ) const ( @@ -87,7 +88,7 @@ Reason string `json:"reason"` } -type ApiDeploymentDetails struct { +type ApiConfigurationDetails struct { Self string `json:"self"` Name string `json:"name"` Type string `json:"type"` @@ -101,10 +102,10 @@ Updated string `json:"updated"` } -type ApiDeploymentResponse struct { - Kind string `json:"kind"` - Self string `json:"self"` - ApiDeploymentsResponse []ApiDeploymentDetails `json:"contents"` +type ApiConfigurationResponse struct { + Kind string `json:"kind"` + Self string `json:"self"` + ApiConfigurationsResponse []ApiConfigurationDetails `json:"contents"` } type confChangeNotification struct { @@ -119,35 +120,35 @@ } type apiManager struct { - dbMan dbManagerInterface - deploymentsEndpoint string - blobEndpoint string - deploymentIdEndpoint string - addSubscriber chan chan interface{} - newChangeListChan chan interface{} - apiInitialized bool + dbMan dbManagerInterface + configurationEndpoint string + blobEndpoint string + configurationIdEndpoint string + addSubscriber chan chan interface{} + newChangeListChan chan interface{} + apiInitialized bool } func (a *apiManager) InitAPI() { if a.apiInitialized { return } - services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentConfigs).Methods("GET") + services.API().HandleFunc(a.configurationEndpoint, a.apiGetCurrentConfigs).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") - services.API().HandleFunc(a.deploymentIdEndpoint, a.apiHandleConfigId).Methods("GET") + services.API().HandleFunc(a.configurationIdEndpoint, a.apiHandleConfigId).Methods("GET") a.initDistributeEvents() a.apiInitialized = true log.Debug("API endpoints initialized") } func (a *apiManager) initDistributeEvents() { - go distributeEvents(a.newChangeListChan, a.addSubscriber) + go util.DistributeEvents(a.newChangeListChan, a.addSubscriber) } func (a *apiManager) notifyNewChangeList(newLSN string) { - confs, err := a.dbMan.getReadyDeployments("") + confs, err := a.dbMan.getReadyConfigurations("") if err != nil { - log.Errorf("Database error in getReadyDeployments: %v", err) + log.Errorf("Database error in getReadyConfigurations: %v", err) } a.newChangeListChan <- &confChangeNotification{ LSN: newLSN, @@ -210,8 +211,8 @@ } return } - configDetail := ApiDeploymentDetails{ - Self: getHttpHost() + a.deploymentsEndpoint + "/" + config.ID, + configDetail := ApiConfigurationDetails{ + Self: getHttpHost() + a.configurationEndpoint + "/" + config.ID, Name: config.Name, Type: config.Type, Revision: config.Revision, @@ -231,17 +232,16 @@ return } log.Debugf("sending configuration %s", b) + w.Header().Set("Content-Type", headerJson) w.Write(b) } +// If not long-polling, return configurations, status = 200 +// If "apid-config-index" is given in request parameters, return immediately with status = 200/304 +// If both "block" and "apid-config-index" are given: +// if apid's LSN > apid-config-index in header, return immediately with status = 200 +// if apid's LSN <= apid-config-index, long polling for timeout=block secs func (a *apiManager) apiGetCurrentConfigs(w http.ResponseWriter, r *http.Request) { - - // If returning without a bundle (immediately or after timeout), status = 404 - // If returning If-None-Match value is equal to current deployment, status = 304 - // If returning a new value, status = 200 - - // If timeout > 0 AND there is no deployment (or new deployment) available (per If-None-Match), then - // block for up to the specified number of seconds until a new deployment becomes available. blockSec := r.URL.Query().Get("block") typeFilter := r.URL.Query().Get("type") headerLSN := r.URL.Query().Get(apidConfigIndexPar) @@ -260,7 +260,7 @@ // if filter by "type" if typeFilter != "" { - a.sendReadyDeployments(typeFilter, w, "") + a.sendReadyConfigurations(typeFilter, w, "") return } @@ -279,58 +279,52 @@ if timeout == 0 { // no long polling w.WriteHeader(http.StatusNotModified) } else { // long polling - a.waitForNewCL(w, time.Duration(timeout)) + util.LongPolling(w, time.Duration(timeout)*time.Second, a.addSubscriber, a.LongPollSuccessHandler, a.LongPollTimeoutHandler) } return case cmpRes > 0: //APID_LSN > Header_LSN - a.sendReadyDeployments("", w, apidLSN) + a.sendReadyConfigurations("", w, apidLSN) return } } -func (a *apiManager) waitForNewCL(w http.ResponseWriter, timeout time.Duration) { - ConfigChangeChan := make(chan interface{}, 1) - a.addSubscriber <- ConfigChangeChan - - log.Debug("Long-polling... Waiting for new Deployments.") - - select { - case c := <-ConfigChangeChan: - // send configs and LSN - confChange, ok := c.(*confChangeNotification) - if !ok || confChange.err != nil { - log.Errorf("Wrong confChangeNotification: %v, %v", ok, confChange) - a.writeInternalError(w, "Error getting configurations with long-polling") - return - } - a.sendDeployments(w, confChange.confs, confChange.LSN, "") - case <-time.After(timeout * time.Second): - log.Debug("long-polling configuration request timed out.") - w.WriteHeader(http.StatusNotModified) +func (a *apiManager) LongPollSuccessHandler(c interface{}, w http.ResponseWriter) { + // send configs and LSN + confChange, ok := c.(*confChangeNotification) + if !ok || confChange.err != nil { + log.Errorf("Wrong confChangeNotification: %v, %v", ok, confChange) + a.writeInternalError(w, "Error getting configurations with long-polling") + return } + a.sendDeployments(w, confChange.confs, confChange.LSN, "") } -func (a *apiManager) sendReadyDeployments(typeFilter string, w http.ResponseWriter, apidLSN string) { - deployments, err := a.dbMan.getReadyDeployments(typeFilter) +func (a *apiManager) LongPollTimeoutHandler(w http.ResponseWriter) { + log.Debug("long-polling configuration request timed out.") + w.WriteHeader(http.StatusNotModified) +} + +func (a *apiManager) sendReadyConfigurations(typeFilter string, w http.ResponseWriter, apidLSN string) { + configurations, err := a.dbMan.getReadyConfigurations(typeFilter) if err != nil { log.Errorf("Database error: %v", err) a.writeInternalError(w, fmt.Sprintf("Database error: %s", err.Error())) return } - a.sendDeployments(w, deployments, apidLSN, typeFilter) + a.sendDeployments(w, configurations, apidLSN, typeFilter) } -func (a *apiManager) sendDeployments(w http.ResponseWriter, dataDeps []Configuration, apidLSN string, typeFilter string) { +func (a *apiManager) sendDeployments(w http.ResponseWriter, dataConfs []Configuration, apidLSN string, typeFilter string) { - apiDeps := ApiDeploymentResponse{} - apiDepDetails := make([]ApiDeploymentDetails, 0) + apiConfs := ApiConfigurationResponse{} + apiConfDetails := make([]ApiConfigurationDetails, 0) - apiDeps.Kind = kindCollection - apiDeps.Self = getHttpHost() + a.deploymentsEndpoint + apiConfs.Kind = kindCollection + apiConfs.Self = getHttpHost() + a.configurationEndpoint - for _, d := range dataDeps { - apiDepDetails = append(apiDepDetails, ApiDeploymentDetails{ - Self: apiDeps.Self + "/" + d.ID, + for _, d := range dataConfs { + apiConfDetails = append(apiConfDetails, ApiConfigurationDetails{ + Self: apiConfs.Self + "/" + d.ID, Name: d.Name, Type: d.Type, Revision: d.Revision, @@ -343,13 +337,13 @@ Updated: convertTime(d.Updated), }) } - apiDeps.ApiDeploymentsResponse = apiDepDetails + apiConfs.ApiConfigurationsResponse = apiConfDetails if typeFilter != "" { - apiDeps.Self += "?type=" + typeFilter + apiConfs.Self += "?type=" + typeFilter } - b, err := json.Marshal(apiDeps) + b, err := json.Marshal(apiConfs) if err != nil { log.Errorf("unable to marshal deployments: %v", err) w.WriteHeader(http.StatusInternalServerError) @@ -360,7 +354,7 @@ w.Header().Set(apidConfigIndexHeader, apidLSN) } w.Header().Set("Content-Type", headerJson) - log.Debugf("sending deployments %s: %s", apidLSN, b) + log.Debugf("sending deployments %s", apidLSN) w.Write(b) }
diff --git a/api_test.go b/api_test.go index ece8565..f41963c 100644 --- a/api_test.go +++ b/api_test.go
@@ -46,12 +46,12 @@ lsn: "0.1.1", } testApiMan = &apiManager{ - dbMan: dummyDbMan, - deploymentsEndpoint: deploymentsEndpoint + strconv.Itoa(testCount), - blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", - deploymentIdEndpoint: deploymentsEndpoint + strconv.Itoa(testCount) + "/{configId}", - newChangeListChan: make(chan interface{}, 5), - addSubscriber: make(chan chan interface{}), + dbMan: dummyDbMan, + configurationEndpoint: configEndpoint + strconv.Itoa(testCount), + blobEndpoint: blobEndpointPath + strconv.Itoa(testCount) + "/{blobId}", + configurationIdEndpoint: configEndpoint + strconv.Itoa(testCount) + "/{configId}", + newChangeListChan: make(chan interface{}, 5), + addSubscriber: make(chan chan interface{}), } testApiMan.InitAPI() time.Sleep(100 * time.Millisecond) @@ -65,7 +65,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) // http get res, err := http.Get(uri.String()) @@ -74,16 +74,16 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) Expect(err).Should(Succeed()) // verify response - Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) + Expect(len(depRes.ApiConfigurationsResponse)).To(Equal(0)) Expect(depRes.Kind).Should(Equal(kindCollection)) - Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount))) + Expect(depRes.Self).Should(Equal(apiTestUrl + configEndpoint + strconv.Itoa(testCount))) }) @@ -91,7 +91,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) // set test data details := setTestDeployments(dummyDbMan, uri.String()) @@ -103,7 +103,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -112,7 +112,7 @@ // verify response Expect(depRes.Kind).Should(Equal(kindCollection)) Expect(depRes.Self).Should(Equal(uri.String())) - Expect(depRes.ApiDeploymentsResponse).Should(Equal(details)) + Expect(depRes.ApiConfigurationsResponse).Should(Equal(details)) }) @@ -121,7 +121,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("type", typeFilter) @@ -140,7 +140,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -149,7 +149,7 @@ // verify response Expect(depRes.Kind).Should(Equal(kindCollection)) Expect(depRes.Self).Should(Equal(uri.String())) - Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail})) + Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail})) }) @@ -158,7 +158,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("type", typeFilter) @@ -179,7 +179,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -188,7 +188,7 @@ // verify response Expect(depRes.Kind).Should(Equal(kindCollection)) Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0] + "?type=" + typeFilter)) - Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail})) + Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail})) }, 1) @@ -197,7 +197,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) // set test data setTestDeployments(dummyDbMan, uri.String()) @@ -232,7 +232,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("block", "1") query.Add(apidConfigIndexPar, "1.0.0") @@ -256,7 +256,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("block", "2") query.Add(apidConfigIndexPar, dummyDbMan.lsn) @@ -278,7 +278,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -287,7 +287,7 @@ // verify response Expect(depRes.Kind).Should(Equal(kindCollection)) Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0])) - Expect(depRes.ApiDeploymentsResponse).Should(Equal(details)) + Expect(depRes.ApiConfigurationsResponse).Should(Equal(details)) }, 3) It("should support long-polling for multiple subscribers", func() { @@ -296,7 +296,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("block", "3") query.Add(apidConfigIndexPar, dummyDbMan.lsn) @@ -337,7 +337,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + uri.Path = configEndpoint + strconv.Itoa(testCount) for i, t := range testTimes { log.Debug("insert deployment with timestamp: " + t) @@ -355,13 +355,13 @@ defer res.Body.Close() Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentResponse + var depRes ApiConfigurationResponse body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) Expect(err).Should(Succeed()) // verify response - Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail})) + Expect(depRes.ApiConfigurationsResponse).Should(Equal([]ApiConfigurationDetails{*detail})) } }) @@ -403,7 +403,7 @@ // setup http client uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/3ecd351c-1173-40bf-b830-c194e5ef9038" + uri.Path = configEndpoint + strconv.Itoa(testCount) + "/3ecd351c-1173-40bf-b830-c194e5ef9038" //setup test data dummyDbMan.err = nil @@ -431,7 +431,7 @@ Expect(res.StatusCode).Should(Equal(http.StatusOK)) // parse response - var depRes ApiDeploymentDetails + var depRes ApiConfigurationDetails body, err := ioutil.ReadAll(res.Body) Expect(err).Should(Succeed()) err = json.Unmarshal(body, &depRes) @@ -472,7 +472,7 @@ dummyDbMan.configurations = make(map[string]*Configuration) dummyDbMan.configurations[data[0].(string)] = &Configuration{} // http get - uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string) + uri.Path = configEndpoint + strconv.Itoa(testCount) + "/" + data[0].(string) res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) Expect(res.StatusCode).Should(Equal(expectedCode[i])) @@ -483,12 +483,12 @@ }) -func setTestDeployments(dummyDbMan *dummyDbManager, self string) []ApiDeploymentDetails { +func setTestDeployments(dummyDbMan *dummyDbManager, self string) []ApiConfigurationDetails { mathrand.Seed(time.Now().UnixNano()) count := mathrand.Intn(5) + 1 deployments := make([]Configuration, count) - details := make([]ApiDeploymentDetails, count) + details := make([]ApiConfigurationDetails, count) for i := 0; i < count; i++ { dep := makeTestDeployment() @@ -522,8 +522,8 @@ return dep } -func makeExpectedDetail(dep *Configuration, self string) *ApiDeploymentDetails { - detail := &ApiDeploymentDetails{ +func makeExpectedDetail(dep *Configuration, self string) *ApiConfigurationDetails { + detail := &ApiConfigurationDetails{ Self: self + "/" + dep.ID, Name: dep.Name, Type: dep.Type, @@ -562,7 +562,7 @@ return d.unreadyBlobIds, nil } -func (d *dummyDbManager) getReadyDeployments(typeFilter string) ([]Configuration, error) { +func (d *dummyDbManager) getReadyConfigurations(typeFilter string) ([]Configuration, error) { if typeFilter == "" { return d.readyDeployments, nil }
diff --git a/bundle.go b/bundle.go index 537777a..61db041 100644 --- a/bundle.go +++ b/bundle.go
@@ -44,17 +44,17 @@ } type bundleManager struct { - blobServerUrl string - dbMan dbManagerInterface - apiMan apiManagerInterface - concurrentDownloads int - markDeploymentFailedAfter time.Duration - bundleRetryDelay time.Duration - bundleCleanupDelay time.Duration - downloadQueue chan *DownloadRequest - isClosed *int32 - workers []*BundleDownloader - client *http.Client + blobServerUrl string + dbMan dbManagerInterface + apiMan apiManagerInterface + concurrentDownloads int + markConfigFailedAfter time.Duration + bundleRetryDelay time.Duration + bundleCleanupDelay time.Duration + downloadQueue chan *DownloadRequest + isClosed *int32 + workers []*BundleDownloader + client *http.Client } type blobServerResponse struct { @@ -99,7 +99,7 @@ if blobId == "" { return nil } - markFailedAt := time.Now().Add(bm.markDeploymentFailedAfter) + markFailedAt := time.Now().Add(bm.markConfigFailedAfter) retryIn := bm.bundleRetryDelay maxBackOff := 5 * time.Minute
diff --git a/bundle_test.go b/bundle_test.go index cad13df..670dc5d 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -72,15 +72,15 @@ // init bundle manager testBundleMan = &bundleManager{ - blobServerUrl: bundleTestUrl, - dbMan: dummyDbMan, - apiMan: dummyApiMan, - concurrentDownloads: concurrentDownloads, - markDeploymentFailedAfter: 5 * time.Second, - bundleRetryDelay: time.Second, - bundleCleanupDelay: 5 * time.Second, - downloadQueue: make(chan *DownloadRequest, downloadQueueSize), - isClosed: new(int32), + blobServerUrl: bundleTestUrl, + dbMan: dummyDbMan, + apiMan: dummyApiMan, + concurrentDownloads: concurrentDownloads, + markConfigFailedAfter: 5 * time.Second, + bundleRetryDelay: time.Second, + bundleCleanupDelay: 5 * time.Second, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), client: &http.Client{ Timeout: time.Second, Transport: &http.Transport{ @@ -124,13 +124,13 @@ }, 4) - It("should mark as failure according to markDeploymentFailedAfter", func() { + It("should mark as failure according to markConfigFailedAfter", func() { // setup timeout atomic.StoreInt32(blobServer.signedTimeout, 1) atomic.StoreInt32(blobServer.blobTimeout, 1) testBundleMan.client.Timeout = 100 * time.Millisecond testBundleMan.bundleRetryDelay = 100 * time.Millisecond - testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond + testBundleMan.markConfigFailedAfter = 200 * time.Millisecond // download blobs id := util.GenerateUUID()
diff --git a/data.go b/data.go index 1a6d014..42e9579 100644 --- a/data.go +++ b/data.go
@@ -53,7 +53,7 @@ setDbVersion(string) initDb() error getUnreadyBlobs() ([]string, error) - getReadyDeployments(typeFilter string) ([]Configuration, error) + getReadyConfigurations(typeFilter string) ([]Configuration, error) updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) getConfigById(string) (*Configuration, error) @@ -145,14 +145,13 @@ FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.id = ?; `, id) - config, err = dataDeploymentsFromRow(row) + config, err = configurationFromDbRow(row) if err != nil { return nil, err } return config, nil } -// getUnreadyDeployments() returns array of resources that are not yet to be processed func (dbc *dbManager) getUnreadyBlobs() (ids []string, err error) { rows, err := dbc.getDb().Query(` @@ -188,7 +187,7 @@ return } -func (dbc *dbManager) getReadyDeployments(typeFilter string) ([]Configuration, error) { +func (dbc *dbManager) getReadyConfigurations(typeFilter string) ([]Configuration, error) { // An alternative statement is in get_ready_deployments.sql // Need testing with large data volume to determine which is better @@ -286,14 +285,14 @@ } defer rows.Close() - deployments, err := dataDeploymentsFromRows(rows) + confs, err := configurationsFromDbRows(rows) if err != nil { return nil, err } - log.Debugf("Configurations ready: %v", deployments) + //log.Debugf("Configurations ready: %v", confs) - return deployments, nil + return confs, nil } @@ -345,7 +344,6 @@ } func (dbc *dbManager) loadLsnFromDb() error { - log.Debug("loadLsnFromDb") var LSN sql.NullString ret := InitLSN @@ -395,7 +393,7 @@ return } -func dataDeploymentsFromRows(rows *sql.Rows) ([]Configuration, error) { +func configurationsFromDbRows(rows *sql.Rows) ([]Configuration, error) { tmp, err := structFromRows(reflect.TypeOf((*Configuration)(nil)).Elem(), rows) if err != nil { return nil, err @@ -427,11 +425,11 @@ return slice.Interface(), nil } -func dataDeploymentsFromRow(row *sql.Row) (*Configuration, error) { +func configurationFromDbRow(row *sql.Row) (*Configuration, error) { tmp, err := structFromRow(reflect.TypeOf((*Configuration)(nil)).Elem(), row) if err != nil { if err != sql.ErrNoRows { - log.Errorf("Error in dataDeploymentsFromRow: %v", err) + log.Errorf("Error in configurationFromDbRow: %v", err) } return nil, err }
diff --git a/data_test.go b/data_test.go index 4c8ca96..50daca0 100644 --- a/data_test.go +++ b/data_test.go
@@ -140,10 +140,10 @@ }) Context("configuration tests", func() { - It("should get empty slice if no deployments are ready", func() { - deps, err := testDbMan.getReadyDeployments("") + It("should get empty slice if no configurations are ready", func() { + confs, err := testDbMan.getReadyConfigurations("") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(BeZero()) + Expect(len(confs)).Should(BeZero()) }) It("should succefully update local FS location", func() { @@ -208,13 +208,13 @@ err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId) Expect(err).Should(Succeed()) - deps, err := testDbMan.getReadyDeployments("") + confs, err := testDbMan.getReadyConfigurations("") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(2)) - for _, dep := range deps { - Expect(dep.BlobID).Should(Equal(readyBlobId)) - if dep.BlobResourceID != "" { - Expect(dep.BlobResourceID).Should(Equal(readyResourceId)) + Expect(len(confs)).Should(Equal(2)) + for _, conf := range confs { + Expect(conf.BlobID).Should(Equal(readyBlobId)) + if conf.BlobResourceID != "" { + Expect(conf.BlobResourceID).Should(Equal(readyResourceId)) } } }) @@ -226,19 +226,19 @@ err = testDbMan.updateLocalFsLocation(readyResourceId, testBlobLocalFsPrefix+readyResourceId) Expect(err).Should(Succeed()) - deps, err := testDbMan.getReadyDeployments("ORGANIZATION") + confs, err := testDbMan.getReadyConfigurations("ORGANIZATION") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(1)) - Expect(deps[0].ID).Should(Equal("319963ff-217e-4ecc-8d6e-c3665e962d1e")) + Expect(len(confs)).Should(Equal(1)) + Expect(confs[0].ID).Should(Equal("319963ff-217e-4ecc-8d6e-c3665e962d1e")) - deps, err = testDbMan.getReadyDeployments("ENVIRONMENT") + confs, err = testDbMan.getReadyConfigurations("ENVIRONMENT") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(1)) - Expect(deps[0].ID).Should(Equal("1dc4895e-6494-4b59-979f-5f4c89c073b4")) + Expect(len(confs)).Should(Equal(1)) + Expect(confs[0].ID).Should(Equal("1dc4895e-6494-4b59-979f-5f4c89c073b4")) - deps, err = testDbMan.getReadyDeployments("INVALID-TYPE") + confs, err = testDbMan.getReadyConfigurations("INVALID-TYPE") Expect(err).Should(Succeed()) - Expect(len(deps)).Should(Equal(0)) + Expect(len(confs)).Should(Equal(0)) }) It("should succefully get all unready blob ids", func() {
diff --git a/glide.yaml b/glide.yaml index 58d19ac..ebb45f2 100644 --- a/glide.yaml +++ b/glide.yaml
@@ -15,7 +15,7 @@ package: github.com/apid/apidGatewayConfDeploy import: - package: github.com/apid/apid-core - version: master + version: ISSUE-66918282 - package: github.com/apigee-labs/transicator/common version: master - package: github.com/gorilla/mux
diff --git a/init.go b/init.go index d04b74c..692d369 100644 --- a/init.go +++ b/init.go
@@ -133,13 +133,13 @@ // initialize api manager apiMan := &apiManager{ - dbMan: dbMan, - deploymentsEndpoint: deploymentsEndpoint, - blobEndpoint: blobEndpoint, - deploymentIdEndpoint: deploymentIdEndpoint, - newChangeListChan: make(chan interface{}, 5), - addSubscriber: make(chan chan interface{}, 100), - apiInitialized: false, + dbMan: dbMan, + configurationEndpoint: configEndpoint, + blobEndpoint: blobEndpoint, + configurationIdEndpoint: configIdEndpoint, + newChangeListChan: make(chan interface{}, 5), + addSubscriber: make(chan chan interface{}, 100), + apiInitialized: false, } // initialize bundle manager @@ -155,16 +155,16 @@ concurrentDownloads := config.GetInt(configConcurrentDownloads) downloadQueueSize := config.GetInt(configDownloadQueueSize) bundleMan := &bundleManager{ - blobServerUrl: blobServerURL, - dbMan: dbMan, - apiMan: apiMan, - concurrentDownloads: concurrentDownloads, - markDeploymentFailedAfter: markDeploymentFailedAfter, - bundleRetryDelay: time.Second, - bundleCleanupDelay: bundleCleanupDelay, - downloadQueue: make(chan *DownloadRequest, downloadQueueSize), - isClosed: new(int32), - client: httpClient, + blobServerUrl: blobServerURL, + dbMan: dbMan, + apiMan: apiMan, + concurrentDownloads: concurrentDownloads, + markConfigFailedAfter: markDeploymentFailedAfter, + bundleRetryDelay: time.Second, + bundleCleanupDelay: bundleCleanupDelay, + downloadQueue: make(chan *DownloadRequest, downloadQueueSize), + isClosed: new(int32), + client: httpClient, } bundleMan.initializeBundleDownloading()
diff --git a/listener.go b/listener.go index 8f00d0f..33389b7 100644 --- a/listener.go +++ b/listener.go
@@ -94,7 +94,7 @@ blobIds, err := h.dbMan.getUnreadyBlobs() if err != nil { - log.Panicf("unable to query database for unready deployments: %v", err) + log.Panicf("unable to query database for unready configurations: %v", err) } log.Debugf("Queuing %d blob downloads", len(blobIds)) @@ -117,16 +117,16 @@ isConfigChanged = true switch change.Operation { case common.Insert: - dep := dataDeploymentFromRow(change.NewRow) - insertedConfigs = append(insertedConfigs, &dep) + conf := configurationFromRow(change.NewRow) + insertedConfigs = append(insertedConfigs, &conf) case common.Delete: - dep := dataDeploymentFromRow(change.OldRow) - deletedConfigs = append(deletedConfigs, &dep) + conf := configurationFromRow(change.OldRow) + deletedConfigs = append(deletedConfigs, &conf) case common.Update: - depNew := dataDeploymentFromRow(change.NewRow) - depOld := dataDeploymentFromRow(change.OldRow) - updatedNewConfigs = append(updatedNewConfigs, &depNew) - updatedOldConfigs = append(updatedOldConfigs, &depOld) + confNew := configurationFromRow(change.NewRow) + confOld := configurationFromRow(change.OldRow) + updatedNewConfigs = append(updatedNewConfigs, &confNew) + updatedOldConfigs = append(updatedOldConfigs, &confOld) default: log.Errorf("unexpected operation: %s", change.Operation) } @@ -149,21 +149,21 @@ } -func dataDeploymentFromRow(row common.Row) (d Configuration) { +func configurationFromRow(row common.Row) (c Configuration) { - row.Get("id", &d.ID) - row.Get("organization_id", &d.OrgID) - row.Get("environment_id", &d.EnvID) - row.Get("bean_blob_id", &d.BlobID) - row.Get("resource_blob_id", &d.BlobResourceID) - row.Get("type", &d.Type) - row.Get("name", &d.Name) - row.Get("revision", &d.Revision) - row.Get("path", &d.Path) - row.Get("created_at", &d.Created) - row.Get("created_by", &d.CreatedBy) - row.Get("updated_at", &d.Updated) - row.Get("updated_by", &d.UpdatedBy) + row.Get("id", &c.ID) + row.Get("organization_id", &c.OrgID) + row.Get("environment_id", &c.EnvID) + row.Get("bean_blob_id", &c.BlobID) + row.Get("resource_blob_id", &c.BlobResourceID) + row.Get("type", &c.Type) + row.Get("name", &c.Name) + row.Get("revision", &c.Revision) + row.Get("path", &c.Path) + row.Get("created_at", &c.Created) + row.Get("created_by", &c.CreatedBy) + row.Get("updated_at", &c.Updated) + row.Get("updated_by", &c.UpdatedBy) return }
diff --git a/longPoll.go b/longPoll.go deleted file mode 100644 index cd14744..0000000 --- a/longPoll.go +++ /dev/null
@@ -1,73 +0,0 @@ -// Copyright 2017 Google Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package apiGatewayConfDeploy - -import "time" - -// distributeEvents() receives elements from deliverChan, and send them to subscribers -// Sending a `chan interface{}` to addSubscriber adds a new subscriber. -// It closes the subscriber channel after sending the element. -// Any subscriber sent to `addSubscriber` should be buffered chan. -func distributeEvents(deliverChan <-chan interface{}, addSubscriber chan chan interface{}) { - subscribers := make([]chan interface{}, 0) - for { - select { - case element, ok := <-deliverChan: - if !ok { - return - } - for _, subscriber := range subscribers { - go func(sub chan interface{}) { - log.Debugf("delivering to: %v", sub) - sub <- element - close(sub) - }(subscriber) - } - subscribers = make([]chan interface{}, 0) - case sub, ok := <-addSubscriber: - if !ok { - return - } - log.Debugf("Add subscriber: %v", sub) - subscribers = append(subscribers, sub) - } - } -} - -func debounce(in chan interface{}, out chan interface{}, window time.Duration) { - send := func(toSend interface{}) { - if toSend != nil { - out <- toSend - } - } - var toSend interface{} = nil - for { - select { - case incoming, ok := <-in: - if ok { - log.Debugf("debouncing %v", incoming) - toSend = incoming - } else { - send(toSend) - log.Debugf("closing debouncer") - close(out) - return - } - case <-time.After(window): - send(toSend) - toSend = nil - } - } -}