| // Copyright 2017 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package apiGatewayDeploy |
| |
| import ( |
| "encoding/json" |
| "net/url" |
| |
| "net/http/httptest" |
| |
| "net/http" |
| |
| "fmt" |
| "github.com/30x/apid-core" |
| "github.com/apigee-labs/transicator/common" |
| . "github.com/onsi/ginkgo" |
| . "github.com/onsi/gomega" |
| ) |
| |
| var _ = Describe("listener", func() { |
| |
| Context("ApigeeSync snapshot event", func() { |
| |
| /* |
| * Note that the test snapshot should not be empty. |
| * If it's empty, you can't use deploymentsResult chan to mark the end of processing, |
| * so the threads generated by startupOnExistingDatabase() will mess up later tests |
| */ |
| It("should set DB to appropriate version", func(done Done) { |
| saveDB := getDB() |
| deploymentID := "set_version_test" |
| snapshot, dep := createSnapshotDeployment(deploymentID, "test_version") |
| |
| db, err := data.DBVersion(snapshot.SnapshotInfo) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| err = InitDB(db) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| insertDeploymentToDb(dep, db) |
| expectedDB, err := data.DBVersion(snapshot.SnapshotInfo) |
| Expect(err).NotTo(HaveOccurred()) |
| |
| var listener = make(chan deploymentsResult) |
| addSubscriber <- listener |
| |
| apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) |
| |
| result := <-listener |
| Expect(result.err).ShouldNot(HaveOccurred()) |
| |
| // DB should have been set |
| Expect(getDB() == expectedDB).Should(BeTrue()) |
| |
| SetDB(saveDB) |
| close(done) |
| }) |
| |
| It("should process unready on existing db startup event", func(done Done) { |
| |
| saveDB := getDB() |
| |
| deploymentID := "startup_test" |
| |
| snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready") |
| |
| db, err := data.DBVersion(snapshot.SnapshotInfo) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| err = InitDB(db) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| insertDeploymentToDb(dep, db) |
| |
| var listener = make(chan deploymentsResult) |
| addSubscriber <- listener |
| |
| apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) |
| |
| result := <-listener |
| Expect(result.err).ShouldNot(HaveOccurred()) |
| |
| Expect(len(result.deployments)).To(Equal(1)) |
| d := result.deployments[0] |
| |
| Expect(d.ID).To(Equal(deploymentID)) |
| |
| SetDB(saveDB) |
| close(done) |
| }) |
| |
| It("should send deployment statuses on existing db startup event", func(done Done) { |
| |
| saveDB := getDB() |
| |
| successDep := DataDeployment{ |
| ID: "success", |
| LocalBundleURI: "x", |
| DeployStatus: RESPONSE_STATUS_SUCCESS, |
| DeployErrorCode: 1, |
| DeployErrorMessage: "message", |
| } |
| |
| failDep := DataDeployment{ |
| ID: "fail", |
| LocalBundleURI: "x", |
| DeployStatus: RESPONSE_STATUS_FAIL, |
| DeployErrorCode: 1, |
| DeployErrorMessage: "message", |
| } |
| |
| blankDep := DataDeployment{ |
| ID: "blank", |
| LocalBundleURI: "x", |
| } |
| |
| ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
| defer GinkgoRecover() |
| |
| var results apiDeploymentResults |
| err := json.NewDecoder(r.Body).Decode(&results) |
| Expect(err).ToNot(HaveOccurred()) |
| |
| Expect(results).To(HaveLen(2)) |
| |
| Expect(results).To(ContainElement(apiDeploymentResult{ |
| ID: successDep.ID, |
| Status: successDep.DeployStatus, |
| ErrorCode: successDep.DeployErrorCode, |
| Message: successDep.DeployErrorMessage, |
| })) |
| Expect(results).To(ContainElement(apiDeploymentResult{ |
| ID: failDep.ID, |
| Status: failDep.DeployStatus, |
| ErrorCode: failDep.DeployErrorCode, |
| Message: failDep.DeployErrorMessage, |
| })) |
| |
| SetDB(saveDB) |
| close(done) |
| })) |
| |
| var err error |
| apiServerBaseURI, err = url.Parse(ts.URL) |
| Expect(err).NotTo(HaveOccurred()) |
| |
| // init without info == startup on existing DB |
| var snapshot = common.Snapshot{ |
| SnapshotInfo: "test", |
| Tables: []common.Table{}, |
| } |
| |
| db, err := data.DBVersion(snapshot.SnapshotInfo) |
| Expect(err).NotTo(HaveOccurred()) |
| |
| err = InitDBFullColumns(db) |
| Expect(err).NotTo(HaveOccurred()) |
| |
| tx, err := db.Begin() |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| err = InsertDeployment(tx, successDep) |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = InsertDeployment(tx, failDep) |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = InsertDeployment(tx, blankDep) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| err = tx.Commit() |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) |
| }) |
| }) |
| |
| Context("ApigeeSync change event", func() { |
| |
| It("inserting event should deliver the deployment to subscribers", func(done Done) { |
| |
| deploymentID := "add_test_1" |
| |
| event, dep := createChangeDeployment(deploymentID) |
| |
| // insert full deployment columns |
| tx, err := getDB().Begin() |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = InsertDeployment(tx, dep) |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = tx.Commit() |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| var listener = make(chan deploymentsResult) |
| addSubscriber <- listener |
| |
| apid.Events().Emit(APIGEE_SYNC_EVENT, &event) |
| |
| // wait for event to propagate |
| result := <-listener |
| Expect(result.err).ShouldNot(HaveOccurred()) |
| |
| deployments, err := getReadyDeployments() |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| Expect(len(deployments)).To(Equal(1)) |
| d := deployments[0] |
| |
| Expect(d.ID).To(Equal(deploymentID)) |
| Expect(d.BundleName).To(Equal(dep.BundleName)) |
| Expect(d.BundleURI).To(Equal(dep.BundleURI)) |
| |
| close(done) |
| }) |
| |
| It("delete event should deliver to subscribers", func(done Done) { |
| |
| deploymentID := "delete_test_1" |
| |
| // insert deployment |
| event, dep := createChangeDeployment(deploymentID) |
| |
| // insert full deployment columns |
| tx, err := getDB().Begin() |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = InsertDeployment(tx, dep) |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = tx.Commit() |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| listener := make(chan deploymentsResult) |
| addSubscriber <- listener |
| apid.Events().Emit(APIGEE_SYNC_EVENT, &event) |
| // wait for event to propagate |
| result := <-listener |
| Expect(result.err).ShouldNot(HaveOccurred()) |
| |
| // delete deployment |
| deletDeploymentFromDb(dep, getDB()) |
| row := common.Row{} |
| row["id"] = &common.ColumnVal{Value: deploymentID} |
| event = common.ChangeList{ |
| Changes: []common.Change{ |
| { |
| Operation: common.Delete, |
| Table: DEPLOYMENT_TABLE, |
| OldRow: row, |
| }, |
| }, |
| } |
| |
| listener = make(chan deploymentsResult) |
| addSubscriber <- listener |
| apid.Events().Emit(APIGEE_SYNC_EVENT, &event) |
| result = <-listener |
| Expect(result.err).ShouldNot(HaveOccurred()) |
| Expect(len(result.deployments)).To(Equal(0)) |
| close(done) |
| }) |
| }) |
| }) |
| |
| func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) { |
| uri, err := url.Parse(testServer.URL) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| uri.Path = "/bundles/1" |
| bundleUri := uri.String() |
| bundle := bundleConfigJson{ |
| Name: uri.Path, |
| URI: bundleUri, |
| ChecksumType: "crc32", |
| } |
| bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) |
| bundle1Json, err := json.Marshal(bundle) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| row := common.Row{} |
| row["id"] = &common.ColumnVal{Value: deploymentID} |
| row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} |
| |
| changeList := common.ChangeList{ |
| Changes: []common.Change{ |
| { |
| Operation: common.Insert, |
| Table: DEPLOYMENT_TABLE, |
| NewRow: row, |
| }, |
| }, |
| } |
| dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow) |
| return changeList, dep |
| } |
| |
| func insertDeploymentToDb(dep DataDeployment, db apid.DB) { |
| tx, err := db.Begin() |
| Expect(err).ShouldNot(HaveOccurred()) |
| defer tx.Rollback() |
| err = InsertTestDeployment(tx, dep) |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = tx.Commit() |
| Expect(err).ShouldNot(HaveOccurred()) |
| } |
| |
| func deletDeploymentFromDb(dep DataDeployment, db apid.DB) { |
| tx, err := db.Begin() |
| Expect(err).ShouldNot(HaveOccurred()) |
| defer tx.Rollback() |
| err = deleteDeployment(tx, dep.ID) |
| Expect(err).ShouldNot(HaveOccurred()) |
| err = tx.Commit() |
| Expect(err).ShouldNot(HaveOccurred()) |
| } |
| |
| func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) { |
| uri, err := url.Parse(testServer.URL) |
| Expect(err).ShouldNot(HaveOccurred()) |
| |
| uri.Path = "/bundles/1" |
| bundleUri := uri.String() |
| bundle := bundleConfigJson{ |
| Name: uri.Path, |
| URI: bundleUri, |
| ChecksumType: "crc32", |
| } |
| bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) |
| |
| jsonBytes, err := json.Marshal(bundle) |
| Expect(err).ShouldNot(HaveOccurred()) |
| fmt.Println("JSON :" + string(jsonBytes)) |
| |
| dep := DataDeployment{ |
| ID: deploymentID, |
| DataScopeID: deploymentID, |
| BundleURI: bundle.URI, |
| BundleChecksum: bundle.Checksum, |
| BundleChecksumType: bundle.ChecksumType, |
| BundleConfigJSON: string(jsonBytes), |
| } |
| |
| // init without info == startup on existing DB |
| var snapshot = common.Snapshot{ |
| SnapshotInfo: snapInfo, |
| Tables: []common.Table{}, |
| } |
| return snapshot, dep |
| } |