[ISSUE-66918282] add tests
diff --git a/api.go b/api.go index 97b9a85..f271370 100644 --- a/api.go +++ b/api.go
@@ -66,10 +66,14 @@ const ( headerSteam = "application/octet-stream" + apidConfigIndexPar = "apid-config-index" apidConfigIndexHeader = "x-apid-config-index" ) -var ErrNoLSN = errors.New("No last sequence in DB") +var ( + ErrNoLSN = errors.New("No last sequence in DB") + ErrInvalidLSN = errors.New(apidConfigIndexPar + " is invalid") +) type deploymentsResult struct { deployments []Configuration @@ -122,7 +126,7 @@ if a.apiInitialized { return } - services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentDeployments).Methods("GET") + services.API().HandleFunc(a.deploymentsEndpoint, a.apiGetCurrentConfigs).Methods("GET") services.API().HandleFunc(a.blobEndpoint, a.apiReturnBlobData).Methods("GET") services.API().HandleFunc(a.deploymentIdEndpoint, a.apiHandleConfigId).Methods("GET") a.initDistributeEvents() @@ -216,7 +220,7 @@ w.Write(b) } -func (a *apiManager) apiGetCurrentDeployments(w http.ResponseWriter, r *http.Request) { +func (a *apiManager) apiGetCurrentConfigs(w http.ResponseWriter, r *http.Request) { // If returning without a bundle (immediately or after timeout), status = 404 // If returning If-None-Match value is equal to current deployment, status = 304 @@ -226,7 +230,7 @@ // block for up to the specified number of seconds until a new deployment becomes available. blockSec := r.URL.Query().Get("block") typeFilter := r.URL.Query().Get("type") - headerLSN := r.URL.Query().Get("apid-config-index") + headerLSN := r.URL.Query().Get(apidConfigIndexPar) var timeout int var err error if blockSec != "" { @@ -246,18 +250,13 @@ return } - // if no Long Poll Index - if headerLSN == "" { - headerLSN = "0.0.0" - } - // if no filter, check for long polling cmpRes, apidLSN, err := a.compareLSN(headerLSN) switch { case err != nil: - if err == ErrNoLSN { // apid hasn't got any LSN from Change Server - // This may happen during apid bootstrap - a.waitForNewCL(w, time.Duration(timeout)) + if err == ErrInvalidLSN { + a.writeError(w, http.StatusBadRequest, http.StatusBadRequest, err.Error()) + return } log.Errorf("Error in compareLSN: %v", err) a.writeInternalError(w, err.Error()) @@ -290,6 +289,7 @@ a.writeInternalError(w, "Wrong LSN type") return } + //TODO: read db only once for all subscribers a.sendReadyDeployments("", w, lsn) case <-time.After(timeout * time.Second): log.Debug("long-polling configuration request timed out.") @@ -352,15 +352,18 @@ } func (a *apiManager) compareLSN(headerLSN string) (res int, apidLSN string, err error) { - apidLSN, err = a.dbMan.getLastSequence() + apidLSN = a.dbMan.getLSN() log.Debugf("apidLSN: %v", apidLSN) - if err != nil { - log.Errorf("Error when getLastSequence: %v", err) - return 0, "", err + + // if no Long Poll Index + if headerLSN == "" { + return 1, apidLSN, nil } - if apidLSN == "" { - log.Errorf("Error when getLastSequence: %v", ErrNoLSN) - return 0, "", ErrNoLSN + + headerSeq, err := common.ParseSequence(headerLSN) + if err != nil { + log.Debugf("Error when Parse headerLSN Sequence: %v", err) + return 0, "", ErrInvalidLSN } apidSeq, err := common.ParseSequence(apidLSN) @@ -368,11 +371,6 @@ log.Errorf("Error when Parse apidLSN Sequence: %v", err) return 0, "", err } - headerSeq, err := common.ParseSequence(headerLSN) - if err != nil { - log.Errorf("Error when Parse headerLSN Sequence: %v", err) - return 0, "", err - } return apidSeq.Compare(headerSeq), apidLSN, nil }
diff --git a/api_test.go b/api_test.go index 24a5e95..ece8565 100644 --- a/api_test.go +++ b/api_test.go
@@ -43,7 +43,7 @@ var _ = BeforeEach(func() { testCount += 1 dummyDbMan = &dummyDbManager{ - lsn: "19.1d3e9368.0", + lsn: "0.1.1", } testApiMan = &apiManager{ dbMan: dummyDbMan, @@ -122,7 +122,10 @@ uri, err := url.Parse(apiTestUrl) Expect(err).Should(Succeed()) uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) - uri.RawQuery = "type=" + typeFilter + + query := uri.Query() + query.Add("type", typeFilter) + uri.RawQuery = query.Encode() // set test data dep := makeTestDeployment() @@ -150,6 +153,45 @@ }) + It("should not long poll if using filter", func() { + typeFilter := "ORGANIZATION" + // setup http client + uri, err := url.Parse(apiTestUrl) + Expect(err).Should(Succeed()) + uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + + query := uri.Query() + query.Add("type", typeFilter) + query.Add("block", "3") + query.Add(apidConfigIndexPar, dummyDbMan.lsn) + uri.RawQuery = query.Encode() + // set test data + dep := makeTestDeployment() + + dummyDbMan.configurations = make(map[string]*Configuration) + dummyDbMan.configurations[typeFilter] = dep + detail := makeExpectedDetail(dep, strings.Split(uri.String(), "?")[0]) + + // http get + res, err := http.Get(uri.String()) + Expect(err).Should(Succeed()) + defer res.Body.Close() + Expect(res.StatusCode).Should(Equal(http.StatusOK)) + + // parse response + var depRes ApiDeploymentResponse + body, err := ioutil.ReadAll(res.Body) + Expect(err).Should(Succeed()) + err = json.Unmarshal(body, &depRes) + Expect(err).Should(Succeed()) + + // verify response + Expect(depRes.Kind).Should(Equal(kindCollection)) + Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0] + "?type=" + typeFilter)) + Expect(depRes.ApiDeploymentsResponse).Should(Equal([]ApiDeploymentDetails{*detail})) + + }, 1) + It("should get 304 for no change", func() { // setup http client @@ -164,11 +206,13 @@ Expect(err).Should(Succeed()) defer res.Body.Close() Expect(res.StatusCode).Should(Equal(http.StatusOK)) - lsn := res.Header.Get("x-apid-config-index") + lsn := res.Header.Get(apidConfigIndexHeader) Expect(lsn).ShouldNot(BeEmpty()) // send second request - uri.RawQuery = "apid-config-index=" + lsn + query := uri.Query() + query.Add(apidConfigIndexPar, lsn) + uri.RawQuery = query.Encode() log.Debug(uri.String()) req, err := http.NewRequest("GET", uri.String(), nil) req.Header.Add("Content-Type", "application/json") @@ -181,7 +225,7 @@ }) // block is not enabled now - XIt("should get empty set after blocking if no deployments", func() { + It("should do long-polling if Gateway_LSN>=APID_LSN, should get 304 for timeout", func() { start := time.Now() @@ -191,19 +235,48 @@ uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) query := uri.Query() query.Add("block", "1") + query.Add(apidConfigIndexPar, "1.0.0") uri.RawQuery = query.Encode() // http get res, err := http.Get(uri.String()) Expect(err).Should(Succeed()) defer res.Body.Close() - Expect(res.StatusCode).Should(Equal(http.StatusOK)) + Expect(res.StatusCode).Should(Equal(http.StatusNotModified)) //verify blocking time blockingTime := time.Since(start) - log.Warnf("time used: %v", blockingTime.Seconds()) Expect(blockingTime.Seconds() > 0.9).Should(BeTrue()) + }, 2) + + It("should do long-polling if Gateway_LSN>=APID_LSN, should get 200 if not timeout", func() { + + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // setup http client + uri, err := url.Parse(apiTestUrl) + Expect(err).Should(Succeed()) + uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + query := uri.Query() + query.Add("block", "2") + query.Add(apidConfigIndexPar, dummyDbMan.lsn) + uri.RawQuery = query.Encode() + + // set test data + details := setTestDeployments(dummyDbMan, strings.Split(uri.String(), "?")[0]) + + // notify change + go func() { + time.Sleep(time.Second) + testApiMan.notifyNewChangeList(testLSN) + }() + + // http get + res, err := http.Get(uri.String()) + Expect(err).Should(Succeed()) + defer res.Body.Close() + Expect(res.StatusCode).Should(Equal(http.StatusOK)) + // parse response var depRes ApiDeploymentResponse body, err := ioutil.ReadAll(res.Body) @@ -212,11 +285,50 @@ Expect(err).Should(Succeed()) // verify response - Expect(len(depRes.ApiDeploymentsResponse)).To(Equal(0)) Expect(depRes.Kind).Should(Equal(kindCollection)) - Expect(depRes.Self).Should(Equal(apiTestUrl + deploymentsEndpoint + strconv.Itoa(testCount))) + Expect(depRes.Self).Should(Equal(strings.Split(uri.String(), "?")[0])) + Expect(depRes.ApiDeploymentsResponse).Should(Equal(details)) + }, 3) - }, 2) + It("should support long-polling for multiple subscribers", func() { + + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // setup http client + uri, err := url.Parse(apiTestUrl) + Expect(err).Should(Succeed()) + uri.Path = deploymentsEndpoint + strconv.Itoa(testCount) + query := uri.Query() + query.Add("block", "3") + query.Add(apidConfigIndexPar, dummyDbMan.lsn) + uri.RawQuery = query.Encode() + + // set test data + setTestDeployments(dummyDbMan, strings.Split(uri.String(), "?")[0]) + + // http get + count := mathrand.Intn(20) + 5 + finishChan := make(chan int) + for i := 0; i < count; i++ { + go func() { + defer GinkgoRecover() + res, err := http.Get(uri.String()) + Expect(err).Should(Succeed()) + defer res.Body.Close() + finishChan <- res.StatusCode + }() + } + + // notify change + go func() { + time.Sleep(1500 * time.Millisecond) + testApiMan.notifyNewChangeList(testLSN) + }() + + for i := 0; i < count; i++ { + Expect(<-finishChan).Should(Equal(http.StatusOK)) + } + + }, 5) It("should get iso8601 time", func() { testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00", "2017-06-22 16:41:02.334"} @@ -467,7 +579,10 @@ if err != nil { return err } - d.fileResponse <- string(buff) + go func(buff []byte) { + d.fileResponse <- string(buff) + }(buff) + return nil } @@ -478,6 +593,16 @@ func (d *dummyDbManager) getConfigById(id string) (*Configuration, error) { return d.configurations[id], d.err } -func (d *dummyDbManager) getLastSequence() (string, error) { - return d.lsn, nil +func (d *dummyDbManager) getLSN() string { + return d.lsn +} + +func (d *dummyDbManager) updateLSN(LSN string) error { + d.lsn = LSN + return nil +} + +func (d *dummyDbManager) loadLsnFromDb() error { + + return nil }
diff --git a/bundle.go b/bundle.go index d1fb8bd..64e1e69 100644 --- a/bundle.go +++ b/bundle.go
@@ -34,6 +34,7 @@ type bundleManagerInterface interface { initializeBundleDownloading() + // if `configs` is empty, it just exposes the changeList downloadBlobsForChangeList(configs []*Configuration, LSN string) enqueueRequest(*DownloadRequest) makeDownloadRequest(blobId string, changelistRequest *ChangeListDownloadRequest) *DownloadRequest @@ -226,8 +227,6 @@ return err } - log.Debugf("blod downloaded. blobid=%s filepath=%s", r.blobId, downloadedFile) - err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile) if err != nil { log.Errorf("updateLocalFsLocation failed: blobId=%s", r.blobId) @@ -311,7 +310,7 @@ func downloadFromURI(client *http.Client, blobServerURL string, blobId string) (tempFileName string, err error) { var tempFile *os.File - log.Debugf("Downloading bundle: %s", blobId) + log.Debugf("Downloading Blob: %s", blobId) uri, err := getSignedURL(client, blobServerURL, blobId) if err != nil { @@ -330,18 +329,18 @@ var confReader io.ReadCloser confReader, err = getUriReaderWithAuth(client, uri) if err != nil { - log.Errorf("Unable to retrieve bundle %s: %v", uri, err) + log.Errorf("Unable to retrieve Blob %s: %v", uri, err) return } defer confReader.Close() _, err = io.Copy(tempFile, confReader) if err != nil { - log.Errorf("Unable to write bundle %s: %v", tempFileName, err) + log.Errorf("Unable to write Blob %s: %v", tempFileName, err) return } - log.Debugf("Bundle %s downloaded to: %s", uri, tempFileName) + log.Debugf("Blob %s downloaded to: %s", uri, tempFileName) return }
diff --git a/bundle_test.go b/bundle_test.go index 8b79358..cad13df 100644 --- a/bundle_test.go +++ b/bundle_test.go
@@ -19,11 +19,13 @@ "bytes" "encoding/json" + "fmt" "github.com/apid/apid-core/util" "github.com/gorilla/mux" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "io" + mathrand "math/rand" "strings" "sync/atomic" "time" @@ -64,7 +66,9 @@ } // init dummy api manager - dummyApiMan = &dummyApiManager{} + dummyApiMan = &dummyApiManager{ + lsnChan: make(chan string, 1), + } // init bundle manager testBundleMan = &bundleManager{ @@ -95,52 +99,111 @@ dummyApiMan = nil }) - It("should download blob according to id", func() { - // download blob - id := util.GenerateUUID() - testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) - received := <-dummyDbMan.fileResponse - Expect(received).Should(Equal(id)) + Context("download blobs", func() { + + It("should download blob according to id", func() { + // download blob + id := util.GenerateUUID() + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) + received := <-dummyDbMan.fileResponse + Expect(received).Should(Equal(id)) + }) + + It("should timeout connection and retry", func() { + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.client.Timeout = 500 * time.Millisecond + testBundleMan.bundleRetryDelay = 50 * time.Millisecond + + // download blobs + id := util.GenerateUUID() + testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) + received := <-dummyDbMan.fileResponse + Expect(received).Should(Equal(id)) + + }, 4) + + It("should mark as failure according to markDeploymentFailedAfter", func() { + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.client.Timeout = 100 * time.Millisecond + testBundleMan.bundleRetryDelay = 100 * time.Millisecond + testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond + + // download blobs + id := util.GenerateUUID() + req := testBundleMan.makeDownloadRequest(id, nil) + Expect(req.markFailedAt.After(time.Now())).Should(BeTrue()) + testBundleMan.enqueueRequest(req) + + // should fail + time.Sleep(time.Second) + Expect(req.markFailedAt.IsZero()).Should(BeTrue()) + }, 4) }) - It("should timeout connection and retry", func() { - // setup timeout - atomic.StoreInt32(blobServer.signedTimeout, 1) - atomic.StoreInt32(blobServer.blobTimeout, 1) - testBundleMan.client.Timeout = 500 * time.Millisecond - testBundleMan.bundleRetryDelay = 50 * time.Millisecond + Context("download blobs for changelist", func() { + It("should download blobs for changelist", func() { + //setup test data + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + count := mathrand.Intn(10) + 1 + configs := make([]*Configuration, count) + for i := 0; i < count; i++ { + conf := makeTestDeployment() + conf.BlobID = util.GenerateUUID() + conf.BlobResourceID = util.GenerateUUID() + configs[i] = conf + } - // download blobs - id := util.GenerateUUID() - testBundleMan.enqueueRequest(testBundleMan.makeDownloadRequest(id, nil)) - received := <-dummyDbMan.fileResponse - Expect(received).Should(Equal(id)) + // should download blobs for changelist + testBundleMan.downloadBlobsForChangeList(configs, testLSN) + for i := 0; i < 2*count; i++ { + <-dummyDbMan.fileResponse + } - }, 4) + // should notify after 1st download attempt + Expect(<-dummyApiMan.lsnChan).Should(Equal(testLSN)) + }) - It("should mark as failure according to markDeploymentFailedAfter", func() { - // setup timeout - atomic.StoreInt32(blobServer.signedTimeout, 1) - atomic.StoreInt32(blobServer.blobTimeout, 1) - testBundleMan.client.Timeout = 100 * time.Millisecond - testBundleMan.bundleRetryDelay = 100 * time.Millisecond - testBundleMan.markDeploymentFailedAfter = 200 * time.Millisecond + It("should notify after 1st download attempt unless failure", func() { + //setup test data + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + count := mathrand.Intn(10) + 1 + configs := make([]*Configuration, count) + for i := 0; i < count; i++ { + conf := makeTestDeployment() + conf.BlobID = util.GenerateUUID() + conf.BlobResourceID = util.GenerateUUID() + configs[i] = conf + } - // download blobs - id := util.GenerateUUID() - req := testBundleMan.makeDownloadRequest(id, nil) - Expect(req.markFailedAt.After(time.Now())).Should(BeTrue()) - testBundleMan.enqueueRequest(req) + // setup timeout + atomic.StoreInt32(blobServer.signedTimeout, 1) + atomic.StoreInt32(blobServer.blobTimeout, 1) + testBundleMan.client.Timeout = 500 * time.Millisecond + testBundleMan.bundleRetryDelay = 50 * time.Millisecond - // should fail - time.Sleep(time.Second) - Expect(req.markFailedAt.IsZero()).Should(BeTrue()) - }, 4) + // should download blobs for changelist + testBundleMan.downloadBlobsForChangeList(configs, testLSN) + + // should notify after 1st download attempt + Expect(<-dummyApiMan.lsnChan).Should(Equal(testLSN)) + + //should retry download + for i := 0; i < 2*count; i++ { + <-dummyDbMan.fileResponse + } + }) + + }) + }) type dummyApiManager struct { initCalled bool - LSN string + lsnChan chan string } func (a *dummyApiManager) InitAPI() { @@ -148,7 +211,7 @@ } func (a *dummyApiManager) notifyNewChangeList(newLSN string) { - a.LSN = newLSN + a.lsnChan <- newLSN } type dummyBlobServer struct {
diff --git a/data.go b/data.go index 06fe87c..37da7c2 100644 --- a/data.go +++ b/data.go
@@ -21,6 +21,10 @@ "reflect" ) +const ( + InitLSN = "0.0.0" +) + var ( gwBlobId int64 ) @@ -53,13 +57,17 @@ updateLocalFsLocation(string, string) error getLocalFSLocation(string) (string, error) getConfigById(string) (*Configuration, error) - getLastSequence() (string, error) + loadLsnFromDb() error + updateLSN(LSN string) error + getLSN() string } type dbManager struct { - data apid.DataService - db apid.DB - dbMux sync.RWMutex + data apid.DataService + db apid.DB + dbMux sync.RWMutex + apidLSN string + lsnMutex sync.RWMutex } func (dbc *dbManager) setDbVersion(version string) { @@ -85,7 +93,7 @@ } defer tx.Rollback() _, err = tx.Exec(` - CREATE TABLE IF NOT EXISTS apid_blob_available ( + CREATE TABLE IF NOT EXISTS APID_BLOB_AVAILABLE ( id text primary key, local_fs_location text NOT NULL ); @@ -93,11 +101,29 @@ if err != nil { return err } - err = tx.Commit() + _, err = tx.Exec(` + CREATE TABLE IF NOT EXISTS APID_CONFIGURATION_LSN ( + lsn text primary key + ); + `) if err != nil { return err } - log.Debug("Database table apid_blob_available created.") + + // insert a row if APID_CONFIGURATION_LSN is empty + _, err = tx.Exec(` + INSERT INTO APID_CONFIGURATION_LSN (lsn) + SELECT '0.0.0' + WHERE NOT EXISTS (SELECT * FROM APID_CONFIGURATION_LSN) + `) + if err != nil { + return err + } + + if err = tx.Commit(); err != nil { + return err + } + log.Debug("Database table APID_BLOB_AVAILABLE, APID_CONFIGURATION_LSN created.") return nil } @@ -116,7 +142,7 @@ a.created_by, a.updated_at, a.updated_by - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.id = ?; `, id) config, err = dataDeploymentsFromRow(row) @@ -132,14 +158,14 @@ rows, err := dbc.getDb().Query(` SELECT id FROM ( SELECT a.bean_blob_id as id - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.bean_blob_id NOT IN - (SELECT b.id FROM apid_blob_available as b) + (SELECT b.id FROM APID_BLOB_AVAILABLE as b) UNION SELECT a.resource_blob_id as id - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.resource_blob_id NOT IN - (SELECT b.id FROM apid_blob_available as b) + (SELECT b.id FROM APID_BLOB_AVAILABLE as b) ) WHERE id IS NOT NULL AND id != '' ; @@ -184,27 +210,27 @@ a.created_by, a.updated_at, a.updated_by - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.id IN ( SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.resource_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" INTERSECT SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" UNION SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NULL OR a.resource_blob_id = "" ) @@ -225,28 +251,28 @@ a.created_by, a.updated_at, a.updated_by - FROM metadata_runtime_entity_metadata as a + FROM METADATA_RUNTIME_ENTITY_METADATA as a WHERE a.type = ? AND a.id IN ( SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.resource_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" INTERSECT SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NOT NULL AND a.resource_blob_id != "" UNION SELECT a.id - FROM metadata_runtime_entity_metadata as a - INNER JOIN apid_blob_available as b + FROM METADATA_RUNTIME_ENTITY_METADATA as a + INNER JOIN APID_BLOB_AVAILABLE as b ON a.bean_blob_id = b.id WHERE a.resource_blob_id IS NULL OR a.resource_blob_id = "" ) @@ -278,21 +304,21 @@ } defer txn.Rollback() _, err = txn.Exec(` - INSERT OR IGNORE INTO apid_blob_available ( + INSERT OR IGNORE INTO APID_BLOB_AVAILABLE ( id, local_fs_location ) VALUES (?, ?);`, blobId, localFsLocation) if err != nil { - log.Errorf("INSERT apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err) + log.Errorf("INSERT APID_BLOB_AVAILABLE id {%s} local_fs_location {%s} failed", localFsLocation, err) return err } err = txn.Commit() if err != nil { - log.Errorf("UPDATE apid_blob_available id {%s} local_fs_location {%s} failed", localFsLocation, err) + log.Errorf("UPDATE APID_BLOB_AVAILABLE id {%s} local_fs_location {%s} failed", localFsLocation, err) return err } - log.Debugf("INSERT apid_blob_available {%s} local_fs_location {%s} succeeded", blobId, localFsLocation) + log.Debugf("INSERT APID_BLOB_AVAILABLE {%s} local_fs_location {%s} succeeded", blobId, localFsLocation) return nil } @@ -300,7 +326,7 @@ func (dbc *dbManager) getLocalFSLocation(blobId string) (localFsLocation string, err error) { log.Debugf("Getting the blob file for blobId {%s}", blobId) - rows, err := dbc.getDb().Query("SELECT local_fs_location FROM apid_blob_available WHERE id = '" + blobId + "'") + rows, err := dbc.getDb().Query("SELECT local_fs_location FROM APID_BLOB_AVAILABLE WHERE id = '" + blobId + "'") if err != nil { log.Errorf("SELECT local_fs_location failed %v", err) return "", err @@ -318,19 +344,54 @@ return } -func (dbc *dbManager) getLastSequence() (string, error) { - var lastSequence sql.NullString - err := dbc.getDb().QueryRow("select last_sequence from EDGEX_APID_CLUSTER LIMIT 1").Scan(&lastSequence) +func (dbc *dbManager) loadLsnFromDb() error { + var LSN sql.NullString + ret := InitLSN + + // If there's LSN for configuration + err := dbc.getDb().QueryRow("select lsn from APID_CONFIGURATION_LSN LIMIT 1").Scan(&LSN) if err != nil && err != sql.ErrNoRows { - log.Errorf("Failed to select last_sequence from EDGEX_APID_CLUSTER: %v", err) - return "", err + log.Errorf("Failed to select lsn from APID_CONFIGURATION_LSN: %v", err) + return err } - ret := "" - if lastSequence.Valid { - ret = lastSequence.String + if LSN.Valid { + ret = LSN.String + log.Debugf("LSN from APID_CONFIGURATION_LSN: %s", LSN) } - log.Debugf("lastSequence: %s", lastSequence) - return ret, nil + dbc.lsnMutex.Lock() + defer dbc.lsnMutex.Unlock() + dbc.apidLSN = ret + return nil +} + +func (dbc *dbManager) getLSN() string { + dbc.lsnMutex.RLock() + defer dbc.lsnMutex.RUnlock() + return dbc.apidLSN +} + +func (dbc *dbManager) updateLSN(LSN string) (err error) { + + tx, err := dbc.getDb().Begin() + if err != nil { + log.Errorf("getApidInstanceInfo: Unable to get DB tx Err: {%v}", err) + return + } + defer tx.Rollback() + _, err = tx.Exec("UPDATE APID_CONFIGURATION_LSN SET lsn=?;", LSN) + if err != nil { + log.Errorf("UPDATE APID_CONFIGURATION_LSN Failed: %v", err) + return + } + log.Debugf("UPDATE APID_CONFIGURATION_LSN Success: %s", LSN) + if err = tx.Commit(); err != nil { + log.Errorf("Commit error in updateLSN: %v", err) + return + } + dbc.lsnMutex.Lock() + defer dbc.lsnMutex.Unlock() + dbc.apidLSN = LSN + return } func dataDeploymentsFromRows(rows *sql.Rows) ([]Configuration, error) {
diff --git a/data_test.go b/data_test.go index 50b2472..4c8ca96 100644 --- a/data_test.go +++ b/data_test.go
@@ -15,6 +15,8 @@ package apiGatewayConfDeploy import ( + "database/sql" + "fmt" "github.com/apid/apid-core" "github.com/apid/apid-core/data" . "github.com/onsi/ginkgo" @@ -44,8 +46,9 @@ var _ = BeforeEach(func() { testCount += 1 testDbMan = &dbManager{ - data: services.Data(), - dbMux: sync.RWMutex{}, + data: services.Data(), + dbMux: sync.RWMutex{}, + lsnMutex: sync.RWMutex{}, } testDbMan.setDbVersion("test" + strconv.Itoa(testCount)) initTestDb(testDbMan.getDb()) @@ -59,7 +62,7 @@ data.Delete(data.VersionedDBID("common", "test"+strconv.Itoa(testCount))) }) - Context("db tests", func() { + Context("basic db tests", func() { It("initDb() should be idempotent", func() { err := testDbMan.initDb() Expect(err).Should(Succeed()) @@ -90,6 +93,53 @@ Expect(count).Should(Equal(6)) }) + It("should initialize support for long-polling", func() { + // APID_CONFIGURATION_LSN + rows, err := testDbMan.getDb().Query(` + SELECT lsn from APID_CONFIGURATION_LSN; + `) + Expect(err).Should(Succeed()) + defer rows.Close() + count := 0 + var lsn sql.NullString + for rows.Next() { + count++ + rows.Scan(&lsn) + } + Expect(count).Should(Equal(1)) + Expect(lsn.Valid).Should(BeTrue()) + Expect(lsn.String).Should(Equal(InitLSN)) + }) + + It("should maintain LSN", func() { + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + // write + err := testDbMan.updateLSN(testLSN) + Expect(err).Should(Succeed()) + rows, err := testDbMan.getDb().Query(` + SELECT lsn from APID_CONFIGURATION_LSN; + `) + defer rows.Close() + count := 0 + var lsn sql.NullString + for rows.Next() { + count++ + rows.Scan(&lsn) + } + Expect(count).Should(Equal(1)) + Expect(lsn.Valid).Should(BeTrue()) + Expect(lsn.String).Should(Equal(testLSN)) + + // read + Expect(testDbMan.getLSN()).Should(Equal(testLSN)) + + //load + Expect(testDbMan.loadLsnFromDb()).Should(Succeed()) + Expect(testDbMan.apidLSN).Should(Equal(testLSN)) + }) + }) + + Context("configuration tests", func() { It("should get empty slice if no deployments are ready", func() { deps, err := testDbMan.getReadyDeployments("") Expect(err).Should(Succeed())
diff --git a/listener.go b/listener.go index 51304b4..dc7eaaf 100644 --- a/listener.go +++ b/listener.go
@@ -72,6 +72,11 @@ h.dbMan.setDbVersion(snapshot.SnapshotInfo) h.startupOnExistingDatabase() + if lsn := h.dbMan.getLSN(); lsn != "" { + h.dbMan.updateLSN(lsn) + } else { //apid just started + h.dbMan.loadLsnFromDb() + } h.apiMan.InitAPI() log.Debug("Snapshot processed") } @@ -99,9 +104,11 @@ log.Debugf("Processing changes") // changes have been applied to DB by apidApigeeSync var insertedConfigs, updatedNewConfigs, updatedOldConfigs, deletedConfigs []*Configuration + isConfigChanged := false for _, change := range changes.Changes { switch change.Table { case CONFIG_METADATA_TABLE: + isConfigChanged = true switch change.Operation { case common.Insert: dep := dataDeploymentFromRow(change.NewRow) @@ -119,15 +126,20 @@ } } } - // deleted old configs + // delete old configs from FS if len(deletedConfigs)+len(updatedOldConfigs) > 0 { log.Debugf("will delete %d old blobs", len(deletedConfigs)+len(updatedOldConfigs)) //TODO delete blobs for deleted configs go h.bundleMan.deleteBlobsFromConfigs(append(deletedConfigs, updatedOldConfigs...)) } - // new configs - h.bundleMan.downloadBlobsForChangeList(append(insertedConfigs, updatedNewConfigs...), changes.LastSequence) + // download and expose new configs + if isConfigChanged { + h.dbMan.updateLSN(changes.LastSequence) + h.bundleMan.downloadBlobsForChangeList(append(insertedConfigs, updatedNewConfigs...), changes.LastSequence) + } else if h.dbMan.getLSN() == InitLSN { + h.dbMan.updateLSN(changes.LastSequence) + } }
diff --git a/listener_test.go b/listener_test.go index 41f68d3..938ca61 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -31,12 +31,18 @@ var dummyApiMan *dummyApiManager var dummyBundleMan *dummyBundleManager var testHandler *apigeeSyncHandler + var testCount int var _ = BeforeEach(func() { + testCount += 1 // stop handler created by initPlugin() eventHandler.stopListener(services) - dummyApiMan = &dummyApiManager{} - dummyDbMan = &dummyDbManager{} + dummyApiMan = &dummyApiManager{ + lsnChan: make(chan string, 1), + } + dummyDbMan = &dummyDbManager{ + lsn: "0.0.1", + } dummyBundleMan = &dummyBundleManager{ requestChan: make(chan *DownloadRequest), depChan: make(chan *Configuration), @@ -72,7 +78,7 @@ SnapshotInfo: fmt.Sprint(rand.Uint32()), } - apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, snapshot) for i := 0; i < len(unreadyBlobIds); i++ { req := <-dummyBundleMan.requestChan @@ -124,7 +130,7 @@ Changes: changes, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify for i := 0; i < len(changes); i++ { @@ -154,7 +160,7 @@ Changes: changes, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify for i := 0; i < len(changes); i++ { @@ -198,7 +204,7 @@ LastSequence: testLSN, } - apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) // verify for i := 0; i < len(configsNew); i++ { @@ -213,7 +219,78 @@ } }) + }) + Context("LSN", func() { + It("changelist with CONFIG_METADATA_TABLE should update apidLSN", func() { + // emit change event + changes := make([]common.Change, 0) + deployments := make(map[string]Configuration) + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + for i := 0; i < 1+rand.Intn(10); i++ { + dep := makeTestDeployment() + change := common.Change{ + Operation: common.Insert, + Table: CONFIG_METADATA_TABLE, + NewRow: rowFromDeployment(dep), + } + changes = append(changes, change) + deployments[dep.ID] = *dep + } + + changeList := &common.ChangeList{ + Changes: changes, + LastSequence: testLSN, + } + + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + for i := 0; i < len(changes); i++ { + <-dummyBundleMan.depChan + } + Expect(dummyDbMan.getLSN()).Should(Equal(testLSN)) + + }) + + It("changelist without CONFIG_METADATA_TABLE shouldn't update apidLSN", func() { + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + dummyDbMan.lsn = testLSN + // emit change event + changes := make([]common.Change, 0) + deployments := make(map[string]Configuration) + for i := 0; i < 1+rand.Intn(10); i++ { + dep := makeTestDeployment() + change := common.Change{ + Operation: common.Insert, + Table: "somewhat-table", + NewRow: rowFromDeployment(dep), + } + changes = append(changes, change) + deployments[dep.ID] = *dep + } + + changeList := &common.ChangeList{ + Changes: changes, + LastSequence: "aaa.aaa.aaa", + } + + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + Expect(dummyDbMan.getLSN()).Should(Equal(testLSN)) + + }) + + It("changelist should always update apidLSN if it has init value", func() { + testLSN := fmt.Sprintf("%d.%d.%d", testCount, testCount, testCount) + dummyDbMan.lsn = InitLSN + // emit change event + changeList := &common.ChangeList{ + Changes: nil, + LastSequence: testLSN, + } + + <-apid.Events().Emit(APIGEE_SYNC_EVENT, changeList) + Expect(dummyDbMan.getLSN()).Should(Equal(testLSN)) + + }) }) }) @@ -230,10 +307,11 @@ func (bm *dummyBundleManager) downloadBlobsForChangeList(configs []*Configuration, LSN string) { bm.LSN = LSN - for _, conf := range configs { - bm.depChan <- conf - } - + go func() { + for _, conf := range configs { + bm.depChan <- conf + } + }() } func (bm *dummyBundleManager) enqueueRequest(req *DownloadRequest) {
diff --git a/longPoll.go b/longPoll.go index 98d61ee..cd14744 100644 --- a/longPoll.go +++ b/longPoll.go
@@ -1,3 +1,17 @@ +// Copyright 2017 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package apiGatewayConfDeploy import "time"