fixed problems with DB table
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 7325666..236fe51 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -42,9 +42,10 @@
apid.InitializePlugins("")
+ // init full DB
db, err := data.DB()
Expect(err).NotTo(HaveOccurred())
- err = InitDB(db)
+ err = InitDBFullColumns(db)
Expect(err).NotTo(HaveOccurred())
SetDB(db)
@@ -116,6 +117,7 @@
_, err = getDB().Exec("DELETE FROM edgex_deployment")
Expect(err).ShouldNot(HaveOccurred())
+
_, err = getDB().Exec("UPDATE etag SET value=1")
})
diff --git a/data.go b/data.go
index 02e1f9c..72f007a 100644
--- a/data.go
+++ b/data.go
@@ -5,6 +5,7 @@
"fmt"
"sync"
+ "encoding/json"
"github.com/30x/apid-core"
)
@@ -38,7 +39,7 @@
Exec(query string, args ...interface{}) (sql.Result, error)
}
-func InitDB(db apid.DB) error {
+func InitDBFullColumns(db apid.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS edgex_deployment (
id character varying(36) NOT NULL,
@@ -51,7 +52,7 @@
created_by text,
updated timestamp without time zone,
updated_by text,
- bundle_name text,
+ bundle_config_name text,
bundle_uri text,
local_bundle_uri text,
bundle_checksum text,
@@ -70,6 +71,49 @@
return nil
}
+func InitDB(db apid.DB) error {
+ _, err := db.Exec(`
+ CREATE TABLE IF NOT EXISTS edgex_deployment (
+ id character varying(36) NOT NULL,
+ bundle_config_id varchar(36) NOT NULL,
+ apid_cluster_id varchar(36) NOT NULL,
+ data_scope_id varchar(36) NOT NULL,
+ bundle_config_json text NOT NULL,
+ config_json text NOT NULL,
+ created timestamp without time zone,
+ created_by text,
+ updated timestamp without time zone,
+ updated_by text,
+ bundle_config_name text,
+ PRIMARY KEY (id)
+ );
+ `)
+ if err != nil {
+ return err
+ }
+
+ log.Debug("Database tables created.")
+ return nil
+}
+
+func AlterTable(db apid.DB) error {
+ _, err := db.Exec(`
+ ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;
+ ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;
+ ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;
+ ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;
+ ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;
+ ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;
+ ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;
+ `)
+ if err != nil {
+ return err
+ }
+
+ log.Debug("Database table altered.")
+ return nil
+}
+
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
@@ -97,7 +141,7 @@
INSERT INTO edgex_deployment
(id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+ updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri,
bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
@@ -127,6 +171,89 @@
return err
}
+func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error {
+
+ log.Debugf("updating %d edgex_deployment", len(deps))
+
+ stmt, err := tx.Prepare(`
+ UPDATE edgex_deployment SET
+ (bundle_uri, local_bundle_uri,
+ bundle_checksum, bundle_checksum_type, deploy_status,
+ deploy_error_code, deploy_error_message)
+ = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8
+ `)
+ if err != nil {
+ log.Errorf("prepare update edgex_deployment failed: %v", err)
+ return err
+ }
+ defer stmt.Close()
+
+ for _, dep := range deps {
+ log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI)
+
+ _, err = stmt.Exec(
+ dep.BundleURI,
+ dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+ dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID)
+ if err != nil {
+ log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err)
+ return err
+ }
+ }
+
+ log.Debug("updateDeploymentsColumns of edgex_deployment succeeded")
+ return err
+}
+
+func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) {
+ deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL")
+ if err != nil {
+ log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err)
+ return
+ }
+ var bc bundleConfigJson
+ for i, _ := range deployments {
+ log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON)
+ json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc)
+ if err != nil {
+ log.Errorf("JSON decoding Manifest failed: %v", err)
+ return
+ }
+ deployments[i].BundleName = bc.Name
+ deployments[i].BundleURI = bc.URI
+ deployments[i].BundleChecksumType = bc.ChecksumType
+ deployments[i].BundleChecksum = bc.Checksum
+
+ log.Debugf("Unmarshal: %v", deployments[i].BundleURI)
+ }
+ return
+}
+
+/*
+func countColumns(db apid.DB) int {
+ stmt, err := db.Prepare(`
+ SELECT * FROM edgex_deployment LIMIT 1;
+ `)
+ if err != nil {
+ log.Panicf("countColumns Failed %v", err)
+ return
+ }
+ defer stmt.Close()
+ var rows *sql.Rows
+ rows, err = stmt.Query()
+ if err != nil {
+ log.Panicf("countColumns Failed %v", err)
+ return
+ }
+ defer rows.Close()
+ cols, err :=rows.Columns()
+ if err != nil {
+ log.Panicf("countColumns Failed %v", err)
+ return
+ }
+ return len(cols)
+}
+*/
func deleteDeployment(tx *sql.Tx, depID string) error {
log.Debugf("deleteDeployment: %s", depID)
@@ -166,7 +293,7 @@
stmt, err = db.Prepare(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_name, bundle_uri,
+ updated, updated_by, bundle_config_name, bundle_uri,
local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message
FROM edgex_deployment
@@ -268,3 +395,33 @@
return nil
}
+
+func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error {
+
+ stmt, err := tx.Prepare(`
+ INSERT INTO edgex_deployment
+ (id, bundle_config_id, apid_cluster_id, data_scope_id,
+ bundle_config_json, config_json, created, created_by,
+ updated, updated_by, bundle_config_name)
+ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);
+ `)
+ if err != nil {
+ log.Errorf("prepare insert into edgex_deployment failed: %v", err)
+ return err
+ }
+ defer stmt.Close()
+
+ log.Debugf("InsertTestDeployment: %s", dep.ID)
+
+ _, err = stmt.Exec(
+ dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
+ dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+ dep.Updated, dep.UpdatedBy, dep.BundleName)
+ if err != nil {
+ log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
+ return err
+ }
+
+ log.Debug("InsertTestDeployment edgex_deployment succeeded")
+ return err
+}
diff --git a/listener.go b/listener.go
index 393dc03..b1e965a 100644
--- a/listener.go
+++ b/listener.go
@@ -9,6 +9,7 @@
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
+ "strings"
)
const (
@@ -54,10 +55,38 @@
log.Panicf("Unable to access database: %v", err)
}
+ // alter table
+ err = AlterTable(db)
+ if err != nil {
+ log.Error(err.Error())
+ if !strings.Contains(err.Error(), "duplicate") {
+ log.Panicf("Alter table failed: %v", err)
+ }
+ }
// ensure that no new database updates are made on old database
dbMux.Lock()
SetDB(db)
dbMux.Unlock()
+
+ // update deployments
+ deps, err := getDeploymentsToUpdate(db)
+ if err != nil {
+ log.Panicf("Unable to getDeploymentsToUpdate: %v", err)
+ }
+ tx, err := db.Begin()
+ if err != nil {
+ log.Panicf("Error starting transaction: %v", err)
+ }
+ defer tx.Rollback()
+ err = updateDeploymentsColumns(tx, deps)
+ if err != nil {
+ log.Panicf("updateDeploymentsColumns failed: %v", err)
+ }
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Error committing Snapshot update: %v", err)
+ }
+
startupOnExistingDatabase()
log.Debug("Snapshot processed")
}
diff --git a/listener_test.go b/listener_test.go
index 09290d8..c253eb3 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -8,6 +8,7 @@
"net/http"
+ "fmt"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
@@ -24,8 +25,9 @@
* so the threads generated by startupOnExistingDatabase() will mess up later tests
*/
It("should set DB to appropriate version", func(done Done) {
+ saveDB := getDB()
deploymentID := "set_version_test"
- snapshot, dep := createSnapshotDeployment(deploymentID)
+ snapshot, dep := createSnapshotDeployment(deploymentID, "test_version")
db, err := data.DBVersion(snapshot.SnapshotInfo)
Expect(err).ShouldNot(HaveOccurred())
@@ -47,14 +49,18 @@
// DB should have been set
Expect(getDB() == expectedDB).Should(BeTrue())
+
+ SetDB(saveDB)
close(done)
})
It("should process unready on existing db startup event", func(done Done) {
+ saveDB := getDB()
+
deploymentID := "startup_test"
- snapshot, dep := createSnapshotDeployment(deploymentID)
+ snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready")
db, err := data.DBVersion(snapshot.SnapshotInfo)
Expect(err).ShouldNot(HaveOccurred())
@@ -76,11 +82,15 @@
d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
+
+ SetDB(saveDB)
close(done)
})
It("should send deployment statuses on existing db startup event", func(done Done) {
+ saveDB := getDB()
+
successDep := DataDeployment{
ID: "success",
LocalBundleURI: "x",
@@ -124,6 +134,7 @@
Message: failDep.DeployErrorMessage,
}))
+ SetDB(saveDB)
close(done)
}))
@@ -140,7 +151,7 @@
db, err := data.DBVersion(snapshot.SnapshotInfo)
Expect(err).NotTo(HaveOccurred())
- err = InitDB(db)
+ err = InitDBFullColumns(db)
Expect(err).NotTo(HaveOccurred())
tx, err := db.Begin()
@@ -168,7 +179,13 @@
event, dep := createChangeDeployment(deploymentID)
- insertDeploymentToDb(dep, getDB())
+ // insert full deployment columns
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -198,7 +215,15 @@
// insert deployment
event, dep := createChangeDeployment(deploymentID)
- insertDeploymentToDb(dep, getDB())
+
+ // insert full deployment columns
+ tx, err := getDB().Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+
listener := make(chan deploymentsResult)
addSubscriber <- listener
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
@@ -267,7 +292,7 @@
tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
defer tx.Rollback()
- err = InsertDeployment(tx, dep)
+ err = InsertTestDeployment(tx, dep)
Expect(err).ShouldNot(HaveOccurred())
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
@@ -283,7 +308,7 @@
Expect(err).ShouldNot(HaveOccurred())
}
-func createSnapshotDeployment(deploymentID string) (common.Snapshot, DataDeployment) {
+func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) {
uri, err := url.Parse(testServer.URL)
Expect(err).ShouldNot(HaveOccurred())
@@ -296,17 +321,22 @@
}
bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ jsonBytes, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+ fmt.Println("JSON :" + string(jsonBytes))
+
dep := DataDeployment{
ID: deploymentID,
DataScopeID: deploymentID,
BundleURI: bundle.URI,
BundleChecksum: bundle.Checksum,
BundleChecksumType: bundle.ChecksumType,
+ BundleConfigJSON: string(jsonBytes),
}
// init without info == startup on existing DB
var snapshot = common.Snapshot{
- SnapshotInfo: "test",
+ SnapshotInfo: snapInfo,
Tables: []common.Table{},
}
return snapshot, dep