fixed problems with DB table
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 7325666..236fe51 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -42,9 +42,10 @@ apid.InitializePlugins("") + // init full DB db, err := data.DB() Expect(err).NotTo(HaveOccurred()) - err = InitDB(db) + err = InitDBFullColumns(db) Expect(err).NotTo(HaveOccurred()) SetDB(db) @@ -116,6 +117,7 @@ _, err = getDB().Exec("DELETE FROM edgex_deployment") Expect(err).ShouldNot(HaveOccurred()) + _, err = getDB().Exec("UPDATE etag SET value=1") })
diff --git a/data.go b/data.go index 02e1f9c..72f007a 100644 --- a/data.go +++ b/data.go
@@ -5,6 +5,7 @@ "fmt" "sync" + "encoding/json" "github.com/30x/apid-core" ) @@ -38,7 +39,7 @@ Exec(query string, args ...interface{}) (sql.Result, error) } -func InitDB(db apid.DB) error { +func InitDBFullColumns(db apid.DB) error { _, err := db.Exec(` CREATE TABLE IF NOT EXISTS edgex_deployment ( id character varying(36) NOT NULL, @@ -51,7 +52,7 @@ created_by text, updated timestamp without time zone, updated_by text, - bundle_name text, + bundle_config_name text, bundle_uri text, local_bundle_uri text, bundle_checksum text, @@ -70,6 +71,49 @@ return nil } +func InitDB(db apid.DB) error { + _, err := db.Exec(` + CREATE TABLE IF NOT EXISTS edgex_deployment ( + id character varying(36) NOT NULL, + bundle_config_id varchar(36) NOT NULL, + apid_cluster_id varchar(36) NOT NULL, + data_scope_id varchar(36) NOT NULL, + bundle_config_json text NOT NULL, + config_json text NOT NULL, + created timestamp without time zone, + created_by text, + updated timestamp without time zone, + updated_by text, + bundle_config_name text, + PRIMARY KEY (id) + ); + `) + if err != nil { + return err + } + + log.Debug("Database tables created.") + return nil +} + +func AlterTable(db apid.DB) error { + _, err := db.Exec(` + ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text; + ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text; + ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text; + ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text; + ALTER TABLE edgex_deployment ADD COLUMN deploy_status string; + ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int; + ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text; + `) + if err != nil { + return err + } + + log.Debug("Database table altered.") + return nil +} + func getDB() apid.DB { dbMux.RLock() db := unsafeDB @@ -97,7 +141,7 @@ INSERT INTO edgex_deployment (id, bundle_config_id, apid_cluster_id, data_scope_id, bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_name, bundle_uri, local_bundle_uri, + updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status, deploy_error_code, deploy_error_message) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18); @@ -127,6 +171,89 @@ return err } +func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error { + + log.Debugf("updating %d edgex_deployment", len(deps)) + + stmt, err := tx.Prepare(` + UPDATE edgex_deployment SET + (bundle_uri, local_bundle_uri, + bundle_checksum, bundle_checksum_type, deploy_status, + deploy_error_code, deploy_error_message) + = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8 + `) + if err != nil { + log.Errorf("prepare update edgex_deployment failed: %v", err) + return err + } + defer stmt.Close() + + for _, dep := range deps { + log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI) + + _, err = stmt.Exec( + dep.BundleURI, + dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, + dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID) + if err != nil { + log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err) + return err + } + } + + log.Debug("updateDeploymentsColumns of edgex_deployment succeeded") + return err +} + +func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) { + deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL") + if err != nil { + log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err) + return + } + var bc bundleConfigJson + for i, _ := range deployments { + log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON) + json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc) + if err != nil { + log.Errorf("JSON decoding Manifest failed: %v", err) + return + } + deployments[i].BundleName = bc.Name + deployments[i].BundleURI = bc.URI + deployments[i].BundleChecksumType = bc.ChecksumType + deployments[i].BundleChecksum = bc.Checksum + + log.Debugf("Unmarshal: %v", deployments[i].BundleURI) + } + return +} + +/* +func countColumns(db apid.DB) int { + stmt, err := db.Prepare(` + SELECT * FROM edgex_deployment LIMIT 1; + `) + if err != nil { + log.Panicf("countColumns Failed %v", err) + return + } + defer stmt.Close() + var rows *sql.Rows + rows, err = stmt.Query() + if err != nil { + log.Panicf("countColumns Failed %v", err) + return + } + defer rows.Close() + cols, err :=rows.Columns() + if err != nil { + log.Panicf("countColumns Failed %v", err) + return + } + return len(cols) +} +*/ func deleteDeployment(tx *sql.Tx, depID string) error { log.Debugf("deleteDeployment: %s", depID) @@ -166,7 +293,7 @@ stmt, err = db.Prepare(` SELECT id, bundle_config_id, apid_cluster_id, data_scope_id, bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_name, bundle_uri, + updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status, deploy_error_code, deploy_error_message FROM edgex_deployment @@ -268,3 +395,33 @@ return nil } + +func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error { + + stmt, err := tx.Prepare(` + INSERT INTO edgex_deployment + (id, bundle_config_id, apid_cluster_id, data_scope_id, + bundle_config_json, config_json, created, created_by, + updated, updated_by, bundle_config_name) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11); + `) + if err != nil { + log.Errorf("prepare insert into edgex_deployment failed: %v", err) + return err + } + defer stmt.Close() + + log.Debugf("InsertTestDeployment: %s", dep.ID) + + _, err = stmt.Exec( + dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, + dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, + dep.Updated, dep.UpdatedBy, dep.BundleName) + if err != nil { + log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err) + return err + } + + log.Debug("InsertTestDeployment edgex_deployment succeeded") + return err +}
diff --git a/listener.go b/listener.go index 393dc03..b1e965a 100644 --- a/listener.go +++ b/listener.go
@@ -9,6 +9,7 @@ "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" + "strings" ) const ( @@ -54,10 +55,38 @@ log.Panicf("Unable to access database: %v", err) } + // alter table + err = AlterTable(db) + if err != nil { + log.Error(err.Error()) + if !strings.Contains(err.Error(), "duplicate") { + log.Panicf("Alter table failed: %v", err) + } + } // ensure that no new database updates are made on old database dbMux.Lock() SetDB(db) dbMux.Unlock() + + // update deployments + deps, err := getDeploymentsToUpdate(db) + if err != nil { + log.Panicf("Unable to getDeploymentsToUpdate: %v", err) + } + tx, err := db.Begin() + if err != nil { + log.Panicf("Error starting transaction: %v", err) + } + defer tx.Rollback() + err = updateDeploymentsColumns(tx, deps) + if err != nil { + log.Panicf("updateDeploymentsColumns failed: %v", err) + } + err = tx.Commit() + if err != nil { + log.Panicf("Error committing Snapshot update: %v", err) + } + startupOnExistingDatabase() log.Debug("Snapshot processed") }
diff --git a/listener_test.go b/listener_test.go index 09290d8..c253eb3 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -8,6 +8,7 @@ "net/http" + "fmt" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" @@ -24,8 +25,9 @@ * so the threads generated by startupOnExistingDatabase() will mess up later tests */ It("should set DB to appropriate version", func(done Done) { + saveDB := getDB() deploymentID := "set_version_test" - snapshot, dep := createSnapshotDeployment(deploymentID) + snapshot, dep := createSnapshotDeployment(deploymentID, "test_version") db, err := data.DBVersion(snapshot.SnapshotInfo) Expect(err).ShouldNot(HaveOccurred()) @@ -47,14 +49,18 @@ // DB should have been set Expect(getDB() == expectedDB).Should(BeTrue()) + + SetDB(saveDB) close(done) }) It("should process unready on existing db startup event", func(done Done) { + saveDB := getDB() + deploymentID := "startup_test" - snapshot, dep := createSnapshotDeployment(deploymentID) + snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready") db, err := data.DBVersion(snapshot.SnapshotInfo) Expect(err).ShouldNot(HaveOccurred()) @@ -76,11 +82,15 @@ d := result.deployments[0] Expect(d.ID).To(Equal(deploymentID)) + + SetDB(saveDB) close(done) }) It("should send deployment statuses on existing db startup event", func(done Done) { + saveDB := getDB() + successDep := DataDeployment{ ID: "success", LocalBundleURI: "x", @@ -124,6 +134,7 @@ Message: failDep.DeployErrorMessage, })) + SetDB(saveDB) close(done) })) @@ -140,7 +151,7 @@ db, err := data.DBVersion(snapshot.SnapshotInfo) Expect(err).NotTo(HaveOccurred()) - err = InitDB(db) + err = InitDBFullColumns(db) Expect(err).NotTo(HaveOccurred()) tx, err := db.Begin() @@ -168,7 +179,13 @@ event, dep := createChangeDeployment(deploymentID) - insertDeploymentToDb(dep, getDB()) + // insert full deployment columns + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) var listener = make(chan deploymentsResult) addSubscriber <- listener @@ -198,7 +215,15 @@ // insert deployment event, dep := createChangeDeployment(deploymentID) - insertDeploymentToDb(dep, getDB()) + + // insert full deployment columns + tx, err := getDB().Begin() + Expect(err).ShouldNot(HaveOccurred()) + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) + listener := make(chan deploymentsResult) addSubscriber <- listener apid.Events().Emit(APIGEE_SYNC_EVENT, &event) @@ -267,7 +292,7 @@ tx, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) defer tx.Rollback() - err = InsertDeployment(tx, dep) + err = InsertTestDeployment(tx, dep) Expect(err).ShouldNot(HaveOccurred()) err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) @@ -283,7 +308,7 @@ Expect(err).ShouldNot(HaveOccurred()) } -func createSnapshotDeployment(deploymentID string) (common.Snapshot, DataDeployment) { +func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) { uri, err := url.Parse(testServer.URL) Expect(err).ShouldNot(HaveOccurred()) @@ -296,17 +321,22 @@ } bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + jsonBytes, err := json.Marshal(bundle) + Expect(err).ShouldNot(HaveOccurred()) + fmt.Println("JSON :" + string(jsonBytes)) + dep := DataDeployment{ ID: deploymentID, DataScopeID: deploymentID, BundleURI: bundle.URI, BundleChecksum: bundle.Checksum, BundleChecksumType: bundle.ChecksumType, + BundleConfigJSON: string(jsonBytes), } // init without info == startup on existing DB var snapshot = common.Snapshot{ - SnapshotInfo: "test", + SnapshotInfo: snapInfo, Tables: []common.Table{}, } return snapshot, dep