update test cases
diff --git a/listener.go b/listener.go index 147f6e3..9d26fc6 100644 --- a/listener.go +++ b/listener.go
@@ -61,9 +61,9 @@ // if no tables, this a startup event for an existing DB /* - if len(snapshot.Tables) != 0 { - log.Panic("snapshot.Tables is not empty!") - } + if len(snapshot.Tables) != 0 { + log.Panic("snapshot.Tables is not empty!") + } */ startupOnExistingDatabase() log.Debug("Snapshot processed")
diff --git a/listener_test.go b/listener_test.go index 3e13b41..09b3dec 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -8,6 +8,7 @@ "net/http" + "fmt" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" @@ -18,116 +19,20 @@ Context("ApigeeSync snapshot event", func() { - XIt("should set DB and process", func(done Done) { - - deploymentID := "listener_test_1" - - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle1 := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri) - bundle1Json, err := json.Marshal(bundle1) - Expect(err).ShouldNot(HaveOccurred()) - - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} - - var event = common.Snapshot{ - SnapshotInfo: "test", - Tables: []common.Table{ - { - Name: DEPLOYMENT_TABLE, - Rows: []common.Row{row}, - }, - }, - } - - var listener = make(chan deploymentsResult) - addSubscriber <- listener - - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - - result := <-listener - Expect(result.err).ToNot(HaveOccurred()) - - // from event - Expect(len(result.deployments)).To(Equal(1)) - d := result.deployments[0] - - Expect(d.ID).To(Equal(deploymentID)) - Expect(d.BundleName).To(Equal(bundle1.Name)) - Expect(d.BundleURI).To(Equal(bundle1.URI)) - - // from db - deployments, err := getReadyDeployments() - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(1)) - d = deployments[0] - - Expect(d.ID).To(Equal(deploymentID)) - Expect(d.BundleName).To(Equal(bundle1.Name)) - Expect(d.BundleURI).To(Equal(bundle1.URI)) - - close(done) - }) - It("should process unready on existing db startup event", func(done Done) { + fmt.Println("should process unready on existing db startup event") deploymentID := "startup_test" - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - - dep := DataDeployment{ - ID: deploymentID, - DataScopeID: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - } - - // init without info == startup on existing DB - var snapshot = common.Snapshot{ - SnapshotInfo: "test", - Tables: []common.Table{}, - } + snapshot, dep := createSnapshotDeployment(deploymentID) db, err := data.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Unable to access database: %v", err) - } + Expect(err).ShouldNot(HaveOccurred()) err = InitDB(db) - if err != nil { - log.Panicf("Unable to initialize database: %v", err) - } - - tx, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) + insertDeploymentToDb(dep, db) var listener = make(chan deploymentsResult) addSubscriber <- listener @@ -145,6 +50,7 @@ }) It("should send deployment statuses on existing db startup event", func(done Done) { + fmt.Println("should send deployment statuses on existing db startup event") successDep := DataDeployment{ ID: "success", @@ -203,14 +109,10 @@ } db, err := data.DBVersion(snapshot.SnapshotInfo) - if err != nil { - log.Panicf("Unable to access database: %v", err) - } + Expect(err).NotTo(HaveOccurred()) err = InitDB(db) - if err != nil { - log.Panicf("Unable to initialize database: %v", err) - } + Expect(err).NotTo(HaveOccurred()) tx, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) @@ -231,37 +133,13 @@ Context("ApigeeSync change event", func() { - XIt("add event should add a deployment", func(done Done) { + It("inserting event should deliver the deployment to subscribers", func(done Done) { deploymentID := "add_test_1" - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) + event, dep := createChangeDeployment(deploymentID) - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundle1Json, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} - - var event = common.ChangeList{ - Changes: []common.Change{ - { - Operation: common.Insert, - Table: DEPLOYMENT_TABLE, - NewRow: row, - }, - }, - } + insertDeploymentToDb(dep, getDB()) var listener = make(chan deploymentsResult) addSubscriber <- listener @@ -279,31 +157,31 @@ d := deployments[0] Expect(d.ID).To(Equal(deploymentID)) - Expect(d.BundleName).To(Equal(bundle.Name)) - Expect(d.BundleURI).To(Equal(bundle.URI)) + Expect(d.BundleName).To(Equal(dep.BundleName)) + Expect(d.BundleURI).To(Equal(dep.BundleURI)) close(done) }) - XIt("delete event should delete a deployment", func(done Done) { + It("delete event should deliver to subscribers", func(done Done) { deploymentID := "delete_test_1" - tx, err := getDB().Begin() - Expect(err).ShouldNot(HaveOccurred()) - dep := DataDeployment{ - ID: deploymentID, - LocalBundleURI: "whatever", - } - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) + // insert deployment + event, dep := createChangeDeployment(deploymentID) + insertDeploymentToDb(dep, getDB()) + listener := make(chan deploymentsResult) + addSubscriber <- listener + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) + // wait for event to propagate + result := <-listener + Expect(result.err).ShouldNot(HaveOccurred()) + // delete deployment + deletDeploymentFromDb(dep, getDB()) row := common.Row{} row["id"] = &common.ColumnVal{Value: deploymentID} - - var event = common.ChangeList{ + event = common.ChangeList{ Changes: []common.Change{ { Operation: common.Delete, @@ -313,19 +191,93 @@ }, } - var listener = make(chan deploymentsResult) + listener = make(chan deploymentsResult) addSubscriber <- listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - - <-listener - - deployments, err := getReadyDeployments() - Expect(err).ShouldNot(HaveOccurred()) - - Expect(len(deployments)).To(Equal(0)) - + result = <-listener + Expect(len(result.deployments)).To(Equal(0)) close(done) }) }) }) + +func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) { + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + bundle1Json, err := json.Marshal(bundle) + Expect(err).ShouldNot(HaveOccurred()) + + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} + + changeList := common.ChangeList{ + Changes: []common.Change{ + { + Operation: common.Insert, + Table: DEPLOYMENT_TABLE, + NewRow: row, + }, + }, + } + dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow) + return changeList, dep +} + +func insertDeploymentToDb(dep DataDeployment, db apid.DB) { + tx, err := db.Begin() + Expect(err).ShouldNot(HaveOccurred()) + defer tx.Rollback() + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) +} + +func deletDeploymentFromDb(dep DataDeployment, db apid.DB) { + tx, err := db.Begin() + Expect(err).ShouldNot(HaveOccurred()) + defer tx.Rollback() + err = deleteDeployment(tx, dep.ID) + Expect(err).ShouldNot(HaveOccurred()) + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) +} + +func createSnapshotDeployment(deploymentID string) (common.Snapshot, DataDeployment) { + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + + dep := DataDeployment{ + ID: deploymentID, + DataScopeID: deploymentID, + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, + BundleChecksumType: bundle.ChecksumType, + } + + // init without info == startup on existing DB + var snapshot = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{}, + } + return snapshot, dep +}