Revert "Xapid 941"
diff --git a/.gitignore b/.gitignore
index 0df4220..9d01395 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,5 +5,3 @@
cmd/apidGatewayDeploy/apidGatewayDeploy
*.lock
/apidGatewayDeploy.iml
-/data.go
-.idea
\ No newline at end of file
diff --git a/api.go b/api.go
index 8a628e7..972b8eb 100644
--- a/api.go
+++ b/api.go
@@ -32,9 +32,8 @@
)
const (
- sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
- iso8601 = "2006-01-02T15:04:05.999Z07:00"
- sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00"
+ sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST"
+ iso8601 = "2006-01-02T15:04:05.999Z07:00"
)
type deploymentsResult struct {
@@ -393,7 +392,7 @@
if t == "" {
return ""
}
- formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339}
+ formats := []string{sqlTimeFormat, iso8601, time.RFC3339}
for _, f := range formats {
timestamp, err := time.Parse(f, t)
if err == nil {
diff --git a/api_test.go b/api_test.go
index a49b9a5..c6acd5e 100644
--- a/api_test.go
+++ b/api_test.go
@@ -295,7 +295,7 @@
Expect(resp.StatusCode).Should(Equal(http.StatusOK))
var deployStatus string
- err = db.QueryRow("SELECT deploy_status FROM edgex_deployment WHERE id=?", deploymentID).
+ err = db.QueryRow("SELECT deploy_status FROM deployments WHERE id=?", deploymentID).
Scan(&deployStatus)
Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS))
})
@@ -332,7 +332,7 @@
var deploy_error_code int
err = db.QueryRow(`
SELECT deploy_status, deploy_error_code, deploy_error_message
- FROM edgex_deployment
+ FROM deployments
WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message)
Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL))
Expect(deploy_error_code).Should(Equal(100))
@@ -364,8 +364,8 @@
})
It("should get iso8601 time", func() {
- testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00"}
- isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z"}
+ testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462 -0700 MST", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"}
+ isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"}
for i, t := range testTimes {
log.Debug("insert deployment with timestamp: " + t)
deploymentID := "api_time_iso8601_" + strconv.Itoa(i)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go
index 236fe51..3b55d05 100644
--- a/apidGatewayDeploy_suite_test.go
+++ b/apidGatewayDeploy_suite_test.go
@@ -42,10 +42,9 @@
apid.InitializePlugins("")
- // init full DB
db, err := data.DB()
Expect(err).NotTo(HaveOccurred())
- err = InitDBFullColumns(db)
+ err = InitDB(db)
Expect(err).NotTo(HaveOccurred())
SetDB(db)
@@ -115,9 +114,8 @@
apiServerBaseURI, err = url.Parse(testServer.URL)
Expect(err).NotTo(HaveOccurred())
- _, err = getDB().Exec("DELETE FROM edgex_deployment")
+ _, err = getDB().Exec("DELETE FROM deployments")
Expect(err).ShouldNot(HaveOccurred())
-
_, err = getDB().Exec("UPDATE etag SET value=1")
})
diff --git a/data.go b/data.go
index c2b3ecf..ad67728 100644
--- a/data.go
+++ b/data.go
@@ -5,9 +5,7 @@
"fmt"
"sync"
- "encoding/json"
"github.com/30x/apid-core"
- "strings"
)
var (
@@ -40,9 +38,9 @@
Exec(query string, args ...interface{}) (sql.Result, error)
}
-func InitDBFullColumns(db apid.DB) error {
+func InitDB(db apid.DB) error {
_, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS edgex_deployment (
+ CREATE TABLE IF NOT EXISTS deployments (
id character varying(36) NOT NULL,
bundle_config_id varchar(36) NOT NULL,
apid_cluster_id varchar(36) NOT NULL,
@@ -53,7 +51,7 @@
created_by text,
updated timestamp without time zone,
updated_by text,
- bundle_config_name text,
+ bundle_name text,
bundle_uri text,
local_bundle_uri text,
bundle_checksum text,
@@ -72,56 +70,6 @@
return nil
}
-func InitDB(db apid.DB) error {
- _, err := db.Exec(`
- CREATE TABLE IF NOT EXISTS edgex_deployment (
- id character varying(36) NOT NULL,
- bundle_config_id varchar(36) NOT NULL,
- apid_cluster_id varchar(36) NOT NULL,
- data_scope_id varchar(36) NOT NULL,
- bundle_config_json text NOT NULL,
- config_json text NOT NULL,
- created timestamp without time zone,
- created_by text,
- updated timestamp without time zone,
- updated_by text,
- bundle_config_name text,
- PRIMARY KEY (id)
- );
- `)
- if err != nil {
- return err
- }
-
- log.Debug("Database tables created.")
- return nil
-}
-
-func alterTable(db apid.DB) error {
- queries := []string{
- "ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;",
- "ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;",
- "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;",
- "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;",
- "ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;",
- "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;",
- "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;",
- }
-
- for _, query := range queries {
- _, err := db.Exec(query)
- if err != nil {
- if strings.Contains(err.Error(), "duplicate column name") {
- log.Warnf("AlterTable warning: %s", err)
- } else {
- return err
- }
- }
- }
- log.Debug("Database table altered.")
- return nil
-}
-
func getDB() apid.DB {
dbMux.RLock()
db := unsafeDB
@@ -143,19 +91,19 @@
func insertDeployments(tx *sql.Tx, deps []DataDeployment) error {
- log.Debugf("inserting %d edgex_deployment", len(deps))
+ log.Debugf("inserting %d deployments", len(deps))
stmt, err := tx.Prepare(`
- INSERT INTO edgex_deployment
+ INSERT INTO deployments
(id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri,
+ updated, updated_by, bundle_name, bundle_uri, local_bundle_uri,
bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18);
`)
if err != nil {
- log.Errorf("prepare insert into edgex_deployment failed: %v", err)
+ log.Errorf("prepare insert into deployments failed: %v", err)
return err
}
defer stmt.Close()
@@ -170,87 +118,29 @@
dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
dep.DeployErrorCode, dep.DeployErrorMessage)
if err != nil {
- log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
+ log.Errorf("insert into deployments %s failed: %v", dep.ID, err)
return err
}
}
- log.Debug("inserting edgex_deployment succeeded")
+ log.Debug("inserting deployments succeeded")
return err
}
-func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error {
-
- log.Debugf("updating %d edgex_deployment", len(deps))
-
- stmt, err := tx.Prepare(`
- UPDATE edgex_deployment SET
- (bundle_uri, local_bundle_uri,
- bundle_checksum, bundle_checksum_type, deploy_status,
- deploy_error_code, deploy_error_message)
- = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8
- `)
- if err != nil {
- log.Errorf("prepare update edgex_deployment failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- for _, dep := range deps {
- log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI)
-
- _, err = stmt.Exec(
- dep.BundleURI,
- dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus,
- dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID)
- if err != nil {
- log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err)
- return err
- }
- }
-
- log.Debug("updateDeploymentsColumns of edgex_deployment succeeded")
- return err
-}
-
-func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) {
- deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL")
- if err != nil {
- log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err)
- return
- }
- var bc bundleConfigJson
- for i, _ := range deployments {
- log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON)
- json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc)
- if err != nil {
- log.Errorf("JSON decoding Manifest failed: %v", err)
- return
- }
- deployments[i].BundleName = bc.Name
- deployments[i].BundleURI = bc.URI
- deployments[i].BundleChecksumType = bc.ChecksumType
- deployments[i].BundleChecksum = bc.Checksum
-
- log.Debugf("Unmarshal: %v", deployments[i].BundleURI)
- }
- return
-}
-
func deleteDeployment(tx *sql.Tx, depID string) error {
log.Debugf("deleteDeployment: %s", depID)
- stmt, err := tx.Prepare("DELETE FROM edgex_deployment where id = $1;")
+ stmt, err := tx.Prepare("DELETE FROM deployments where id = $1;")
if err != nil {
- log.Errorf("prepare delete from edgex_deployment %s failed: %v", depID, err)
+ log.Errorf("prepare delete from deployments %s failed: %v", depID, err)
return err
}
defer stmt.Close()
_, err = stmt.Exec(depID)
if err != nil {
- log.Errorf("delete from edgex_deployment %s failed: %v", depID, err)
+ log.Errorf("delete from deployments %s failed: %v", depID, err)
return err
}
@@ -276,10 +166,10 @@
stmt, err = db.Prepare(`
SELECT id, bundle_config_id, apid_cluster_id, data_scope_id,
bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_config_name, bundle_uri,
+ updated, updated_by, bundle_name, bundle_uri,
local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status,
deploy_error_code, deploy_error_message
- FROM edgex_deployment
+ FROM deployments
` + where)
if err != nil {
return
@@ -291,7 +181,7 @@
if err == sql.ErrNoRows {
return
}
- log.Errorf("Error querying edgex_deployment: %v", err)
+ log.Errorf("Error querying deployments: %v", err)
return
}
defer rows.Close()
@@ -330,7 +220,7 @@
defer tx.Rollback()
stmt, err := tx.Prepare(`
- UPDATE edgex_deployment
+ UPDATE deployments
SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3
WHERE id=$4;
`)
@@ -343,7 +233,7 @@
for _, result := range results {
res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID)
if err != nil {
- log.Errorf("update edgex_deployment %s to %s failed: %v", result.ID, result.Status, err)
+ log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err)
return err
}
n, err := res.RowsAffected()
@@ -361,7 +251,7 @@
func updateLocalBundleURI(depID, localBundleUri string) error {
- stmt, err := getDB().Prepare("UPDATE edgex_deployment SET local_bundle_uri=$1 WHERE id=$2;")
+ stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;")
if err != nil {
log.Errorf("prepare updateLocalBundleURI failed: %v", err)
return err
@@ -370,41 +260,11 @@
_, err = stmt.Exec(localBundleUri, depID)
if err != nil {
- log.Errorf("update edgex_deployment %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
+ log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err)
return err
}
- log.Debugf("update edgex_deployment %s localBundleUri to %s succeeded", depID, localBundleUri)
+ log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri)
return nil
}
-
-func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error {
-
- stmt, err := tx.Prepare(`
- INSERT INTO edgex_deployment
- (id, bundle_config_id, apid_cluster_id, data_scope_id,
- bundle_config_json, config_json, created, created_by,
- updated, updated_by, bundle_config_name)
- VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);
- `)
- if err != nil {
- log.Errorf("prepare insert into edgex_deployment failed: %v", err)
- return err
- }
- defer stmt.Close()
-
- log.Debugf("InsertTestDeployment: %s", dep.ID)
-
- _, err = stmt.Exec(
- dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID,
- dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy,
- dep.Updated, dep.UpdatedBy, dep.BundleName)
- if err != nil {
- log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err)
- return err
- }
-
- log.Debug("InsertTestDeployment edgex_deployment succeeded")
- return err
-}
diff --git a/listener.go b/listener.go
index 8a4f017..8230176 100644
--- a/listener.go
+++ b/listener.go
@@ -54,36 +54,69 @@
log.Panicf("Unable to access database: %v", err)
}
- // alter table
- err = alterTable(db)
+ err = InitDB(db)
if err != nil {
- log.Panicf("Alter table failed: %v", err)
+ log.Panicf("Unable to initialize database: %v", err)
}
+
+ var deploymentsToInsert []DataDeployment
+ var errResults apiDeploymentResults
+ for _, table := range snapshot.Tables {
+ switch table.Name {
+ case DEPLOYMENT_TABLE:
+ for _, row := range table.Rows {
+ dep, err := dataDeploymentFromRow(row)
+ if err == nil {
+ deploymentsToInsert = append(deploymentsToInsert, dep)
+ } else {
+ result := apiDeploymentResult{
+ ID: dep.ID,
+ Status: RESPONSE_STATUS_FAIL,
+ ErrorCode: TRACKER_ERR_DEPLOYMENT_BAD_JSON,
+ Message: fmt.Sprintf("unable to parse deployment: %v", err),
+ }
+ errResults = append(errResults, result)
+ }
+ }
+ }
+ }
+
// ensure that no new database updates are made on old database
dbMux.Lock()
- SetDB(db)
- dbMux.Unlock()
+ defer dbMux.Unlock()
- // update deployments
- deps, err := getDeploymentsToUpdate(db)
- if err != nil {
- log.Panicf("Unable to getDeploymentsToUpdate: %v", err)
- }
tx, err := db.Begin()
if err != nil {
log.Panicf("Error starting transaction: %v", err)
}
defer tx.Rollback()
- err = updateDeploymentsColumns(tx, deps)
+
+ err = insertDeployments(tx, deploymentsToInsert)
if err != nil {
- log.Panicf("updateDeploymentsColumns failed: %v", err)
- }
- err = tx.Commit()
- if err != nil {
- log.Panicf("Error committing Snapshot update: %v", err)
+ log.Panicf("Error processing Snapshot: %v", err)
}
- startupOnExistingDatabase()
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Error committing Snapshot change: %v", err)
+ }
+
+ SetDB(db)
+
+ for _, dep := range deploymentsToInsert {
+ queueDownloadRequest(dep)
+ }
+
+ // transmit parsing errors back immediately
+ if len(errResults) > 0 {
+ go transmitDeploymentResultsToServer(errResults)
+ }
+
+ // if no tables, this is a startup event for an existing DB
+ if len(snapshot.Tables) == 0 {
+ startupOnExistingDatabase()
+ }
+
log.Debug("Snapshot processed")
}
@@ -126,8 +159,8 @@
func processChangeList(changes *common.ChangeList) {
- // changes have been applied to DB
- var insertedDeployments, deletedDeployments []DataDeployment
+ // gather deleted bundle info
+ var deploymentsToInsert, deploymentsToDelete []DataDeployment
var errResults apiDeploymentResults
for _, change := range changes.Changes {
switch change.Table {
@@ -136,7 +169,7 @@
case common.Insert:
dep, err := dataDeploymentFromRow(change.NewRow)
if err == nil {
- insertedDeployments = append(insertedDeployments, dep)
+ deploymentsToInsert = append(deploymentsToInsert, dep)
} else {
result := apiDeploymentResult{
ID: dep.ID,
@@ -155,7 +188,7 @@
ID: id,
DataScopeID: dataScopeID,
}
- deletedDeployments = append(deletedDeployments, dep)
+ deploymentsToDelete = append(deploymentsToDelete, dep)
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
@@ -167,23 +200,45 @@
go transmitDeploymentResultsToServer(errResults)
}
- for _, d := range deletedDeployments {
+ tx, err := getDB().Begin()
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+ defer tx.Rollback()
+
+ for _, dep := range deploymentsToDelete {
+ err = deleteDeployment(tx, dep.ID)
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+ }
+ err = insertDeployments(tx, deploymentsToInsert)
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ log.Panicf("Error processing ChangeList: %v", err)
+ }
+
+ for _, d := range deploymentsToDelete {
deploymentsChanged <- d.ID
}
log.Debug("ChangeList processed")
- for _, dep := range insertedDeployments {
+ for _, dep := range deploymentsToInsert {
queueDownloadRequest(dep)
}
// clean up old bundles
- if len(deletedDeployments) > 0 {
- log.Debugf("will delete %d old bundles", len(deletedDeployments))
+ if len(deploymentsToDelete) > 0 {
+ log.Debugf("will delete %d old bundles", len(deploymentsToDelete))
go func() {
// give clients a minute to avoid conflicts
time.Sleep(bundleCleanupDelay)
- for _, dep := range deletedDeployments {
+ for _, dep := range deploymentsToDelete {
bundleFile := getBundleFile(dep)
log.Debugf("removing old bundle: %v", bundleFile)
safeDelete(bundleFile)
diff --git a/listener_test.go b/listener_test.go
index c253eb3..0c79f3f 100644
--- a/listener_test.go
+++ b/listener_test.go
@@ -8,7 +8,6 @@
"net/http"
- "fmt"
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
. "github.com/onsi/ginkgo"
@@ -19,56 +18,116 @@
Context("ApigeeSync snapshot event", func() {
- /*
- * Note that the test snapshot should not be empty.
- * If it's empty, you can't use deploymentsResult chan to mark the end of processing,
- * so the threads generated by startupOnExistingDatabase() will mess up later tests
- */
- It("should set DB to appropriate version", func(done Done) {
- saveDB := getDB()
- deploymentID := "set_version_test"
- snapshot, dep := createSnapshotDeployment(deploymentID, "test_version")
+ It("should set DB and process", func(done Done) {
- db, err := data.DBVersion(snapshot.SnapshotInfo)
+ deploymentID := "listener_test_1"
+
+ uri, err := url.Parse(testServer.URL)
Expect(err).ShouldNot(HaveOccurred())
- err = InitDB(db)
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle1 := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri)
+ bundle1Json, err := json.Marshal(bundle1)
Expect(err).ShouldNot(HaveOccurred())
- insertDeploymentToDb(dep, db)
- expectedDB, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+ row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+ var event = common.Snapshot{
+ SnapshotInfo: "test",
+ Tables: []common.Table{
+ {
+ Name: DEPLOYMENT_TABLE,
+ Rows: []common.Row{row},
+ },
+ },
+ }
var listener = make(chan deploymentsResult)
addSubscriber <- listener
- apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot)
+ apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
result := <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
+ Expect(result.err).ToNot(HaveOccurred())
- // DB should have been set
- Expect(getDB() == expectedDB).Should(BeTrue())
+ // from event
+ Expect(len(result.deployments)).To(Equal(1))
+ d := result.deployments[0]
- SetDB(saveDB)
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.BundleName).To(Equal(bundle1.Name))
+ Expect(d.BundleURI).To(Equal(bundle1.URI))
+
+ // from db
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(1))
+ d = deployments[0]
+
+ Expect(d.ID).To(Equal(deploymentID))
+ Expect(d.BundleName).To(Equal(bundle1.Name))
+ Expect(d.BundleURI).To(Equal(bundle1.URI))
+
close(done)
})
It("should process unready on existing db startup event", func(done Done) {
- saveDB := getDB()
-
deploymentID := "startup_test"
- snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready")
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+
+ dep := DataDeployment{
+ ID: deploymentID,
+ DataScopeID: deploymentID,
+ BundleURI: bundle.URI,
+ BundleChecksum: bundle.Checksum,
+ BundleChecksumType: bundle.ChecksumType,
+ }
+
+ // init without info == startup on existing DB
+ var snapshot = common.Snapshot{
+ SnapshotInfo: "test",
+ Tables: []common.Table{},
+ }
db, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).ShouldNot(HaveOccurred())
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
err = InitDB(db)
+ if err != nil {
+ log.Panicf("Unable to initialize database: %v", err)
+ }
+
+ tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
- insertDeploymentToDb(dep, db)
+ err = InsertDeployment(tx, dep)
+ Expect(err).ShouldNot(HaveOccurred())
+
+ err = tx.Commit()
+ Expect(err).ShouldNot(HaveOccurred())
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -82,15 +141,11 @@
d := result.deployments[0]
Expect(d.ID).To(Equal(deploymentID))
-
- SetDB(saveDB)
close(done)
})
It("should send deployment statuses on existing db startup event", func(done Done) {
- saveDB := getDB()
-
successDep := DataDeployment{
ID: "success",
LocalBundleURI: "x",
@@ -134,7 +189,6 @@
Message: failDep.DeployErrorMessage,
}))
- SetDB(saveDB)
close(done)
}))
@@ -149,10 +203,14 @@
}
db, err := data.DBVersion(snapshot.SnapshotInfo)
- Expect(err).NotTo(HaveOccurred())
+ if err != nil {
+ log.Panicf("Unable to access database: %v", err)
+ }
- err = InitDBFullColumns(db)
- Expect(err).NotTo(HaveOccurred())
+ err = InitDB(db)
+ if err != nil {
+ log.Panicf("Unable to initialize database: %v", err)
+ }
tx, err := db.Begin()
Expect(err).ShouldNot(HaveOccurred())
@@ -173,19 +231,37 @@
Context("ApigeeSync change event", func() {
- It("inserting event should deliver the deployment to subscribers", func(done Done) {
+ It("add event should add a deployment", func(done Done) {
deploymentID := "add_test_1"
- event, dep := createChangeDeployment(deploymentID)
+ uri, err := url.Parse(testServer.URL)
+ Expect(err).ShouldNot(HaveOccurred())
- // insert full deployment columns
- tx, err := getDB().Begin()
+ uri.Path = "/bundles/1"
+ bundleUri := uri.String()
+ bundle := bundleConfigJson{
+ Name: uri.Path,
+ URI: bundleUri,
+ ChecksumType: "crc32",
+ }
+ bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
+ bundle1Json, err := json.Marshal(bundle)
Expect(err).ShouldNot(HaveOccurred())
- err = InsertDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
+
+ row := common.Row{}
+ row["id"] = &common.ColumnVal{Value: deploymentID}
+ row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
+
+ var event = common.ChangeList{
+ Changes: []common.Change{
+ {
+ Operation: common.Insert,
+ Table: DEPLOYMENT_TABLE,
+ NewRow: row,
+ },
+ },
+ }
var listener = make(chan deploymentsResult)
addSubscriber <- listener
@@ -203,39 +279,31 @@
d := deployments[0]
Expect(d.ID).To(Equal(deploymentID))
- Expect(d.BundleName).To(Equal(dep.BundleName))
- Expect(d.BundleURI).To(Equal(dep.BundleURI))
+ Expect(d.BundleName).To(Equal(bundle.Name))
+ Expect(d.BundleURI).To(Equal(bundle.URI))
close(done)
})
- It("delete event should deliver to subscribers", func(done Done) {
+ It("delete event should delete a deployment", func(done Done) {
deploymentID := "delete_test_1"
- // insert deployment
- event, dep := createChangeDeployment(deploymentID)
-
- // insert full deployment columns
tx, err := getDB().Begin()
Expect(err).ShouldNot(HaveOccurred())
+ dep := DataDeployment{
+ ID: deploymentID,
+ LocalBundleURI: "whatever",
+ }
err = InsertDeployment(tx, dep)
Expect(err).ShouldNot(HaveOccurred())
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())
- listener := make(chan deploymentsResult)
- addSubscriber <- listener
- apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- // wait for event to propagate
- result := <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
-
- // delete deployment
- deletDeploymentFromDb(dep, getDB())
row := common.Row{}
row["id"] = &common.ColumnVal{Value: deploymentID}
- event = common.ChangeList{
+
+ var event = common.ChangeList{
Changes: []common.Change{
{
Operation: common.Delete,
@@ -245,99 +313,19 @@
},
}
- listener = make(chan deploymentsResult)
+ var listener = make(chan deploymentsResult)
addSubscriber <- listener
+
apid.Events().Emit(APIGEE_SYNC_EVENT, &event)
- result = <-listener
- Expect(result.err).ShouldNot(HaveOccurred())
- Expect(len(result.deployments)).To(Equal(0))
+
+ <-listener
+
+ deployments, err := getReadyDeployments()
+ Expect(err).ShouldNot(HaveOccurred())
+
+ Expect(len(deployments)).To(Equal(0))
+
close(done)
})
})
})
-
-func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) {
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
- bundle1Json, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
-
- row := common.Row{}
- row["id"] = &common.ColumnVal{Value: deploymentID}
- row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)}
-
- changeList := common.ChangeList{
- Changes: []common.Change{
- {
- Operation: common.Insert,
- Table: DEPLOYMENT_TABLE,
- NewRow: row,
- },
- },
- }
- dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow)
- return changeList, dep
-}
-
-func insertDeploymentToDb(dep DataDeployment, db apid.DB) {
- tx, err := db.Begin()
- Expect(err).ShouldNot(HaveOccurred())
- defer tx.Rollback()
- err = InsertTestDeployment(tx, dep)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-}
-
-func deletDeploymentFromDb(dep DataDeployment, db apid.DB) {
- tx, err := db.Begin()
- Expect(err).ShouldNot(HaveOccurred())
- defer tx.Rollback()
- err = deleteDeployment(tx, dep.ID)
- Expect(err).ShouldNot(HaveOccurred())
- err = tx.Commit()
- Expect(err).ShouldNot(HaveOccurred())
-}
-
-func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) {
- uri, err := url.Parse(testServer.URL)
- Expect(err).ShouldNot(HaveOccurred())
-
- uri.Path = "/bundles/1"
- bundleUri := uri.String()
- bundle := bundleConfigJson{
- Name: uri.Path,
- URI: bundleUri,
- ChecksumType: "crc32",
- }
- bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri)
-
- jsonBytes, err := json.Marshal(bundle)
- Expect(err).ShouldNot(HaveOccurred())
- fmt.Println("JSON :" + string(jsonBytes))
-
- dep := DataDeployment{
- ID: deploymentID,
- DataScopeID: deploymentID,
- BundleURI: bundle.URI,
- BundleChecksum: bundle.Checksum,
- BundleChecksumType: bundle.ChecksumType,
- BundleConfigJSON: string(jsonBytes),
- }
-
- // init without info == startup on existing DB
- var snapshot = common.Snapshot{
- SnapshotInfo: snapInfo,
- Tables: []common.Table{},
- }
- return snapshot, dep
-}