Merge pull request #23 from 30x/XAPID-941
Xapid 941
diff --git a/.gitignore b/.gitignore
index 9d01395..0df4220 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,5 @@
cmd/apidGatewayDeploy/apidGatewayDeploy
*.lock
/apidGatewayDeploy.iml
+/data.go
+.idea
\ No newline at end of file
diff --git a/api.go b/api.go
index 972b8eb..8a628e7 100644
--- a/api.go
+++ b/api.go
@@ -32,8 +32,9 @@
)
const (
- sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
- iso8601 = "2006-01-02T15:04:05.999Z07:00"
+ sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
+ iso8601 = "2006-01-02T15:04:05.999Z07:00"
+ sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00"
)
type deploymentsResult struct {
@@ -392,7 +393,7 @@
if t == "" {
return ""
}
- formats := []string{sqlTimeFormat, iso8601, time.RFC3339}
+ formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339}
for _, f := range formats {
timestamp, err := time.Parse(f, t)
if err == nil {
diff --git a/api_test.go b/api_test.go
index c6acd5e..a49b9a5 100644
--- a/api_test.go
+++ b/api_test.go
@@ -295,7 +295,7 @@
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
var deployStatus string
- err = db.QueryRow("SELECT deploy_status FROM deployments WHERE id=?", deploymentID).
+ err = db.QueryRow("SELECT deploy_status FROM edgex_deployment WHERE id=?", deploymentID).
Scan(&deployStatus)
Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS))
})
@@ -332,7 +332,7 @@
var deploy_error_code int
err = db.QueryRow(`
SELECT deploy_status, deploy_error_code, deploy_error_message
- FROM deployments
+ FROM edgex_deployment
WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message)
Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL))
Expect(deploy_error_code).Should(Equal(100))
@@ -364,8 +364,8 @@
})
It("should get iso8601 time", func() {
- testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462 -0700 MST", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"}
- isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"}
+ testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00"}
+ isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z"}
for i, t := range testTimes {
log.Debug("insert deployment with timestamp: " + t)
deploymentID := "api_time_iso8601_" + strconv.Itoa(i)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 3b55d05..236fe51 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -42,9 +42,10 @@
apid.InitializePlugins("")
+ // init full DB
db, err := data.DB()
Expect(err).NotTo(HaveOccurred())
- err = InitDB(db)
+ err = InitDBFullColumns(db)
Expect(err).NotTo(HaveOccurred())
SetDB(db)
@@ -114,8 +115,9 @@
apiServerBaseURI, err = url.Parse(testServer.URL)
Expect(err).NotTo(HaveOccurred())
- _, err = getDB().Exec("DELETE FROM deployments")
+ _, err = getDB().Exec("DELETE FROM edgex_deployment")
Expect(err).ShouldNot(HaveOccurred())
+
_, err = getDB().Exec("UPDATE etag SET value=1")
})
diff --git a/data.go b/data.go
index ad67728..c2b3ecf 100644
--- a/data.go
+++ b/data.go
@@ -5,7 +5,9 @@
"fmt"
"sync"
+ "encoding/json"
"github.com/30x/apid-core"
+ "strings"
)
var (
@@ -38,9 +40,9 @@
Exec(query string, args ...interface{}) (sql.Result, error)
}
-func InitDB(db apid.DB) error {
+func InitDBFullColumns(db apid.DB) error {
_, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS deployments (
+ CREATE TABLE IF NOT EXISTS edgex_deployment (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
apid_cluster_id varchar(36) NOT NULL,
@@ -51,7 +53,7 @@
created_by text,
updated timestamp without time zone,
updated_by text,
- bundle_name text,
+ bundle_config_name text,
bundle_uri text,
local_bundle_uri text,
bundle_checksum text,
@@ -70,6 +72,56 @@
return nil
}
+func InitDB(db apid.DB) error {
+ _, err := db.Exec(`
+ CREATE TABLE IF NOT EXISTS edgex_deployment (
+ id character varying(36) NOT NULL,
+ bundle_config_id varchar(36) NOT NULL,
+ apid_cluster_id varchar(36) NOT NULL,
+ data_scope_id varchar(36) NOT NULL,
+ bundle_config_json text NOT NULL,
+ config_json text NOT NULL,
+ created timestamp without time zone,
+ created_by text,
+ updated timestamp without time zone,
+ updated_by text,
+ bundle_config_name text,
+ PRIMARY KEY (id)
+ );
+ `)
+ if err != nil {
+ return err
+ }
+
+ log.Debug("Database tables created.")
+ return nil
+}
+
+func alterTable(db apid.DB) error {
+ queries := []string{
+ "ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;",
+ "ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;",
+ "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;",
+ "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;",
+ "ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;",
+ "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;",
+ "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;",
+ }
+
+ for _, query := range queries {
+ _, err := db.Exec(query)
+ if err != nil {
+ if strings.Contains(err.Error(), "duplicate column name") {
+ log.Warnf("AlterTable warning: %s", err)
+ } else {
+ return err
+ }
+ }
+ }
+ log.Debug("Database table altered.")
+ return nil
+}
+
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
@@ -91,19 +143,19 @@
func insertDeployments(tx *sql.Tx, deps []DataDeployment) error {
- log.Debugf("inserting %d deployments", len(deps))
+ log.Debugf("inserting %d edgex_deployment", len(deps))
stmt, err := tx.Prepare(`
- INSERT INTO deployments
+ INSERT INTO edgex_deployment
(id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
+ updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri,
bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
`)
if err != nil {
- log.Errorf("prepare insert into deployments failed: %v", err)
+ log.Errorf("prepare insert into edgex_deployment failed: %v", err)
return err
}
defer stmt.Close()
@@ -118,29 +170,87 @@
dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
dep.DeployErrorCode, dep.DeployErrorMessage)
if err != nil {
- log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
+ log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
return err
}
}
- log.Debug("inserting deployments succeeded")
+ log.Debug("inserting edgex_deployment succeeded")
return err
}
+func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error {
+
+ log.Debugf("updating %d edgex_deployment", len(deps))
+
+ stmt, err := tx.Prepare(`
+ UPDATE edgex_deployment SET
+ (bundle_uri, local_bundle_uri,
+ bundle_checksum, bundle_checksum_type, deploy_status,
+ deploy_error_code, deploy_error_message)
+ = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8
+ `)
+ if err != nil {
+ log.Errorf("prepare update edgex_deployment failed: %v", err)
+ return err
+ }
+ defer stmt.Close()
+
+ for _, dep := range deps {
+ log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI)
+
+ _, err = stmt.Exec(
+ dep.BundleURI,
+ dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
+ dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID)
+ if err != nil {
+ log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err)
+ return err
+ }
+ }
+
+ log.Debug("updateDeploymentsColumns of edgex_deployment succeeded")
+ return err
+}
+
+func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) {
+ deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL")
+ if err != nil {
+ log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err)
+ return
+ }
+ var bc bundleConfigJson
+ for i, _ := range deployments {
+ log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON)
+ json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc)
+ if err != nil {
+ log.Errorf("JSON decoding Manifest failed: %v", err)
+ return
+ }
+ deployments[i].BundleName = bc.Name
+ deployments[i].BundleURI = bc.URI
+ deployments[i].BundleChecksumType = bc.ChecksumType
+ deployments[i].BundleChecksum = bc.Checksum
+
+ log.Debugf("Unmarshal: %v", deployments[i].BundleURI)
+ }
+ return
+}
+
func deleteDeployment(tx *sql.Tx, depID string) error {
log.Debugf("deleteDeployment: %s", depID)
- stmt, err := tx.Prepare("DELETE FROM deployments where id = $1;")
+ stmt, err := tx.Prepare("DELETE FROM edgex_deployment where id = $1;")
if err != nil {
- log.Errorf("prepare delete from deployments %s failed: %v", depID, err)
+ log.Errorf("prepare delete from edgex_deployment %s failed: %v", depID, err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(depID)
if err != nil {
- log.Errorf("delete from deployments %s failed: %v", depID, err)
+ log.Errorf("delete from edgex_deployment %s failed: %v", depID, err)
return err
}
@@ -166,10 +276,10 @@
stmt, err = db.Prepare(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_name, bundle_uri,
+ updated, updated_by, bundle_config_name, bundle_uri,
local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message
- FROM deployments
+ FROM edgex_deployment
` + where)
if err != nil {
return
@@ -181,7 +291,7 @@
if err == sql.ErrNoRows {
return
}
- log.Errorf("Error querying deployments: %v", err)
+ log.Errorf("Error querying edgex_deployment: %v", err)
return
}
defer rows.Close()
@@ -220,7 +330,7 @@
defer tx.Rollback()
stmt, err := tx.Prepare(`
- UPDATE deployments
+ UPDATE edgex_deployment
SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
WHERE id=$4;
`)
@@ -233,7 +343,7 @@
for _, result := range results {
res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
if err != nil {
- log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err)
+ log.Errorf("update edgex_deployment %s to %s failed: %v", result.ID, result.Status, err)
return err
}
n, err := res.RowsAffected()
@@ -251,7 +361,7 @@
func updateLocalBundleURI(depID, localBundleUri string) error {
- stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
+ stmt, err := getDB().Prepare("UPDATE edgex_deployment SET local_bundle_uri=$1 WHERE id=$2;")
if err != nil {
log.Errorf("prepare updateLocalBundleURI failed: %v", err)
return err
@@ -260,11 +370,41 @@
_, err = stmt.Exec(localBundleUri, depID)
if err != nil {
- log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
+ log.Errorf("update edgex_deployment %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
return err
}
- log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
+ log.Debugf("update edgex_deployment %s localBundleUri to %s succeeded", depID, localBundleUri)
return nil
}
+
+func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error {
+
+ stmt, err := tx.Prepare(`
+ INSERT INTO edgex_deployment
+ (id, bundle_config_id, apid_cluster_id, data_scope_id,
+ bundle_config_json, config_json, created, created_by,
+ updated, updated_by, bundle_config_name)
+ VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);
+ `)
+ if err != nil {
+ log.Errorf("prepare insert into edgex_deployment failed: %v", err)
+ return err
+ }
+ defer stmt.Close()
+
+ log.Debugf("InsertTestDeployment: %s", dep.ID)
+
+ _, err = stmt.Exec(
+ dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
+ dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
+ dep.Updated, dep.UpdatedBy, dep.BundleName)
+ if err != nil {
+ log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
+ return err
+ }
+
+ log.Debug("InsertTestDeployment edgex_deployment succeeded")
+ return err
+}
diff --git a/listener.go b/listener.go
index 8230176..8a4f017 100644
--- a/listener.go
+++ b/listener.go
@@ -54,69 +54,36 @@
log.Panicf("Unable to access database: %v", err)
}
- err = InitDB(db)
+ // alter table
+ err = alterTable(db)
if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
+ log.Panicf("Alter table failed: %v", err)
}
-
- var deploymentsToInsert []DataDeployment
- var errResults apiDeploymentResults
- for _, table := range snapshot.Tables {
- switch table.Name {
- case DEPLOYMENT_TABLE:
- for _, row := range table.Rows {
- dep, err := dataDeploymentFromRow(row)
- if err == nil {
- deploymentsToInsert = append(deploymentsToInsert, dep)
- } else {
- result := apiDeploymentResult{
- ID: dep.ID,
- Status: RESPONSE_STATUS_FAIL,
- ErrorCode: TRACKER_ERR_DEPLOYMENT_BAD_JSON,
- Message: fmt.Sprintf("unable to parse deployment: %v", err),
- }
- errResults = append(errResults, result)
- }
- }
- }
- }
-
// ensure that no new database updates are made on old database
dbMux.Lock()
- defer dbMux.Unlock()
+ SetDB(db)
+ dbMux.Unlock()
+ // update deployments
+ deps, err := getDeploymentsToUpdate(db)
+ if err != nil {
+ log.Panicf("Unable to getDeploymentsToUpdate: %v", err)
+ }
tx, err := db.Begin()
if err != nil {
log.Panicf("Error starting transaction: %v", err)
}
defer tx.Rollback()
-
- err = insertDeployments(tx, deploymentsToInsert)
+ err = updateDeploymentsColumns(tx, deps)
if err != nil {
- log.Panicf("Error processing Snapshot: %v", err)
+ log.Panicf("updateDeploymentsColumns failed: %v", err)
}
-
err = tx.Commit()
if err != nil {
- log.Panicf("Error committing Snapshot change: %v", err)
+ log.Panicf("Error committing Snapshot update: %v", err)
}
- SetDB(db)
-
- for _, dep := range deploymentsToInsert {
- queueDownloadRequest(dep)
- }
-
- // transmit parsing errors back immediately
- if len(errResults) > 0 {
- go transmitDeploymentResultsToServer(errResults)
- }
-
- // if no tables, this a startup event for an existing DB
- if len(snapshot.Tables) == 0 {
- startupOnExistingDatabase()
- }
-
+ startupOnExistingDatabase()
log.Debug("Snapshot processed")
}
@@ -159,8 +126,8 @@
func processChangeList(changes *common.ChangeList) {
- // gather deleted bundle info
- var deploymentsToInsert, deploymentsToDelete []DataDeployment
+ // changes have been applied to DB
+ var insertedDeployments, deletedDeployments []DataDeployment
var errResults apiDeploymentResults
for _, change := range changes.Changes {
switch change.Table {
@@ -169,7 +136,7 @@
case common.Insert:
dep, err := dataDeploymentFromRow(change.NewRow)
if err == nil {
- deploymentsToInsert = append(deploymentsToInsert, dep)
+ insertedDeployments = append(insertedDeployments, dep)
} else {
result := apiDeploymentResult{
ID: dep.ID,
@@ -188,7 +155,7 @@
ID: id,
DataScopeID: dataScopeID,
}
- deploymentsToDelete = append(deploymentsToDelete, dep)
+ deletedDeployments = append(deletedDeployments, dep)
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
@@ -200,45 +167,23 @@
go transmitDeploymentResultsToServer(errResults)
}
- tx, err := getDB().Begin()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- }
- defer tx.Rollback()
-
- for _, dep := range deploymentsToDelete {
- err = deleteDeployment(tx, dep.ID)
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- }
- }
- err = insertDeployments(tx, deploymentsToInsert)
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- }
-
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error processing ChangeList: %v", err)
- }
-
- for _, d := range deploymentsToDelete {
+ for _, d := range deletedDeployments {
deploymentsChanged <- d.ID
}
log.Debug("ChangeList processed")
- for _, dep := range deploymentsToInsert {
+ for _, dep := range insertedDeployments {
queueDownloadRequest(dep)
}
// clean up old bundles
- if len(deploymentsToDelete) > 0 {
- log.Debugf("will delete %d old bundles", len(deploymentsToDelete))
+ if len(deletedDeployments) > 0 {
+ log.Debugf("will delete %d old bundles", len(deletedDeployments))
go func() {
// give clients a minute to avoid conflicts
time.Sleep(bundleCleanupDelay)
- for _, dep := range deploymentsToDelete {
+ for _, dep := range deletedDeployments {
bundleFile := getBundleFile(dep)
log.Debugf("removing old bundle: %v", bundleFile)
safeDelete(bundleFile)
diff --git a/listener_test.go b/listener_test.go
index 0c79f3f..c253eb3 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -8,6 +8,7 @@
"net/http"
+ "fmt"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
@@ -18,116 +19,56 @@
Context("ApigeeSync snapshot event", func() {
- It("should set DB and process", func(done Done) {
+ /*
+ * Note that the test snapshot should not be empty.
+ * If it's empty, you can't use deploymentsResult chan to mark the end of processing,
+ * so the threads generated by startupOnExistingDatabase() will mess up later tests
+ */
+ It("should set DB to appropriate version", func(done Done) {
+ saveDB := getDB()
+ deploymentID := "set_version_test"
+ snapshot, dep := createSnapshotDeployment(deploymentID, "test_version")
- deploymentID := "listener_test_1"
-
- uri, err := url.Parse(testServer.URL)
+ db, err := data.DBVersion(snapshot.SnapshotInfo)
Expect(err).ShouldNot(HaveOccurred())
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle1 := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri)
- bundle1Json, err := json.Marshal(bundle1)
+ err = InitDB(db)
Expect(err).ShouldNot(HaveOccurred())
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
- var event = common.Snapshot{
- SnapshotInfo: "test",
- Tables: []common.Table{
- {
- Name: DEPLOYMENT_TABLE,
- Rows: []common.Row{row},
- },
- },
- }
+ insertDeploymentToDb(dep, db)
+ expectedDB, err := data.DBVersion(snapshot.SnapshotInfo)
+ Expect(err).NotTo(HaveOccurred())
var listener = make(chan deploymentsResult)
addSubscriber <- listener
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
result := <-listener
- Expect(result.err).ToNot(HaveOccurred())
+ Expect(result.err).ShouldNot(HaveOccurred())
- // from event
- Expect(len(result.deployments)).To(Equal(1))
- d := result.deployments[0]
+ // DB should have been set
+ Expect(getDB() == expectedDB).Should(BeTrue())
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(bundle1.Name))
- Expect(d.BundleURI).To(Equal(bundle1.URI))
-
- // from db
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(1))
- d = deployments[0]
-
- Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(bundle1.Name))
- Expect(d.BundleURI).To(Equal(bundle1.URI))
-
+ SetDB(saveDB)
close(done)
})
It("should process unready on existing db startup event", func(done Done) {
+ saveDB := getDB()
+
deploymentID := "startup_test"
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
-
- dep := DataDeployment{
- ID: deploymentID,
- DataScopeID: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- }
-
- // init without info == startup on existing DB
- var snapshot = common.Snapshot{
- SnapshotInfo: "test",
- Tables: []common.Table{},
- }
+ snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready")
db, err := data.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
+ Expect(err).ShouldNot(HaveOccurred())
err = InitDB(db)
- if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
- }
-
- tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
-
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
+ insertDeploymentToDb(dep, db)
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -141,11 +82,15 @@
d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
+
+ SetDB(saveDB)
close(done)
})
It("should send deployment statuses on existing db startup event", func(done Done) {
+ saveDB := getDB()
+
successDep := DataDeployment{
ID: "success",
LocalBundleURI: "x",
@@ -189,6 +134,7 @@
Message: failDep.DeployErrorMessage,
}))
+ SetDB(saveDB)
close(done)
}))
@@ -203,14 +149,10 @@
}
db, err := data.DBVersion(snapshot.SnapshotInfo)
- if err != nil {
- log.Panicf("Unable to access database: %v", err)
- }
+ Expect(err).NotTo(HaveOccurred())
- err = InitDB(db)
- if err != nil {
- log.Panicf("Unable to initialize database: %v", err)
- }
+ err = InitDBFullColumns(db)
+ Expect(err).NotTo(HaveOccurred())
tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
@@ -231,37 +173,19 @@
Context("ApigeeSync change event", func() {
- It("add event should add a deployment", func(done Done) {
+ It("inserting event should deliver the deployment to subscribers", func(done Done) {
deploymentID := "add_test_1"
- uri, err := url.Parse(testServer.URL)
+ event, dep := createChangeDeployment(deploymentID)
+
+ // insert full deployment columns
+ tx, err := getDB().Begin()
Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundle1Json, err := json.Marshal(bundle)
+ err = InsertDeployment(tx, dep)
Expect(err).ShouldNot(HaveOccurred())
-
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
- var event = common.ChangeList{
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: DEPLOYMENT_TABLE,
- NewRow: row,
- },
- },
- }
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -279,31 +203,39 @@
d := deployments[0]
Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(bundle.Name))
- Expect(d.BundleURI).To(Equal(bundle.URI))
+ Expect(d.BundleName).To(Equal(dep.BundleName))
+ Expect(d.BundleURI).To(Equal(dep.BundleURI))
close(done)
})
- It("delete event should delete a deployment", func(done Done) {
+ It("delete event should deliver to subscribers", func(done Done) {
deploymentID := "delete_test_1"
+ // insert deployment
+ event, dep := createChangeDeployment(deploymentID)
+
+ // insert full deployment columns
tx, err := getDB().Begin()
Expect(err).ShouldNot(HaveOccurred())
- dep := DataDeployment{
- ID: deploymentID,
- LocalBundleURI: "whatever",
- }
err = InsertDeployment(tx, dep)
Expect(err).ShouldNot(HaveOccurred())
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
+ listener := make(chan deploymentsResult)
+ addSubscriber <- listener
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
+ // wait for event to propagate
+ result := <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
+
+ // delete deployment
+ deletDeploymentFromDb(dep, getDB())
row := common.Row{}
row["id"] = &common.ColumnVal{Value: deploymentID}
-
- var event = common.ChangeList{
+ event = common.ChangeList{
Changes: []common.Change{
{
Operation: common.Delete,
@@ -313,19 +245,99 @@
},
}
- var listener = make(chan deploymentsResult)
+ listener = make(chan deploymentsResult)
addSubscriber <- listener
-
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
-
- <-listener
-
- deployments, err := getReadyDeployments()
- Expect(err).ShouldNot(HaveOccurred())
-
- Expect(len(deployments)).To(Equal(0))
-
+ result = <-listener
+ Expect(result.err).ShouldNot(HaveOccurred())
+ Expect(len(result.deployments)).To(Equal(0))
close(done)
})
})
})
+
+func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) {
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundle1Json, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+ row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+ changeList := common.ChangeList{
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: DEPLOYMENT_TABLE,
+ NewRow: row,
+ },
+ },
+ }
+ dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow)
+ return changeList, dep
+}
+
+func insertDeploymentToDb(dep DataDeployment, db apid.DB) {
+ tx, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ defer tx.Rollback()
+ err = InsertTestDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+}
+
+func deletDeploymentFromDb(dep DataDeployment, db apid.DB) {
+ tx, err := db.Begin()
+ Expect(err).ShouldNot(HaveOccurred())
+ defer tx.Rollback()
+ err = deleteDeployment(tx, dep.ID)
+ Expect(err).ShouldNot(HaveOccurred())
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
+}
+
+func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) {
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+
+ jsonBytes, err := json.Marshal(bundle)
+ Expect(err).ShouldNot(HaveOccurred())
+ fmt.Println("JSON :" + string(jsonBytes))
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ BundleConfigJSON: string(jsonBytes),
+ }
+
+ // init without info == startup on existing DB
+ var snapshot = common.Snapshot{
+ SnapshotInfo: snapInfo,
+ Tables: []common.Table{},
+ }
+ return snapshot, dep
+}