Added DB versioning and release (cleanup) capabilities
diff --git a/cmd/apid/glide.lock b/cmd/apid/glide.lock index 801b183..a6ebe36 100644 --- a/cmd/apid/glide.lock +++ b/cmd/apid/glide.lock
@@ -1,5 +1,5 @@ -hash: 526c791f6bc598c820d13f2fb7bf02764a7086db48b187b7365a21441e0cbf03 -updated: 2016-11-11T17:26:14.669214342-08:00 +hash: f3dc2db21f613a96b9813cfa96b493a1af143875f4cf8ea52c099bf1e123320c +updated: 2016-11-23T17:50:45.156237106-08:00 imports: - name: github.com/30x/apid version: a6a1489821eae43312dd88a733c85a604f2e6777 @@ -17,9 +17,9 @@ subpackages: - github - name: github.com/30x/apidVerifyAPIKey - version: 594d71754f849cfd862f2ba1656ce7b80c5b724f + version: 6218cb0843f68ea7cb6d7a7144e5e4426b7367ba - name: github.com/apigee-labs/transicator - version: 290b26aeac93afaeea5c75b96acb8a4b07a7e6e9 + version: 1b579d18d82956ff1f30c02f72d2812bcca093f8 subpackages: - common - name: github.com/fsnotify/fsnotify
diff --git a/cmd/apid/glide.yaml b/cmd/apid/glide.yaml index 04c913b..38b6b8a 100644 --- a/cmd/apid/glide.yaml +++ b/cmd/apid/glide.yaml
@@ -5,6 +5,4 @@ - package: github.com/apigee-labs/transicator - package: github.com/30x/apidApigeeSync - package: github.com/30x/apidGatewayDeploy - subpackages: - - github - package: github.com/30x/apidVerifyAPIKey
diff --git a/data/data.go b/data/data.go index f6a03b5..a886daa 100644 --- a/data/data.go +++ b/data/data.go
@@ -4,11 +4,12 @@ "database/sql" "fmt" "github.com/30x/apid" + "github.com/30x/apid/data/wrap" "github.com/mattn/go-sqlite3" + "os" "path" "sync" - "os" - "github.com/30x/apid/data/wrap" + "runtime" ) const ( @@ -16,7 +17,8 @@ configDataSourceKey = "data_source" configDataPathKey = "data_path" - commonDBID = "_apid_common_" + commonDBID = "common" + commonDBVersion = "base" defaultTraceLevel = "warn" ) @@ -24,7 +26,7 @@ var log, dbTraceLog apid.LogService var config apid.ConfigService -var dbMap = make(map[string]apid.DB) +var dbMap = make(map[string]*sql.DB) var dbMapSync sync.RWMutex func CreateDataService() apid.DataService { @@ -46,13 +48,57 @@ } func (d *dataService) DB() (apid.DB, error) { - return d.DBForID(commonDBID) + return d.dbVersionForID(commonDBID, commonDBVersion) } -func (d *dataService) DBForID(id string) (db apid.DB, err error) { +func (d *dataService) DBForID(id string) (apid.DB, error) { + if id == commonDBID { + return nil, fmt.Errorf("reserved ID: %s", id) + } + return d.dbVersionForID(id, commonDBVersion) +} + +func (d *dataService) DBVersion(version string) (apid.DB, error) { + if version == commonDBVersion { + return nil, fmt.Errorf("reserved version: %s", version) + } + return d.dbVersionForID(commonDBID, version) +} + +func (d *dataService) DBVersionForID(id, version string) (apid.DB, error) { + if id == commonDBID { + return nil, fmt.Errorf("reserved ID: %s", id) + } + if version == commonDBVersion { + return nil, fmt.Errorf("reserved version: %s", version) + } + return d.dbVersionForID(id, version) +} + +// will set DB to close and delete when no more references +func (d *dataService) ReleaseDB(id, version string) { + versionedID := VersionedDBID(id, version) + + dbMapSync.Lock() + defer dbMapSync.Unlock() + + db := dbMap[versionedID] + if db != nil { + dbMap[versionedID] = nil + log.Errorf("SETTING FINALIZER") + finalizer := Delete(versionedID) + runtime.SetFinalizer(db, finalizer) + } + + return +} + +func (d *dataService) dbVersionForID(id, version string) (db *sql.DB, err error) { + + versionedID := VersionedDBID(id, version) dbMapSync.RLock() - db = dbMap[id] + db = dbMap[versionedID] dbMapSync.RUnlock() if db != nil { return @@ -61,26 +107,29 @@ dbMapSync.Lock() defer dbMapSync.Unlock() - db = dbMap[id] + db = dbMap[versionedID] if db != nil { return } - storagePath := config.GetString("local_storage_path") - relativeDataPath := config.GetString(configDataPathKey) - dataPath := path.Join(storagePath, relativeDataPath) + dataPath := DBPath(versionedID) - if err = os.MkdirAll(dataPath, 0700); err != nil { + if err = os.MkdirAll(path.Dir(dataPath), 0700); err != nil { return } - dataFile := path.Join(dataPath, id) - log.Infof("LoadDB: %s", dataFile) - dataSource := fmt.Sprintf(config.GetString(configDataSourceKey), dataFile) + log.Infof("LoadDB: %s", dataPath) + dataSource := fmt.Sprintf(config.GetString(configDataSourceKey), dataPath) wrappedDriverName := "dd:" + config.GetString(configDataDriverKey) dataDriver := wrap.WrapDriver{&sqlite3.SQLiteDriver{}, dbTraceLog} - sql.Register(wrappedDriverName, &dataDriver) + func() { + // just ignore the "registered twice" panic + defer func() { + recover() + }() + sql.Register(wrappedDriverName, &dataDriver) + }() db, err = sql.Open(wrappedDriverName, dataSource) if err != nil { @@ -88,7 +137,6 @@ return } - log.Infof("Sqlite DB path used by apid: %s", dataPath) err = db.Ping() if err != nil { log.Errorf("error pinging db: %s", err) @@ -109,6 +157,31 @@ return } - dbMap[id] = db + dbMap[versionedID] = db return } + +func Delete(versionedID string) interface{} { + return func(db *sql.DB) { + err := db.Close() + if err != nil { + log.Errorf("error closing DB: %v", err) + } + dataDir := path.Dir(DBPath(versionedID)) + err = os.RemoveAll(dataDir) + if err != nil { + log.Errorf("error removing DB files: %v", err) + } + delete(dbMap, versionedID) + } +} + +func VersionedDBID(id, version string) string { + return path.Join(id, version) +} + +func DBPath(id string) string { + storagePath := config.GetString("local_storage_path") + relativeDataPath := config.GetString(configDataPathKey) + return path.Join(storagePath, relativeDataPath, id, "sqlite3") +} \ No newline at end of file
diff --git a/data/data_test.go b/data/data_test.go index 5581c5c..8d5e475 100644 --- a/data/data_test.go +++ b/data/data_test.go
@@ -1,6 +1,7 @@ package data_test import ( + "fmt" "github.com/30x/apid" "github.com/30x/apid/factory" . "github.com/onsi/ginkgo" @@ -11,17 +12,15 @@ "os" "strconv" "time" - "fmt" + "github.com/30x/apid/data" + "database/sql" ) const ( count = 2000 setupSql = ` - CREATE TABLE IF NOT EXISTS test_1 (id INTEGER PRIMARY KEY, counter TEXT); - CREATE TABLE IF NOT EXISTS test_2 (id INTEGER PRIMARY KEY, counter TEXT); - DELETE FROM test_1; - DELETE FROM test_2; - ` + CREATE TABLE test_1 (id INTEGER PRIMARY KEY, counter TEXT); + CREATE TABLE test_2 (id INTEGER PRIMARY KEY, counter TEXT);` ) var ( @@ -29,7 +28,7 @@ r *rand.Rand = rand.New(rand.NewSource(time.Now().UnixNano())) ) -var _ = Describe("Events Service", func() { +var _ = Describe("Data Service", func() { BeforeSuite(func() { apid.Initialize(factory.DefaultServicesFactory()) @@ -45,47 +44,71 @@ os.RemoveAll(tmpDir) }) - It("should be able to open a new datbase", func () { - db, err := apid.Data().DBForID("test") + It("should not allow reserved id or version", func() { + _, err := apid.Data().DBForID("common") + Expect(err).To(HaveOccurred()) + + _, err = apid.Data().DBVersion("base") + Expect(err).To(HaveOccurred()) + + _, err = apid.Data().DBVersionForID("common", "base") + Expect(err).To(HaveOccurred()) + }) + + It("should be able to change versions of a datbase", func() { + var versions []string + var dbs []apid.DB + + for i := 0; i < 2; i++ { + version := time.Now().String() + db, err := apid.Data().DBVersionForID("test", version) + Expect(err).NotTo(HaveOccurred()) + setup(db) + versions = append(versions, version) + dbs = append(dbs, db) + } + + for _, db := range dbs { + var numRows int + err := db.QueryRow(`SELECT count(*) FROM test_2`).Scan(&numRows) + Expect(err).NotTo(HaveOccurred()) + Expect(numRows).To(Equal(count)) + } + }) + + It("should be able to release a database", func() { + db, err := apid.Data().DBVersionForID("release", "version") Expect(err).NotTo(HaveOccurred()) setup(db) - - var prod string - rows, err := db.Query(`SELECT counter FROM test_2 LIMIT 5`) - Expect(err).NotTo(HaveOccurred()) - defer rows.Close() - var count = 0 - for rows.Next() { - count++ - rows.Scan(&prod) - } - Expect(count).To(Equal(5)) - - //db, err := apid.Data().DBForID("test", "someid") + id := data.VersionedDBID("release", "version") + sqlDB := db.(*sql.DB) + Expect(sqlDB.Stats().OpenConnections).To(Equal(1)) + // run finalizer + data.Delete(id).(func(db *sql.DB))(sqlDB) + Expect(sqlDB.Stats().OpenConnections).To(Equal(0)) + Expect(data.DBPath(id)).ShouldNot(BeAnExistingFile()) }) It("should handle multi-threaded access", func(done Done) { db, err := apid.Data().DBForID("test") Expect(err).NotTo(HaveOccurred()) setup(db) - finished := make(chan struct{}) go func() { for i := 0; i < count; i++ { write(db, i) - randomSleep() } finished <- struct{}{} }() go func() { for i := 0; i < count; i++ { - go func(i int) { - read(db, i) + go func() { + read(db) finished <- struct{}{} - }(i) - randomSleep() + }() + time.Sleep(time.Duration(r.Intn(2)) * time.Millisecond) } }() @@ -94,13 +117,9 @@ } close(done) - }, 4) + }, 10) }) -func randomSleep() { - time.Sleep(time.Duration(r.Intn(1)) * time.Millisecond) -} - func setup(db apid.DB) { _, err := db.Exec(setupSql) if err != nil { @@ -119,16 +138,16 @@ tx.Commit() } -func read(db apid.DB, i int) { - var prod string +func read(db apid.DB) { + var counter string rows, err := db.Query(`SELECT counter FROM test_2 LIMIT 5`) if err != nil { log.Fatalf("test_2 select failed. Exec error=%s", err) } else { defer rows.Close() for rows.Next() { - rows.Scan(&prod) - fmt.Print("*") + rows.Scan(&counter) + //fmt.Print("*") } } fmt.Print(".")
diff --git a/data_service.go b/data_service.go index 6cc6ea6..62523e0 100644 --- a/data_service.go +++ b/data_service.go
@@ -7,6 +7,12 @@ type DataService interface { DB() (DB, error) DBForID(id string) (db DB, err error) + + DBVersion(version string) (db DB, err error) + DBVersionForID(id, version string) (db DB, err error) + + // will set DB to close and delete when no more references + ReleaseDB(id, version string) } type DB interface {
diff --git a/glide.lock b/glide.lock index 9cdb0b2..e4d0273 100644 --- a/glide.lock +++ b/glide.lock
@@ -1,8 +1,22 @@ hash: 40de1172d0d2b2c7b5d02a923c2ccdf8474904fe03926bcc6eabc1e4369598c4 -updated: 2016-10-24T14:37:50.103199133-07:00 +updated: 2016-11-23T17:24:52.395084588-08:00 imports: +- name: github.com/30x/apidApigeeSync + version: ddc7021b4fa0e8b7147fc51af253438163d02d46 +- name: github.com/30x/apidGatewayDeploy + version: 75a87de3880bdf18b3fdc7aea2388a7e0822c70a +- name: github.com/30x/apidVerifyAPIKey + version: 6218cb0843f68ea7cb6d7a7144e5e4426b7367ba +- name: github.com/apigee-labs/transicator + version: 1b579d18d82956ff1f30c02f72d2812bcca093f8 + subpackages: + - common - name: github.com/fsnotify/fsnotify version: bd2828f9f176e52d7222e565abb2d338d3f3c103 +- name: github.com/golang/protobuf + version: df1d3ca07d2d07bba352d5b73c4313b4e2a6203e + subpackages: + - proto - name: github.com/gorilla/context version: 08b5f424b9271eedf6f9f0ce86cb9396ed337a42 - name: github.com/gorilla/mux @@ -87,7 +101,7 @@ - reporters/stenographer - types - name: github.com/onsi/gomega - version: a78ae492d53aad5a7a232d0d0462c14c400e3ee7 + version: d59fa0ac68bb5dd932ee8d24eed631cdd519efc3 subpackages: - format - internal/assertion