reformat
diff --git a/data.go b/data.go
index eca1491..3d33902 100644
--- a/data.go
+++ b/data.go
@@ -71,6 +71,17 @@
type dbManager struct { // bundles the active DB handle, its lock, and the service that supplies versioned DBs
db apid.DB // currently selected database; setDbVersion assigns it
dbMux sync.RWMutex // locked by setDbVersion while db is replaced
+ data apid.DataService // setDbVersion calls data.DBVersion to look up the DB for a version
+}
+
+// setDbVersion asks the data service for the database identified by version
+// and installs it as the manager's active handle. A failed lookup is reported
+// through log.Panicf.
+func (dbc *dbManager) setDbVersion(version string) {
+ versioned, err := dbc.data.DBVersion(version)
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
+
+ // Replace the handle while holding the write lock.
+ dbc.dbMux.Lock()
+ defer dbc.dbMux.Unlock()
+ dbc.db = versioned
}
func (dbc *dbManager) getDb() apid.DB {
diff --git a/data_test.go b/data_test.go
index e1bf8dc..f6115d4 100644
--- a/data_test.go
+++ b/data_test.go
@@ -15,55 +15,67 @@
package apidApigeeSync
import (
+ "github.com/30x/apid-core/data"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
+ "io/ioutil"
+ "reflect"
"sort"
+ "strconv"
"sync"
)
-var _ = Describe("data access tests", func() {
- var testCount int
+var _ = Describe("DB manager tests", func() {
var testDbMan *dbManager
+ var testCount int
BeforeEach(func() {
- testCount += 1
testDbMan = &dbManager{
dbMux: sync.RWMutex{},
+ data: dataService,
}
+ testCount += 1
- db := testDbMan.getDb()
-
- //all tests in this file operate on the api_product table. Create the necessary tables for this here
- db.Exec("CREATE TABLE _transicator_tables " +
- "(tableName varchar not null, columnName varchar not null, " +
- "typid integer, primaryKey bool);")
- db.Exec("DELETE from _transicator_tables")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)")
- db.Exec("INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)")
-
- db.Exec("CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
- "api_resources text,approval_type text,scopes text,proxies text, environments text," +
- "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
- "primary key (id,tenant_id,created_at,updated_at));")
- db.Exec("DELETE from kms_api_product")
-
- setDB(db)
- initDB(db)
-
+ testDbMan.setDbVersion("data_test_" + strconv.Itoa(testCount))
})
- Context("Update processing", func() {
+ var _ = AfterEach(func() {
+ testDbMan = nil
+ data.Delete(data.VersionedDBID("common", "data_test_"+strconv.Itoa(testCount)))
+ })
+
+ Context("Basic Update/Insert/Delete processing", func() {
+ // All specs in this context work on the api_product table, so the
+ // _transicator_tables metadata and kms_api_product are (re)built here.
+ BeforeEach(func() {
+ db := testDbMan.getDb()
+
+ // Executed in order; the Exec results are not inspected.
+ setupStatements := []string{
+ "CREATE TABLE _transicator_tables " +
+ "(tableName varchar not null, columnName varchar not null, " +
+ "typid integer, primaryKey bool);",
+ "DELETE from _transicator_tables",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','id',2950,1)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','tenant_id',1043,1)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','description',1043,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','api_resources',1015,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','approval_type',1043,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','scopes',1015,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','proxies',1015,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','environments',1015,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','created_at',1114,1)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','created_by',1043,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','updated_at',1114,1)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','updated_by',1043,0)",
+ "INSERT INTO _transicator_tables VALUES('kms_api_product','_change_selector',1043,0)",
+ "CREATE TABLE kms_api_product (id text,tenant_id text,name text, description text, " +
+ "api_resources text,approval_type text,scopes text,proxies text, environments text," +
+ "created_at blob, created_by text,updated_at blob,updated_by text,_change_selector text, " +
+ "primary key (id,tenant_id,created_at,updated_at));",
+ "DELETE from kms_api_product",
+ }
+ for _, stmt := range setupStatements {
+ db.Exec(stmt)
+ }
+
+ testDbMan.initDefaultDb()
+ })
+
It("unit test buildUpdateSql with single primary key", func() {
testRow := common.Row{
"id": {
@@ -193,9 +205,9 @@
},
}
//insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
@@ -210,13 +222,13 @@
}
//do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+ err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
@@ -288,9 +300,9 @@
},
}
//insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
@@ -305,13 +317,13 @@
}
//do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+ err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='new_id' and description='new description'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
})
@@ -390,9 +402,9 @@
},
}
//insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
@@ -407,13 +419,13 @@
}
//do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+ err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
})
@@ -489,9 +501,9 @@
},
}
//insert and assert success
- Expect(true).To(Equal(processChangeList(event)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
var nRows int
- err := getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
+ err := testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='A product for testing Greg'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
@@ -506,19 +518,17 @@
}
//do the update
- Expect(true).To(Equal(processChangeList(event)))
- err = getDB().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
+ err = testDbMan.getDb().QueryRow("SELECT count(id) FROM kms_api_product WHERE id='87a4bfaa-b3c4-47cd-b6c5-378cdb68610c' and description='new description'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
})
- })
- Context("Insert processing", func() {
It("Properly constructs insert sql for one row", func() {
newRow := common.Row{
"id": {
@@ -659,16 +669,16 @@
},
}
- Expect(true).To(Equal(processChangeList(event)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
var nRows int
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
@@ -743,22 +753,22 @@
},
}
- Expect(true).To(Equal(processChangeList(event)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event)))
var nRows int
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(2))
})
@@ -801,12 +811,12 @@
},
}
- ok := processChangeList(event)
+ ok := testDbMan.writeTransaction(event)
Expect(false).To(Equal(ok))
var nRows int
//assert that no extraneous rows were added
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(0))
})
@@ -880,12 +890,10 @@
},
}
- ok := processChangeList(event)
+ ok := testDbMan.writeTransaction(event)
Expect(false).To(Equal(ok))
})
- })
- Context("Delete processing", func() {
It("Properly constructs sql prepare for Delete", func() {
row := common.Row{
"id": {
@@ -914,7 +922,7 @@
},
}
- pkeys, err := getPkeysForTable("kms_api_product")
+ pkeys, err := testDbMan.getPkeysForTable("kms_api_product")
Expect(err).Should(Succeed())
sql := buildDeleteSql("kms_api_product", row, pkeys)
Expect(sql).To(Equal("DELETE FROM kms_api_product WHERE created_at=$1 AND id=$2 AND tenant_id=$3 AND updated_at=$4"))
@@ -966,28 +974,28 @@
},
}
- Expect(true).To(Equal(processChangeList(event1)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event1)))
var nRows int
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
- Expect(true).To(Equal(processChangeList(event2)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event2)))
// validate delete
- err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(0))
// delete again should fail - coz entry will not exist
- Expect(false).To(Equal(processChangeList(event2)))
+ Expect(false).To(Equal(testDbMan.writeTransaction(event2)))
})
It("verify multiple insert and single delete works", func() {
@@ -1068,43 +1076,43 @@
},
}
- Expect(true).To(Equal(processChangeList(event1)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event1)))
var nRows int
//verify first row
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//verify second row
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(2))
- Expect(true).To(Equal(processChangeList(event2)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event2)))
//verify second row still exists
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='b' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
// validate delete
- err = getDB().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("select count(*) from kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
// delete again should fail - coz entry will not exist
- Expect(false).To(Equal(processChangeList(event2)))
+ Expect(false).To(Equal(testDbMan.writeTransaction(event2)))
}, 3)
It("verify single insert and multiple delete fails", func() {
@@ -1185,21 +1193,209 @@
},
}
- Expect(true).To(Equal(processChangeList(event1)))
+ Expect(true).To(Equal(testDbMan.writeTransaction(event1)))
var nRows int
//verify insert
- err := getDB().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
+ err := testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product WHERE id='a' and api_resources='r'" +
"and environments='{test}' and tenant_id='t' and description='d' and created_at='c' and updated_at='u'" +
"and _change_selector='cs'").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
//assert that no extraneous rows were added
- err = getDB().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
+ err = testDbMan.getDb().QueryRow("SELECT count(*) FROM kms_api_product").Scan(&nRows)
Expect(err).NotTo(HaveOccurred())
Expect(nRows).To(Equal(1))
- Expect(false).To(Equal(processChangeList(event2)))
+ Expect(false).To(Equal(testDbMan.writeTransaction(event2)))
+
+ }, 3)
+ })
+
+ Context("ApigeeSync methods", func() {
+ var createTestDb = func(sqlfile string) {
+
+ db := testDbMan.getDb()
+ sqlStatementsBuffer, err := ioutil.ReadFile(sqlfile)
+ Expect(err).Should(Succeed())
+ sqlStatementsString := string(sqlStatementsBuffer)
+ _, err = db.Exec(sqlStatementsString)
+ Expect(err).Should(Succeed())
+ }
+
+ It("test getClusterCount", func() {
+ createTestDb("./sql/init_listener_test_duplicate_apids.sql")
+
+ // After loading the fixture, getClusterCount must succeed and report 2.
+ clusterCount, err := testDbMan.getClusterCount()
+ Expect(err).NotTo(HaveOccurred())
+ Expect(clusterCount).To(Equal(2))
+ })
+
+ It("should process a valid Snapshot", func() {
+ createTestDb("./sql/init_listener_test_valid_snapshot.sql")
+
+ // The stored instance info must name this spec's DB version.
+ wantSnapshot := "data_test_" + strconv.Itoa(testCount)
+ Expect(testDbMan.updateApidInstanceInfo()).Should(Succeed())
+
+ info, err := testDbMan.getApidInstanceInfo()
+ Expect(err).Should(Succeed())
+ Expect(info.LastSnapshot).To(Equal(wantSnapshot))
+
+ db := testDbMan.getDb()
+
+ // apid cluster: read all rows back and verify the single expected one
+ clusterRows, err := db.Query(`
+ SELECT id, name, description, umbrella_org_app_name,
+ created, created_by, updated, updated_by
+ FROM EDGEX_APID_CLUSTER`)
+ Expect(err).NotTo(HaveOccurred())
+ defer clusterRows.Close()
+
+ var clusters []dataApidCluster
+ cluster := dataApidCluster{}
+ for clusterRows.Next() {
+ clusterRows.Scan(&cluster.ID, &cluster.Name, &cluster.Description, &cluster.OrgAppName,
+ &cluster.Created, &cluster.CreatedBy, &cluster.Updated, &cluster.UpdatedBy)
+ clusters = append(clusters, cluster)
+ }
+
+ Expect(clusters).To(HaveLen(1))
+ gotCluster := clusters[0]
+
+ Expect(gotCluster.ID).To(Equal("i"))
+ Expect(gotCluster.Name).To(Equal("n"))
+ Expect(gotCluster.Description).To(Equal("d"))
+ Expect(gotCluster.OrgAppName).To(Equal("o"))
+ Expect(gotCluster.Created).To(Equal("c"))
+ Expect(gotCluster.CreatedBy).To(Equal("c"))
+ Expect(gotCluster.Updated).To(Equal("u"))
+ Expect(gotCluster.UpdatedBy).To(Equal("u"))
+
+ // data scopes: three rows, checked in the order the query returns them
+ scopeRows, err := db.Query(`
+ SELECT id, apid_cluster_id, scope, org,
+ env, created, created_by, updated,
+ updated_by
+ FROM EDGEX_DATA_SCOPE`)
+ Expect(err).NotTo(HaveOccurred())
+ defer scopeRows.Close()
+
+ var dataScopes []dataDataScope
+ scopeRow := dataDataScope{}
+ for scopeRows.Next() {
+ scopeRows.Scan(&scopeRow.ID, &scopeRow.ClusterID, &scopeRow.Scope, &scopeRow.Org,
+ &scopeRow.Env, &scopeRow.Created, &scopeRow.CreatedBy, &scopeRow.Updated,
+ &scopeRow.UpdatedBy)
+ dataScopes = append(dataScopes, scopeRow)
+ }
+
+ Expect(dataScopes).To(HaveLen(3))
+ gotScope := dataScopes[0]
+
+ Expect(gotScope.ID).To(Equal("i"))
+ Expect(gotScope.Org).To(Equal("o"))
+ Expect(gotScope.Env).To(Equal("e1"))
+ Expect(gotScope.Scope).To(Equal("s1"))
+ Expect(gotScope.Created).To(Equal("c"))
+ Expect(gotScope.CreatedBy).To(Equal("c"))
+ Expect(gotScope.Updated).To(Equal("u"))
+ Expect(gotScope.UpdatedBy).To(Equal("u"))
+
+ gotScope = dataScopes[1]
+ Expect(gotScope.Env).To(Equal("e2"))
+ Expect(gotScope.Scope).To(Equal("s1"))
+ gotScope = dataScopes[2]
+ Expect(gotScope.Env).To(Equal("e3"))
+ Expect(gotScope.Scope).To(Equal("s2"))
+
+ // findScopesForId: compare as sorted lists so result order does not matter
+ wantScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
+ gotScopes := testDbMan.findScopesForId("a")
+ Expect(len(gotScopes)).To(Equal(6))
+ sort.Strings(gotScopes)
+ sort.Strings(wantScopes)
+ Expect(reflect.DeepEqual(gotScopes, wantScopes)).To(BeTrue())
+ }, 3)
+
+ // Loads a fixture, writes two data-scope inserts in one change list, and
+ // verifies both rows and the scopes reported for cluster id "a".
+ It("insert event should add", func() {
+ // createTestDb in this context takes only the SQL file path.
+ createTestDb("./sql/init_listener_test_no_datascopes.sql")
+ event := common.ChangeList{
+ LastSequence: "test",
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "i"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s1"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
+ },
+ },
+ {
+ Operation: common.Insert,
+ Table: LISTENER_TABLE_DATA_SCOPE,
+ NewRow: common.Row{
+ "id": &common.ColumnVal{Value: "j"},
+ "apid_cluster_id": &common.ColumnVal{Value: "a"},
+ "scope": &common.ColumnVal{Value: "s2"},
+ "org": &common.ColumnVal{Value: "o"},
+ "env": &common.ColumnVal{Value: "e"},
+ "created": &common.ColumnVal{Value: "c"},
+ "created_by": &common.ColumnVal{Value: "c"},
+ "updated": &common.ColumnVal{Value: "u"},
+ "updated_by": &common.ColumnVal{Value: "u"},
+ "_change_selector": &common.ColumnVal{Value: "cs"},
+ },
+ },
+ },
+ }
+
+ testDbMan.writeTransaction(&event)
+
+ var dds []dataDataScope
+
+ rows, err := testDbMan.getDb().Query(`
+ SELECT id, apid_cluster_id, scope, org,
+ env, created, created_by, updated,
+ updated_by
+ FROM EDGEX_DATA_SCOPE`)
+ Expect(err).NotTo(HaveOccurred())
+ defer rows.Close()
+
+ d := dataDataScope{}
+ for rows.Next() {
+ rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
+ &d.Env, &d.Created, &d.CreatedBy, &d.Updated,
+ &d.UpdatedBy)
+ dds = append(dds, d)
+ }
+
+ // exactly the two rows inserted above are expected in the table
+ Expect(len(dds)).To(Equal(2))
+ ds := dds[0]
+
+ Expect(ds.ID).To(Equal("i"))
+ Expect(ds.Org).To(Equal("o"))
+ Expect(ds.Env).To(Equal("e"))
+ Expect(ds.Scope).To(Equal("s1"))
+ Expect(ds.Created).To(Equal("c"))
+ Expect(ds.CreatedBy).To(Equal("c"))
+ Expect(ds.Updated).To(Equal("u"))
+ Expect(ds.UpdatedBy).To(Equal("u"))
+
+ ds = dds[1]
+ Expect(ds.Scope).To(Equal("s2"))
+
+ scopes := testDbMan.findScopesForId("a")
+ Expect(len(scopes)).To(Equal(2))
+ Expect(scopes[0]).To(Equal("s1"))
+ Expect(scopes[1]).To(Equal("s2"))
}, 3)
})
diff --git a/init.go b/init.go
index d8226de..f456953 100644
--- a/init.go
+++ b/init.go
@@ -105,6 +105,7 @@
dbMan = &dbManager{
dbMux: sync.RWMutex{},
+ data: services.Data(),
}
// set up default database
diff --git a/listener.go b/listener.go
index 76ec295..cffeb05 100644
--- a/listener.go
+++ b/listener.go
@@ -15,7 +15,6 @@
package apidApigeeSync
import (
- "github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
@@ -31,26 +30,21 @@
func (lm *listenerManager) processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
- db, err := dataService.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
-
- lm.processSqliteSnapshot(db)
+ lm.dbm.setDbVersion(snapshot.SnapshotInfo)
+ lm.processSqliteSnapshot()
//update apid instance info
apidInfo.LastSnapshot = snapshot.SnapshotInfo
- err = lm.dbm.updateApidInstanceInfo()
+ err := lm.dbm.updateApidInstanceInfo()
if err != nil {
log.Panicf("Unable to update instance info: %v", err)
}
- lm.dbm.setDb(db)
log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
}
-func (lm *listenerManager) processSqliteSnapshot(db apid.DB) {
+func (lm *listenerManager) processSqliteSnapshot() {
if count, err := lm.dbm.getClusterCount(); err != nil || count != 1 {
log.Panicf("Illegal state for apid_cluster. Must be a single row. %v", err)
diff --git a/listener_test.go b/listener_test.go
index d48c70e..2950288 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -20,11 +20,19 @@
"github.com/apigee-labs/transicator/common"
"os"
- "reflect"
- "sort"
)
var _ = Describe("listener", func() {
+ var testListenerMan *listenerManager
+ var mockDbMan *dummyDbMan
+ var testCount int
+ // Every spec starts with a fresh listener manager backed by a new stub
+ // DB manager, and bumps the spec counter.
+ BeforeEach(func() {
+ testCount++
+ mockDbMan = new(dummyDbMan)
+ testListenerMan = &listenerManager{dbm: mockDbMan}
+ })
var createTestDb = func(sqlfile string, dbId string) common.Snapshot {
initDb(sqlfile, "./mockdb.sqlite3")
@@ -40,11 +48,11 @@
Context("ApigeeSync snapshot event", func() {
It("should fail if more than one apid_cluster rows", func() {
- event := createTestDb("./sql/init_listener_test_duplicate_apids.sql", "test_snapshot_fail_multiple_clusters")
- Expect(func() { processSnapshot(&event) }).To(Panic())
+ mockDbMan.clusterCount = 2
+ Expect(func() { testListenerMan.processSnapshot(&common.Snapshot{}) }).To(Panic())
}, 3)
- It("should fail if more than one apid_cluster rows", func() {
+ It("test scope change", func() {
newScopes := []string{"foo"}
scopes := []string{"bar"}
Expect(scopeChanged(newScopes, scopes)).To(Equal(changeServerError{Code: "Scope changes detected; must get new snapshot"}))
@@ -59,99 +67,6 @@
Expect(scopeChanged(newScopes, scopes)).To(BeNil())
}, 3)
-
- It("should process a valid Snapshot", func() {
-
- event := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_snapshot_valid")
-
- processSnapshot(&event)
-
- info, err := getApidInstanceInfo()
- Expect(err).NotTo(HaveOccurred())
-
- Expect(info.LastSnapshot).To(Equal(event.SnapshotInfo))
-
- db := getDB()
-
- expectedDB, err := dataService.DBVersion(event.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
-
- Expect(db == expectedDB).Should(BeTrue())
-
- // apid Cluster
- var dcs []dataApidCluster
-
- rows, err := db.Query(`
- SELECT id, name, description, umbrella_org_app_name,
- created, created_by, updated, updated_by
- FROM EDGEX_APID_CLUSTER`)
- Expect(err).NotTo(HaveOccurred())
- defer rows.Close()
-
- c := dataApidCluster{}
- for rows.Next() {
- rows.Scan(&c.ID, &c.Name, &c.Description, &c.OrgAppName,
- &c.Created, &c.CreatedBy, &c.Updated, &c.UpdatedBy)
- dcs = append(dcs, c)
- }
-
- Expect(len(dcs)).To(Equal(1))
- dc := dcs[0]
-
- Expect(dc.ID).To(Equal("i"))
- Expect(dc.Name).To(Equal("n"))
- Expect(dc.Description).To(Equal("d"))
- Expect(dc.OrgAppName).To(Equal("o"))
- Expect(dc.Created).To(Equal("c"))
- Expect(dc.CreatedBy).To(Equal("c"))
- Expect(dc.Updated).To(Equal("u"))
- Expect(dc.UpdatedBy).To(Equal("u"))
-
- // Data Scope
- var dds []dataDataScope
-
- rows, err = db.Query(`
- SELECT id, apid_cluster_id, scope, org,
- env, created, created_by, updated,
- updated_by
- FROM EDGEX_DATA_SCOPE`)
- Expect(err).NotTo(HaveOccurred())
- defer rows.Close()
-
- d := dataDataScope{}
- for rows.Next() {
- rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
- &d.Env, &d.Created, &d.CreatedBy, &d.Updated,
- &d.UpdatedBy)
- dds = append(dds, d)
- }
-
- Expect(len(dds)).To(Equal(3))
- ds := dds[0]
-
- Expect(ds.ID).To(Equal("i"))
- Expect(ds.Org).To(Equal("o"))
- Expect(ds.Env).To(Equal("e1"))
- Expect(ds.Scope).To(Equal("s1"))
- Expect(ds.Created).To(Equal("c"))
- Expect(ds.CreatedBy).To(Equal("c"))
- Expect(ds.Updated).To(Equal("u"))
- Expect(ds.UpdatedBy).To(Equal("u"))
-
- ds = dds[1]
- Expect(ds.Env).To(Equal("e2"))
- Expect(ds.Scope).To(Equal("s1"))
- ds = dds[2]
- Expect(ds.Env).To(Equal("e3"))
- Expect(ds.Scope).To(Equal("s2"))
-
- scopes := findScopesForId("a")
- Expect(len(scopes)).To(Equal(6))
- expectedScopes := []string{"s1", "s2", "org_scope_1", "env_scope_1", "env_scope_2", "env_scope_3"}
- sort.Strings(scopes)
- sort.Strings(expectedScopes)
- Expect(reflect.DeepEqual(scopes, expectedScopes)).To(BeTrue())
- }, 3)
})
Context("ApigeeSync change event", func() {
@@ -159,11 +74,6 @@
Context(LISTENER_TABLE_APID_CLUSTER, func() {
It("insert event should panic", func() {
- ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_insert_panic")
- processSnapshot(&ssEvent)
-
- //save the last snapshot, so we can restore it at the end of this context
-
csEvent := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
@@ -174,13 +84,10 @@
},
}
- Expect(func() { processChangeList(&csEvent) }).To(Panic())
+ Expect(func() { testListenerMan.processChangeList(&csEvent) }).To(Panic())
}, 3)
It("update event should panic", func() {
- ssEvent := createTestDb("./sql/init_listener_test_valid_snapshot.sql", "test_changes_update_panic")
- processSnapshot(&ssEvent)
-
event := common.ChangeList{
LastSequence: "test",
Changes: []common.Change{
@@ -191,7 +98,7 @@
},
}
- Expect(func() { processChangeList(&event) }).To(Panic())
+ Expect(func() { testListenerMan.processChangeList(&event) }).To(Panic())
//restore the last snapshot
}, 3)
@@ -199,91 +106,6 @@
Context(LISTENER_TABLE_DATA_SCOPE, func() {
- It("insert event should add", func() {
- ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_insert")
- processSnapshot(&ssEvent)
-
- event := common.ChangeList{
- LastSequence: "test",
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "i"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s1"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "_change_selector": &common.ColumnVal{Value: "cs"},
- },
- },
- {
- Operation: common.Insert,
- Table: LISTENER_TABLE_DATA_SCOPE,
- NewRow: common.Row{
- "id": &common.ColumnVal{Value: "j"},
- "apid_cluster_id": &common.ColumnVal{Value: "a"},
- "scope": &common.ColumnVal{Value: "s2"},
- "org": &common.ColumnVal{Value: "o"},
- "env": &common.ColumnVal{Value: "e"},
- "created": &common.ColumnVal{Value: "c"},
- "created_by": &common.ColumnVal{Value: "c"},
- "updated": &common.ColumnVal{Value: "u"},
- "updated_by": &common.ColumnVal{Value: "u"},
- "_change_selector": &common.ColumnVal{Value: "cs"},
- },
- },
- },
- }
-
- processChangeList(&event)
-
- var dds []dataDataScope
-
- rows, err := getDB().Query(`
- SELECT id, apid_cluster_id, scope, org,
- env, created, created_by, updated,
- updated_by
- FROM EDGEX_DATA_SCOPE`)
- Expect(err).NotTo(HaveOccurred())
- defer rows.Close()
-
- d := dataDataScope{}
- for rows.Next() {
- rows.Scan(&d.ID, &d.ClusterID, &d.Scope, &d.Org,
- &d.Env, &d.Created, &d.CreatedBy, &d.Updated,
- &d.UpdatedBy)
- dds = append(dds, d)
- }
-
- //three already existing
- Expect(len(dds)).To(Equal(2))
- ds := dds[0]
-
- Expect(ds.ID).To(Equal("i"))
- Expect(ds.Org).To(Equal("o"))
- Expect(ds.Env).To(Equal("e"))
- Expect(ds.Scope).To(Equal("s1"))
- Expect(ds.Created).To(Equal("c"))
- Expect(ds.CreatedBy).To(Equal("c"))
- Expect(ds.Updated).To(Equal("u"))
- Expect(ds.UpdatedBy).To(Equal("u"))
-
- ds = dds[1]
- Expect(ds.Scope).To(Equal("s2"))
-
- scopes := findScopesForId("a")
- Expect(len(scopes)).To(Equal(2))
- Expect(scopes[0]).To(Equal("s1"))
- Expect(scopes[1]).To(Equal("s2"))
-
- }, 3)
-
It("delete event should delete", func() {
ssEvent := createTestDb("./sql/init_listener_test_no_datascopes.sql", "test_changes_delete")
processSnapshot(&ssEvent)
diff --git a/managerInterfaces.go b/managerInterfaces.go
index e406784..ff29c2d 100644
--- a/managerInterfaces.go
+++ b/managerInterfaces.go
@@ -61,10 +61,11 @@
getClusterCount() (numApidClusters int, err error)
alterClusterTable() (err error)
writeTransaction(*common.ChangeList) bool
+ setDbVersion(version string)
}
type listenerManagerInterface interface {
processSnapshot(snapshot *common.Snapshot)
- processSqliteSnapshot(db apid.DB)
+ processSqliteSnapshot()
processChangeList(changes *common.ChangeList) bool
}
diff --git a/token_test.go b/token_test.go
index e75b54d..cb38125 100644
--- a/token_test.go
+++ b/token_test.go
@@ -216,10 +216,13 @@
})
})
-type dummyDbMan struct{}
+type dummyDbMan struct { // configurable stub DB manager used by the tests
+ clusterCount int // value handed back by getClusterCount
+ err error // error handed back by the stubbed methods that return one
+}
func (d *dummyDbMan) initDefaultDb() error {
- return nil
+ return d.err
}
func (d *dummyDbMan) setDb(apid.DB) {}
@@ -241,7 +244,7 @@
}
func (d *dummyDbMan) getPkeysForTable(tableName string) ([]string, error) {
- return nil, nil
+ return nil, d.err
}
func (d *dummyDbMan) findScopesForId(configId string) []string {
@@ -249,15 +252,15 @@
}
func (d *dummyDbMan) getDefaultDb() (apid.DB, error) {
- return nil, nil
+ return nil, d.err
}
func (d *dummyDbMan) updateApidInstanceInfo() error {
- return nil
+ return d.err
}
-func (d *dummyDbMan) getApidInstanceInfo() (info apidInstanceInfo, err error) {
- return
+func (d *dummyDbMan) getApidInstanceInfo() (apidInstanceInfo, error) {
+ return apidInstanceInfo{}, d.err
}
func (d *dummyDbMan) getLastSequence() string {
@@ -265,13 +268,20 @@
}
func (d *dummyDbMan) updateLastSequence(lastSequence string) error {
- return nil
+ return d.err
}
func (d *dummyDbMan) getClusterCount() (int, error) {
- return 1, nil
+ return d.clusterCount, d.err
}
func (d *dummyDbMan) alterClusterTable() error {
- return nil
+ return d.err
+}
+func (d *dummyDbMan) writeTransaction(_ *common.ChangeList) bool { // stub: the change list is ignored
+ return true // always reports success
+}
+
+func (d *dummyDbMan) setDbVersion(version string) { // stub: empty body, version is unused
+
}