snapshot db protocol, cleanup
diff --git a/api.go b/api.go index 40acb90..9ce3bea 100644 --- a/api.go +++ b/api.go
@@ -28,7 +28,7 @@ Reason string `json:"reason"` } -func initAPI(services apid.Services) { +func initAPI() { services.API().HandleFunc("/deployments/current", handleCurrentDeployment).Methods("GET") services.API().HandleFunc("/deployments/{deploymentID}", handleDeploymentResult).Methods("POST") }
diff --git a/api_test.go b/api_test.go index be371be..1076ff8 100644 --- a/api_test.go +++ b/api_test.go
@@ -19,6 +19,7 @@ It("should get 404 if no deployments", func() { + db := getDB() _, err := db.Exec("DELETE FROM gateway_deploy_deployment") Expect(err).ShouldNot(HaveOccurred()) @@ -77,6 +78,7 @@ It("should get 404 after blocking if no deployment", func() { + db := getDB() _, err := db.Exec("DELETE FROM gateway_deploy_deployment") Expect(err).ShouldNot(HaveOccurred()) @@ -184,6 +186,7 @@ It("should mark a deployment as deployed", func() { + db := getDB() deploymentID := "api_mark_deployed" insertTestDeployment(testServer, deploymentID) @@ -221,6 +224,7 @@ It("should mark a deployment as failed", func() { + db := getDB() deploymentID := "api_test_3" insertTestDeployment(testServer, deploymentID) @@ -264,6 +268,7 @@ func insertTestDeployment(server *httptest.Server, depID string) { + db := getDB() uri, err := url.Parse(server.URL) Expect(err).ShouldNot(HaveOccurred()) uri.Path = "/bundle" @@ -284,7 +289,7 @@ }, } - err = insertDeployment(depID, dep) + err = insertDeployment(db, depID, dep) Expect(err).ShouldNot(HaveOccurred()) err = updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index c8be184..7287e3e 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -30,11 +30,15 @@ config.Set("local_storage_path", tmpDir) - // init() will create the tables apid.InitializePlugins() + db, err := data.DB() + Expect(err).NotTo(HaveOccurred()) + initDB(db) + setDB(db) + router := apid.API().Router() - // fake unreliable bundle repo + // fake an unreliable bundle repo downloadMultiplier = 10 * time.Millisecond count := 0 router.HandleFunc("/bundle/{id}", func(w http.ResponseWriter, req *http.Request) {
diff --git a/data.go b/data.go index f1da64f..45d2cc1 100644 --- a/data.go +++ b/data.go
@@ -3,6 +3,8 @@ import ( "database/sql" "time" + "github.com/30x/apid" + "sync" ) const ( @@ -16,59 +18,57 @@ BUNDLE_TYPE_DEP = 2 ) +var ( + unsafeDB apid.DB + dbMux sync.RWMutex +) + type SQLExec interface { Exec(query string, args ...interface{}) (sql.Result, error) } -func initDB() { +func initDB(db apid.DB) { - var count int - row := db.QueryRow("SELECT count(*) FROM sqlite_master WHERE type='table' AND name='gateway_deploy_deployment';") - if err := row.Scan(&count); err != nil { - log.Panicf("Unable to check for tables: %v", err) - } - if count > 0 { - return - } + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS gateway_deploy_deployment ( + id varchar(255), status integer, created_at integer, + modified_at integer, error_code varchar(255), + PRIMARY KEY (id)); - log.Debug("Creating database tables...") - - tx, err := db.Begin() + CREATE TABLE IF NOT EXISTS gateway_deploy_bundle ( + deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer, + created_at integer, modified_at integer, status integer, error_code integer, error_reason text, + PRIMARY KEY (deployment_id, id), + FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE); + `) if err != nil { - log.Panicf("Unable to start transaction: %v", err) - } - defer tx.Rollback() - - _, err = tx.Exec("CREATE TABLE gateway_deploy_deployment (" + - "id varchar(255), status integer, created_at integer, " + - "modified_at integer, error_code varchar(255), " + - "PRIMARY KEY (id));") - if err != nil { - log.Panicf("Unable to initialize gateway_deploy_deployment: %v", err) + log.Panicf("Unable to initialize database: %v", err) } - _, err = tx.Exec("CREATE TABLE gateway_deploy_bundle (" + - "deployment_id varchar(255), id varchar(255), scope varchar(255), uri varchar(255), type integer, " + - "created_at integer, modified_at integer, status integer, error_code integer, error_reason text, " + - "PRIMARY KEY (deployment_id, id), " + - "FOREIGN KEY (deployment_id) references gateway_deploy_deployment(id) ON DELETE CASCADE);") - if err != nil { - log.Panicf("Unable to initialize gateway_deploy_bundle: %v", err) - } - - err = tx.Commit() - if err != nil { - log.Panicf("Unable to commit transaction: %v", err) - } else { - log.Debug("Database tables created.") - } + log.Debug("Database tables created.") } func dbTimeNow() int64 { return int64(time.Now().UnixNano()) } -func insertDeployment(depID string, dep deployment) error { +func getDB() apid.DB { + dbMux.RLock() + db := unsafeDB + dbMux.RUnlock() + return db +} + +func setDB(db apid.DB) { + dbMux.Lock() + if unsafeDB == nil { // init API when DB is initialized + go initAPI() + } + unsafeDB = db + dbMux.Unlock() +} + +func insertDeployment(db apid.DB, depID string, dep deployment) error { log.Debugf("insertDeployment: %s", depID) @@ -127,6 +127,8 @@ log.Debugf("updateDeploymentAndBundles: %s", depID) + db := getDB() + /* * If the state of deployment was success, update state of bundles and * its deployments as success as well @@ -232,6 +234,7 @@ // getCurrentDeploymentID returns the ID of what should be the "current" deployment func getCurrentDeploymentID() (string, error) { + db := getDB() var depID string err := db.QueryRow("SELECT id FROM gateway_deploy_deployment " + "WHERE status >= ? ORDER BY created_at DESC LIMIT 1;", DEPLOYMENT_STATE_READY).Scan(&depID) @@ -242,6 +245,7 @@ // getDeployment returns a fully populated deploymentResponse func getDeployment(depID string) (*deployment, error) { + db := getDB() rows, err := db.Query("SELECT id, type, uri, COALESCE(scope, '') as scope " + "FROM gateway_deploy_bundle WHERE deployment_id=?;", depID) if err != nil {
diff --git a/deployments.go b/deployments.go index c10a212..6d1a624 100644 --- a/deployments.go +++ b/deployments.go
@@ -12,6 +12,7 @@ "errors" "io/ioutil" "time" + "github.com/30x/apid" ) // todo: remove downloaded bundle files from old deployments @@ -180,11 +181,11 @@ // returns first bundle download error // all bundles will be attempted regardless of errors, in the future we could retry -func prepareDeployment(depID string, dep deployment) error { +func prepareDeployment(db apid.DB, depID string, dep deployment) error { log.Debugf("preparing deployment: %s", depID) - err := insertDeployment(depID, dep) + err := insertDeployment(db, depID, dep) if err != nil { log.Errorf("insert deployment failed: %v", err) return err
diff --git a/init.go b/init.go index 4468f08..07b3094 100644 --- a/init.go +++ b/init.go
@@ -11,8 +11,9 @@ ) var ( + services apid.Services log apid.LogService - db apid.DB + data apid.DataService bundlePath string ) @@ -20,14 +21,16 @@ apid.RegisterPlugin(initPlugin) } -func initPlugin(services apid.Services) error { +func initPlugin(s apid.Services) error { + services = s log = services.Log().ForModule("apiGatewayDeploy") log.Debug("start init") config := services.Config() config.SetDefault(configBundleDirKey, "bundles") - var err error + data = services.Data() + relativeBundlePath := config.GetString(configBundleDirKey) if err := os.MkdirAll(relativeBundlePath, 0700); err != nil { log.Panicf("Failed bundle directory creation: %v", err) @@ -36,15 +39,8 @@ bundlePath = path.Join(storagePath, relativeBundlePath) log.Infof("Bundle directory path is %s", bundlePath) - db, err = services.Data().DB() - if err != nil { - log.Panic("Unable to access DB", err) - } - initDB() - go distributeEvents() - initAPI(services) initListener(services) log.Debug("end init")
diff --git a/listener.go b/listener.go index 8ad5411..915a6b5 100644 --- a/listener.go +++ b/listener.go
@@ -28,12 +28,21 @@ } else if snapData, ok := e.(*common.Snapshot); ok { processSnapshot(snapData) } else { - log.Errorf("Received invalid event: %v", e) + log.Errorf("Received invalid event. Ignoring. %v", e) } } func processSnapshot(snapshot *common.Snapshot) { + log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo) + + db, err := data.DBVersion(snapshot.SnapshotInfo) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } + + initDB(db) + for _, table := range snapshot.Tables { var err error switch table.Name { @@ -44,18 +53,20 @@ } // todo: should be 0 or 1 *per system*!! - TBD row := table.Rows[len(table.Rows)-1] - err = processNewManifest(row) + err = processNewManifest(db, row) } if err != nil { log.Panicf("Error processing Snapshot: %v", err) } } + setDB(db) log.Debug("Snapshot processed") } func processChangeList(changes *common.ChangeList) { + db := getDB() for _, change := range changes.Changes { log.Debugf("change table: %s operation: %s", change.Table, change.Operation) @@ -64,7 +75,7 @@ case MANIFEST_TABLE: switch change.Operation { case common.Insert: - err = processNewManifest(change.NewRow) + err = processNewManifest(db, change.NewRow) default: log.Error("unexpected operation: %s", change.Operation) } @@ -75,7 +86,7 @@ } } -func processNewManifest(row common.Row) error { +func processNewManifest(db apid.DB, row common.Row) error { var deploymentID, manifestString string err := row.Get("id", &deploymentID) @@ -93,7 +104,7 @@ return err } - err = prepareDeployment(deploymentID, manifest) + err = prepareDeployment(db, deploymentID, manifest) if err != nil { log.Errorf("serviceDeploymentQueue prepare deployment failed: %s", deploymentID) return err
diff --git a/listener_test.go b/listener_test.go index 28d6ce3..a7cb03f 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -49,11 +49,13 @@ row["id"] = &common.ColumnVal{Value: deploymentID} row["manifest_body"] = &common.ColumnVal{Value: string(depBytes)} - var event = common.Snapshot{} - event.Tables = []common.Table{ - { - Name: MANIFEST_TABLE, - Rows: []common.Row{row}, + var event = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{ + { + Name: MANIFEST_TABLE, + Rows: []common.Row{row}, + }, }, } @@ -76,7 +78,7 @@ apid.Events().Listen(APIGEE_SYNC_EVENT, h) apid.Events().Emit(APIGEE_SYNC_EVENT, &event) // for standard listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{}) // for test listener + apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{SnapshotInfo: "test"}) // for test listener }) It("should process ApigeeSync change event", func(done Done) {