update test cases
diff --git a/listener.go b/listener.go
index 147f6e3..9d26fc6 100644
--- a/listener.go
+++ b/listener.go
@@ -61,9 +61,9 @@
// if no tables, this a startup event for an existing DB
/*
- if len(snapshot.Tables) != 0 {
- log.Panic("snapshot.Tables is not empty!")
- }
+ if len(snapshot.Tables) != 0 {
+ log.Panic("snapshot.Tables is not empty!")
+ }
*/
startupOnExistingDatabase()
log.Debug("Snapshot processed")
diff --git a/listener_test.go b/listener_test.go
index 3e13b41..09b3dec 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -8,6 +8,7 @@
"net/http"
+ "fmt"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
@@ -18,116 +19,20 @@
Context("ApigeeSync snapshot event", func() {
- XIt("should set DB and process", func(done Done) {
-
- deploymentID := "listener_test_1"
-
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle1 := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri)
- bundle1Json, err := json.Marshal(bundle1)
- Expect(err).ShouldNot(HaveOccurred())
-
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
- var event = common.Snapshot{
- SnapshotInfo: "test",
- Tables: []common.Table{
- {
- Name: DEPLOYMENT_TABLE,
- Rows: []common.Row{row},
- },
- },
- }
-
- var listener = make(chan deploymentsResult)
- addSubscriber <- listener
-
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
-
- result := <-listener
- Expect(result.err).ToNot(HaveOccurred())
-
- // from event
- Expect(len(result.deployments)).To(Equal(1))
- d := result.deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(bundle1.Name))
- Expect(d.BundleURI).To(Equal(bundle1.URI))
-
- // from db
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d = deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(bundle1.Name))
- Expect(d.BundleURI).To(Equal(bundle1.URI))
-
- close(done)
- })
-
It("should process unready on existing db startup event", func(done Done) {
+ fmt.Println("should process unready on existing db startup event")
deploymentID := "startup_test"
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
-
- dep := DataDeployment{
- ID: deploymentID,
- DataScopeID: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- }
-
- // init without info == startup on existing DB
- var snapshot = common.Snapshot{
- SnapshotInfo: "test",
- Tables: []common.Table{},
- }
+ snapshot, dep := createSnapshotDeployment(deploymentID)
db, err := data.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
+ Expect(err).ShouldNot(HaveOccurred())
err = InitDB(db)
- if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
- }
-
- tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
+ insertDeploymentToDb(dep, db)
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -145,6 +50,7 @@
})
It("should send deployment statuses on existing db startup event", func(done Done) {
+ fmt.Println("should send deployment statuses on existing db startup event")
successDep := DataDeployment{
ID: "success",
@@ -203,14 +109,10 @@
}
db, err := data.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
+ Expect(err).NotTo(HaveOccurred())
err = InitDB(db)
- if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
- }
+ Expect(err).NotTo(HaveOccurred())
tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
@@ -231,37 +133,13 @@
Context("ApigeeSync change event", func() {
- XIt("add event should add a deployment", func(done Done) {
+ It("inserting event should deliver the deployment to subscribers", func(done Done) {
deploymentID := "add_test_1"
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
+ event, dep := createChangeDeployment(deploymentID)
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundle1Json, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
- var event = common.ChangeList{
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: DEPLOYMENT_TABLE,
- NewRow: row,
- },
- },
- }
+ insertDeploymentToDb(dep, getDB())
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -279,31 +157,31 @@
d := deployments[0]
Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(bundle.Name))
- Expect(d.BundleURI).To(Equal(bundle.URI))
+ Expect(d.BundleName).To(Equal(dep.BundleName))
+ Expect(d.BundleURI).To(Equal(dep.BundleURI))
close(done)
})
- XIt("delete event should delete a deployment", func(done Done) {
+ It("delete event should deliver to subscribers", func(done Done) {
deploymentID := "delete_test_1"
- tx, err := getDB().Begin()
- Expect(err).ShouldNot(HaveOccurred())
- dep := DataDeployment{
- ID: deploymentID,
- LocalBundleURI: "whatever",
- }
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
+ // insert deployment
+ event, dep := createChangeDeployment(deploymentID)
+ insertDeploymentToDb(dep, getDB())
+ listener := make(chan deploymentsResult)
+ addSubscriber <- listener
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
+ // wait for event to propagate
+ result := <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
+ // delete deployment
+ deletDeploymentFromDb(dep, getDB())
row := common.Row{}
row["id"] = &common.ColumnVal{Value: deploymentID}
-
- var event = common.ChangeList{
+ event = common.ChangeList{
Changes: []common.Change{
{
Operation: common.Delete,
@@ -313,19 +191,93 @@
},
}
- var listener = make(chan deploymentsResult)
+ listener = make(chan deploymentsResult)
addSubscriber <- listener
-
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
-
- <-listener
-
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(0))
-
+ result = <-listener
+ Expect(len(result.deployments)).To(Equal(0))
close(done)
})
})
})
+
+func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) {
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundle1Json, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+ row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+ changeList := common.ChangeList{
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: DEPLOYMENT_TABLE,
+ NewRow: row,
+ },
+ },
+ }
+ dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow)
+ return changeList, dep
+}
+
+func insertDeploymentToDb(dep DataDeployment, db apid.DB) {
+ tx, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ defer tx.Rollback()
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+}
+
+func deletDeploymentFromDb(dep DataDeployment, db apid.DB) {
+ tx, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ defer tx.Rollback()
+ err = deleteDeployment(tx, dep.ID)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+}
+
+func createSnapshotDeployment(deploymentID string) (common.Snapshot, DataDeployment) {
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ }
+
+ // init without info == startup on existing DB
+ var snapshot = common.Snapshot{
+ SnapshotInfo: "test",
+ Tables: []common.Table{},
+ }
+ return snapshot, dep
+}