[65557251] Disable Shared Cache flag. (#19)
* [65557251] Disable Shared Cache flag.
* add tests for currently alter table and read table (#17)
* add tests for currently alter table and read table
* Make concurrent tests runnning on the same table. Add test of alter/read different tables concurrently.
* Make concurrent tests runnning on the same table. Add test of alter/read different tables concurrently.
diff --git a/data/data.go b/data/data.go
index 8eded28..bc14738 100644
--- a/data/data.go
+++ b/data/data.go
@@ -39,7 +39,6 @@
statCollectionInterval = 10
commonDBID = "common"
commonDBVersion = "base"
- dbOpenMode = "?cache=shared&mode=rwc"
defaultTraceLevel = "warn"
)
@@ -64,7 +63,7 @@
}
func (d *ApidDb) SetMaxIdleConns(n int) {
- d.db.SetMaxIdleConns(n)
+ d.db.SetMaxIdleConns(n)
}
func (d *ApidDb) SetMaxOpenConns(n int) {
@@ -261,7 +260,6 @@
log.Infof("LoadDB: %s", dataPath)
source := fmt.Sprintf(config.GetString(configDataSourceKey), dataPath)
- source += dbOpenMode
wrappedDriverName := "dd:" + config.GetString(configDataDriverKey)
driver := wrap.NewDriver(&sqlite3.SQLiteDriver{}, dbTraceLog)
func() {
diff --git a/data/data_test.go b/data/data_test.go
index f17511b..c37c9b0 100644
--- a/data/data_test.go
+++ b/data/data_test.go
@@ -119,10 +119,10 @@
finished := make(chan bool, count)
for i := 0; i < count; i++ {
- go func() {
- write(db, i)
+ go func(j int) {
+ write(db, j)
finished <- true
- }()
+ }(i)
}
for i := 0; i < count; i++ {
@@ -131,6 +131,119 @@
Expect(db.Stats().OpenConnections).To(Equal(1))
}
}, 10)
+
+ It("should handle concurrent read & write", func() {
+ db, err := apid.Data().DBForID("test_read_write")
+ db.SetMaxOpenConns(10)
+ db.SetMaxIdleConns(10)
+ Expect(err).NotTo(HaveOccurred())
+ setup(db)
+ finished := make(chan bool, 2*count)
+
+ for i := 0; i < count; i++ {
+ go func(j int) {
+ write(db, j)
+ finished <- true
+ }(i)
+ go func() {
+ read(db)
+ finished <- true
+ }()
+ }
+
+ for i := 0; i < 2*count; i++ {
+ <-finished
+ }
+ }, 10)
+
+ It("should handle concurrent alter table by seralizing them", func() {
+ db, err := apid.Data().DBForID("test_alter")
+ db.SetMaxOpenConns(10)
+ db.SetMaxIdleConns(10)
+ Expect(err).NotTo(HaveOccurred())
+ setup(db)
+ alterCount := 50
+ finished := make(chan bool, alterCount)
+
+ for i := 0; i < alterCount; i++ {
+ go func(j int) {
+ alter(db, j, "test_1")
+ finished <- true
+ }(i)
+ }
+
+ for i := 0; i < alterCount; i++ {
+ <-finished
+ Expect(db.Stats().OpenConnections).To(Equal(1))
+ }
+ }, 10)
+
+ It("should handle seralized read & alter table", func() {
+ db, err := apid.Data().DBForID("test_read_alter")
+ db.SetMaxOpenConns(10)
+ db.SetMaxIdleConns(10)
+ Expect(err).NotTo(HaveOccurred())
+ setup(db)
+ alterCount := 50
+ for i := 0; i < alterCount; i++ {
+ alter(db, i, "test_1")
+ read(db)
+ }
+ }, 10)
+
+ It("concurrent read & alter the same table ", func() {
+ db, err := apid.Data().DBForID("test_conn_read_alter")
+ db.SetMaxOpenConns(10)
+ db.SetMaxIdleConns(10)
+ Expect(err).NotTo(HaveOccurred())
+ setup(db)
+ alterCount := 50
+ finished := make(chan bool, count+alterCount)
+ for i := 0; i < alterCount; i++ {
+ go func(j int) {
+ alter(db, j, "test_1")
+ finished <- true
+ }(i)
+ }
+
+ for i := 0; i < count; i++ {
+ go func() {
+ read(db)
+ finished <- true
+ }()
+ }
+
+ for i := 0; i < count+alterCount; i++ {
+ <-finished
+ }
+ }, 10)
+
+ It("concurrent read & alter different tables", func() {
+ db, err := apid.Data().DBForID("test_conn_read_alter_diff_table")
+ db.SetMaxOpenConns(10)
+ db.SetMaxIdleConns(10)
+ Expect(err).NotTo(HaveOccurred())
+ setup(db)
+ alterCount := 50
+ finished := make(chan bool, count+alterCount)
+ for i := 0; i < alterCount; i++ {
+ go func(j int) {
+ alter(db, j, "test_2")
+ finished <- true
+ }(i)
+ }
+
+ for i := 0; i < count; i++ {
+ go func() {
+ read(db)
+ finished <- true
+ }()
+ }
+
+ for i := 0; i < count+alterCount; i++ {
+ <-finished
+ }
+ }, 10)
})
func setup(db apid.DB) {
@@ -148,7 +261,7 @@
func read(db apid.DB) {
defer GinkgoRecover()
var counter string
- rows, err := db.Query(`SELECT counter FROM test_2 LIMIT 5`)
+ rows, err := db.Query(`SELECT counter FROM test_1 LIMIT 5`)
Expect(err).Should(Succeed())
defer rows.Close()
for rows.Next() {
@@ -174,3 +287,18 @@
//Expect(err).Should(Succeed())
fmt.Print("+")
}
+
+func alter(db apid.DB, i int, tableName string) {
+ defer GinkgoRecover()
+ // DB INSERT as a txn
+ tx, err := db.Begin()
+ Expect(err).Should(Succeed())
+ defer tx.Rollback()
+ prep, err := tx.Prepare("ALTER TABLE " + tableName + " ADD COLUMN colname_" + strconv.Itoa(i) + " text DEFAULT ''")
+ Expect(err).Should(Succeed())
+ _, err = prep.Exec()
+ Expect(err).Should(Succeed())
+ Expect(prep.Close()).Should(Succeed())
+ Expect(tx.Commit()).Should(Succeed())
+ fmt.Print("-")
+}