Revert "Xapid 941"
diff --git a/.gitignore b/.gitignore index 0df4220..9d01395 100644 --- a/.gitignore +++ b/.gitignore
@@ -5,5 +5,3 @@ cmd/apidGatewayDeploy/apidGatewayDeploy *.lock /apidGatewayDeploy.iml -/data.go -.idea \ No newline at end of file
diff --git a/api.go b/api.go index 8a628e7..972b8eb 100644 --- a/api.go +++ b/api.go
@@ -32,9 +32,8 @@ ) const ( - sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST" - iso8601 = "2006-01-02T15:04:05.999Z07:00" - sqliteTimeFormat = "2006-01-02 15:04:05.999-07:00" + sqlTimeFormat = "2006-01-02 15:04:05.999 -0700 MST" + iso8601 = "2006-01-02T15:04:05.999Z07:00" ) type deploymentsResult struct { @@ -393,7 +392,7 @@ if t == "" { return "" } - formats := []string{sqliteTimeFormat, sqlTimeFormat, iso8601, time.RFC3339} + formats := []string{sqlTimeFormat, iso8601, time.RFC3339} for _, f := range formats { timestamp, err := time.Parse(f, t) if err == nil {
diff --git a/api_test.go b/api_test.go index a49b9a5..c6acd5e 100644 --- a/api_test.go +++ b/api_test.go
@@ -295,7 +295,7 @@ Expect(resp.StatusCode).Should(Equal(http.StatusOK)) var deployStatus string - err = db.QueryRow("SELECT deploy_status FROM edgex_deployment WHERE id=?", deploymentID). + err = db.QueryRow("SELECT deploy_status FROM deployments WHERE id=?", deploymentID). Scan(&deployStatus) Expect(deployStatus).Should(Equal(RESPONSE_STATUS_SUCCESS)) }) @@ -332,7 +332,7 @@ var deploy_error_code int err = db.QueryRow(` SELECT deploy_status, deploy_error_code, deploy_error_message - FROM edgex_deployment + FROM deployments WHERE id=?`, deploymentID).Scan(&deployStatus, &deploy_error_code, &deploy_error_message) Expect(deployStatus).Should(Equal(RESPONSE_STATUS_FAIL)) Expect(deploy_error_code).Should(Equal(100)) @@ -364,8 +364,8 @@ }) It("should get iso8601 time", func() { - testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05 23:23:38.162+00:00"} - isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T23:23:38.162Z"} + testTimes := []string{"", "2017-04-05 04:47:36.462 +0000 UTC", "2017-04-05 04:47:36.462 -0700 MST", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"} + isoTime := []string{"", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00", "2017-04-05T04:47:36.462Z", "2017-04-05T04:47:36.462-07:00"} for i, t := range testTimes { log.Debug("insert deployment with timestamp: " + t) deploymentID := "api_time_iso8601_" + strconv.Itoa(i)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 236fe51..3b55d05 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -42,10 +42,9 @@ apid.InitializePlugins("") - // init full DB db, err := data.DB() Expect(err).NotTo(HaveOccurred()) - err = InitDBFullColumns(db) + err = InitDB(db) Expect(err).NotTo(HaveOccurred()) SetDB(db) @@ -115,9 +114,8 @@ apiServerBaseURI, err = url.Parse(testServer.URL) Expect(err).NotTo(HaveOccurred()) - _, err = getDB().Exec("DELETE FROM edgex_deployment") + _, err = getDB().Exec("DELETE FROM deployments") Expect(err).ShouldNot(HaveOccurred()) - _, err = getDB().Exec("UPDATE etag SET value=1") })
diff --git a/data.go b/data.go index c2b3ecf..ad67728 100644 --- a/data.go +++ b/data.go
@@ -5,9 +5,7 @@ "fmt" "sync" - "encoding/json" "github.com/30x/apid-core" - "strings" ) var ( @@ -40,9 +38,9 @@ Exec(query string, args ...interface{}) (sql.Result, error) } -func InitDBFullColumns(db apid.DB) error { +func InitDB(db apid.DB) error { _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS edgex_deployment ( + CREATE TABLE IF NOT EXISTS deployments ( id character varying(36) NOT NULL, bundle_config_id varchar(36) NOT NULL, apid_cluster_id varchar(36) NOT NULL, @@ -53,7 +51,7 @@ created_by text, updated timestamp without time zone, updated_by text, - bundle_config_name text, + bundle_name text, bundle_uri text, local_bundle_uri text, bundle_checksum text, @@ -72,56 +70,6 @@ return nil } -func InitDB(db apid.DB) error { - _, err := db.Exec(` - CREATE TABLE IF NOT EXISTS edgex_deployment ( - id character varying(36) NOT NULL, - bundle_config_id varchar(36) NOT NULL, - apid_cluster_id varchar(36) NOT NULL, - data_scope_id varchar(36) NOT NULL, - bundle_config_json text NOT NULL, - config_json text NOT NULL, - created timestamp without time zone, - created_by text, - updated timestamp without time zone, - updated_by text, - bundle_config_name text, - PRIMARY KEY (id) - ); - `) - if err != nil { - return err - } - - log.Debug("Database tables created.") - return nil -} - -func alterTable(db apid.DB) error { - queries := []string{ - "ALTER TABLE edgex_deployment ADD COLUMN bundle_uri text;", - "ALTER TABLE edgex_deployment ADD COLUMN local_bundle_uri text;", - "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum text;", - "ALTER TABLE edgex_deployment ADD COLUMN bundle_checksum_type text;", - "ALTER TABLE edgex_deployment ADD COLUMN deploy_status string;", - "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_code int;", - "ALTER TABLE edgex_deployment ADD COLUMN deploy_error_message text;", - } - - for _, query := range queries { - _, err := db.Exec(query) - if err != nil { - if strings.Contains(err.Error(), "duplicate column name") { - log.Warnf("AlterTable warning: %s", err) - } else { - return err - } - } - } - log.Debug("Database table altered.") - return nil -} - func getDB() apid.DB { dbMux.RLock() db := unsafeDB @@ -143,19 +91,19 @@ func insertDeployments(tx *sql.Tx, deps []DataDeployment) error { - log.Debugf("inserting %d edgex_deployment", len(deps)) + log.Debugf("inserting %d deployments", len(deps)) stmt, err := tx.Prepare(` - INSERT INTO edgex_deployment + INSERT INTO deployments (id, bundle_config_id, apid_cluster_id, data_scope_id, bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_config_name, bundle_uri, local_bundle_uri, + updated, updated_by, bundle_name, bundle_uri, local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status, deploy_error_code, deploy_error_message) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16,$17,$18); `) if err != nil { - log.Errorf("prepare insert into edgex_deployment failed: %v", err) + log.Errorf("prepare insert into deployments failed: %v", err) return err } defer stmt.Close() @@ -170,87 +118,29 @@ dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, dep.DeployErrorCode, dep.DeployErrorMessage) if err != nil { - log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err) + log.Errorf("insert into deployments %s failed: %v", dep.ID, err) return err } } - log.Debug("inserting edgex_deployment succeeded") + log.Debug("inserting deployments succeeded") return err } -func updateDeploymentsColumns(tx *sql.Tx, deps []DataDeployment) error { - - log.Debugf("updating %d edgex_deployment", len(deps)) - - stmt, err := tx.Prepare(` - UPDATE edgex_deployment SET - (bundle_uri, local_bundle_uri, - bundle_checksum, bundle_checksum_type, deploy_status, - deploy_error_code, deploy_error_message) - = ($1,$2,$3,$4,$5,$6,$7) WHERE id = $8 - `) - if err != nil { - log.Errorf("prepare update edgex_deployment failed: %v", err) - return err - } - defer stmt.Close() - - for _, dep := range deps { - log.Debugf("updateDeploymentsColumns: processing deployment %s, %v", dep.ID, dep.BundleURI) - - _, err = stmt.Exec( - dep.BundleURI, - dep.LocalBundleURI, dep.BundleChecksum, dep.BundleChecksumType, dep.DeployStatus, - dep.DeployErrorCode, dep.DeployErrorMessage, dep.ID) - if err != nil { - log.Errorf("updateDeploymentsColumns of edgex_deployment %s failed: %v", dep.ID, err) - return err - } - } - - log.Debug("updateDeploymentsColumns of edgex_deployment succeeded") - return err -} - -func getDeploymentsToUpdate(db apid.DB) (deployments []DataDeployment, err error) { - deployments, err = getDeployments("WHERE bundle_uri IS NULL AND local_bundle_uri IS NULL AND deploy_status IS NULL") - if err != nil { - log.Errorf("getDeployments in getDeploymentsToUpdate failed: %v", err) - return - } - var bc bundleConfigJson - for i, _ := range deployments { - log.Debugf("getDeploymentsToUpdate: processing deployment %v, %v", deployments[i].ID, deployments[i].BundleConfigJSON) - json.Unmarshal([]byte(deployments[i].BundleConfigJSON), &bc) - if err != nil { - log.Errorf("JSON decoding Manifest failed: %v", err) - return - } - deployments[i].BundleName = bc.Name - deployments[i].BundleURI = bc.URI - deployments[i].BundleChecksumType = bc.ChecksumType - deployments[i].BundleChecksum = bc.Checksum - - log.Debugf("Unmarshal: %v", deployments[i].BundleURI) - } - return -} - func deleteDeployment(tx *sql.Tx, depID string) error { log.Debugf("deleteDeployment: %s", depID) - stmt, err := tx.Prepare("DELETE FROM edgex_deployment where id = $1;") + stmt, err := tx.Prepare("DELETE FROM deployments where id = $1;") if err != nil { - log.Errorf("prepare delete from edgex_deployment %s failed: %v", depID, err) + log.Errorf("prepare delete from deployments %s failed: %v", depID, err) return err } defer stmt.Close() _, err = stmt.Exec(depID) if err != nil { - log.Errorf("delete from edgex_deployment %s failed: %v", depID, err) + log.Errorf("delete from deployments %s failed: %v", depID, err) return err } @@ -276,10 +166,10 @@ stmt, err = db.Prepare(` SELECT id, bundle_config_id, apid_cluster_id, data_scope_id, bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_config_name, bundle_uri, + updated, updated_by, bundle_name, bundle_uri, local_bundle_uri, bundle_checksum, bundle_checksum_type, deploy_status, deploy_error_code, deploy_error_message - FROM edgex_deployment + FROM deployments ` + where) if err != nil { return @@ -291,7 +181,7 @@ if err == sql.ErrNoRows { return } - log.Errorf("Error querying edgex_deployment: %v", err) + log.Errorf("Error querying deployments: %v", err) return } defer rows.Close() @@ -330,7 +220,7 @@ defer tx.Rollback() stmt, err := tx.Prepare(` - UPDATE edgex_deployment + UPDATE deployments SET deploy_status=$1, deploy_error_code=$2, deploy_error_message=$3 WHERE id=$4; `) @@ -343,7 +233,7 @@ for _, result := range results { res, err := stmt.Exec(result.Status, result.ErrorCode, result.Message, result.ID) if err != nil { - log.Errorf("update edgex_deployment %s to %s failed: %v", result.ID, result.Status, err) + log.Errorf("update deployments %s to %s failed: %v", result.ID, result.Status, err) return err } n, err := res.RowsAffected() @@ -361,7 +251,7 @@ func updateLocalBundleURI(depID, localBundleUri string) error { - stmt, err := getDB().Prepare("UPDATE edgex_deployment SET local_bundle_uri=$1 WHERE id=$2;") + stmt, err := getDB().Prepare("UPDATE deployments SET local_bundle_uri=$1 WHERE id=$2;") if err != nil { log.Errorf("prepare updateLocalBundleURI failed: %v", err) return err @@ -370,41 +260,11 @@ _, err = stmt.Exec(localBundleUri, depID) if err != nil { - log.Errorf("update edgex_deployment %s localBundleUri to %s failed: %v", depID, localBundleUri, err) + log.Errorf("update deployments %s localBundleUri to %s failed: %v", depID, localBundleUri, err) return err } - log.Debugf("update edgex_deployment %s localBundleUri to %s succeeded", depID, localBundleUri) + log.Debugf("update deployments %s localBundleUri to %s succeeded", depID, localBundleUri) return nil } - -func InsertTestDeployment(tx *sql.Tx, dep DataDeployment) error { - - stmt, err := tx.Prepare(` - INSERT INTO edgex_deployment - (id, bundle_config_id, apid_cluster_id, data_scope_id, - bundle_config_json, config_json, created, created_by, - updated, updated_by, bundle_config_name) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11); - `) - if err != nil { - log.Errorf("prepare insert into edgex_deployment failed: %v", err) - return err - } - defer stmt.Close() - - log.Debugf("InsertTestDeployment: %s", dep.ID) - - _, err = stmt.Exec( - dep.ID, dep.BundleConfigID, dep.ApidClusterID, dep.DataScopeID, - dep.BundleConfigJSON, dep.ConfigJSON, dep.Created, dep.CreatedBy, - dep.Updated, dep.UpdatedBy, dep.BundleName) - if err != nil { - log.Errorf("insert into edgex_deployment %s failed: %v", dep.ID, err) - return err - } - - log.Debug("InsertTestDeployment edgex_deployment succeeded") - return err -}
diff --git a/listener.go b/listener.go index 8a4f017..8230176 100644 --- a/listener.go +++ b/listener.go
@@ -54,36 +54,69 @@ log.Panicf("Unable to access database: %v", err) } - // alter table - err = alterTable(db) + err = InitDB(db) if err != nil { - log.Panicf("Alter table failed: %v", err) + log.Panicf("Unable to initialize database: %v", err) } + + var deploymentsToInsert []DataDeployment + var errResults apiDeploymentResults + for _, table := range snapshot.Tables { + switch table.Name { + case DEPLOYMENT_TABLE: + for _, row := range table.Rows { + dep, err := dataDeploymentFromRow(row) + if err == nil { + deploymentsToInsert = append(deploymentsToInsert, dep) + } else { + result := apiDeploymentResult{ + ID: dep.ID, + Status: RESPONSE_STATUS_FAIL, + ErrorCode: TRACKER_ERR_DEPLOYMENT_BAD_JSON, + Message: fmt.Sprintf("unable to parse deployment: %v", err), + } + errResults = append(errResults, result) + } + } + } + } + // ensure that no new database updates are made on old database dbMux.Lock() - SetDB(db) - dbMux.Unlock() + defer dbMux.Unlock() - // update deployments - deps, err := getDeploymentsToUpdate(db) - if err != nil { - log.Panicf("Unable to getDeploymentsToUpdate: %v", err) - } tx, err := db.Begin() if err != nil { log.Panicf("Error starting transaction: %v", err) } defer tx.Rollback() - err = updateDeploymentsColumns(tx, deps) + + err = insertDeployments(tx, deploymentsToInsert) if err != nil { - log.Panicf("updateDeploymentsColumns failed: %v", err) - } - err = tx.Commit() - if err != nil { - log.Panicf("Error committing Snapshot update: %v", err) + log.Panicf("Error processing Snapshot: %v", err) } - startupOnExistingDatabase() + err = tx.Commit() + if err != nil { + log.Panicf("Error committing Snapshot change: %v", err) + } + + SetDB(db) + + for _, dep := range deploymentsToInsert { + queueDownloadRequest(dep) + } + + // transmit parsing errors back immediately + if len(errResults) > 0 { + go transmitDeploymentResultsToServer(errResults) + } + + // if no tables, this a startup event for an existing DB + if len(snapshot.Tables) == 0 { + startupOnExistingDatabase() + } + log.Debug("Snapshot processed") } @@ -126,8 +159,8 @@ func processChangeList(changes *common.ChangeList) { - // changes have been applied to DB - var insertedDeployments, deletedDeployments []DataDeployment + // gather deleted bundle info + var deploymentsToInsert, deploymentsToDelete []DataDeployment var errResults apiDeploymentResults for _, change := range changes.Changes { switch change.Table { @@ -136,7 +169,7 @@ case common.Insert: dep, err := dataDeploymentFromRow(change.NewRow) if err == nil { - insertedDeployments = append(insertedDeployments, dep) + deploymentsToInsert = append(deploymentsToInsert, dep) } else { result := apiDeploymentResult{ ID: dep.ID, @@ -155,7 +188,7 @@ ID: id, DataScopeID: dataScopeID, } - deletedDeployments = append(deletedDeployments, dep) + deploymentsToDelete = append(deploymentsToDelete, dep) default: log.Errorf("unexpected operation: %s", change.Operation) } @@ -167,23 +200,45 @@ go transmitDeploymentResultsToServer(errResults) } - for _, d := range deletedDeployments { + tx, err := getDB().Begin() + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } + defer tx.Rollback() + + for _, dep := range deploymentsToDelete { + err = deleteDeployment(tx, dep.ID) + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } + } + err = insertDeployments(tx, deploymentsToInsert) + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } + + err = tx.Commit() + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } + + for _, d := range deploymentsToDelete { deploymentsChanged <- d.ID } log.Debug("ChangeList processed") - for _, dep := range insertedDeployments { + for _, dep := range deploymentsToInsert { queueDownloadRequest(dep) } // clean up old bundles - if len(deletedDeployments) > 0 { - log.Debugf("will delete %d old bundles", len(deletedDeployments)) + if len(deploymentsToDelete) > 0 { + log.Debugf("will delete %d old bundles", len(deploymentsToDelete)) go func() { // give clients a minute to avoid conflicts time.Sleep(bundleCleanupDelay) - for _, dep := range deletedDeployments { + for _, dep := range deploymentsToDelete { bundleFile := getBundleFile(dep) log.Debugf("removing old bundle: %v", bundleFile) safeDelete(bundleFile)
diff --git a/listener_test.go b/listener_test.go index c253eb3..0c79f3f 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -8,7 +8,6 @@ "net/http" - "fmt" "github.com/30x/apid-core" "github.com/apigee-labs/transicator/common" . "github.com/onsi/ginkgo" @@ -19,56 +18,116 @@ Context("ApigeeSync snapshot event", func() { - /* - * Note that the test snapshot should not be empty. - * If it's empty, you can't use deploymentsResult chan to mark the end of processing, - * so the threads generated by startupOnExistingDatabase() will mess up later tests - */ - It("should set DB to appropriate version", func(done Done) { - saveDB := getDB() - deploymentID := "set_version_test" - snapshot, dep := createSnapshotDeployment(deploymentID, "test_version") + It("should set DB and process", func(done Done) { - db, err := data.DBVersion(snapshot.SnapshotInfo) + deploymentID := "listener_test_1" + + uri, err := url.Parse(testServer.URL) Expect(err).ShouldNot(HaveOccurred()) - err = InitDB(db) + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle1 := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc32", + } + bundle1.Checksum = testGetChecksum(bundle1.ChecksumType, bundleUri) + bundle1Json, err := json.Marshal(bundle1) Expect(err).ShouldNot(HaveOccurred()) - insertDeploymentToDb(dep, db) - expectedDB, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} + + var event = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{ + { + Name: DEPLOYMENT_TABLE, + Rows: []common.Row{row}, + }, + }, + } var listener = make(chan deploymentsResult) addSubscriber <- listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &snapshot) + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) result := <-listener - Expect(result.err).ShouldNot(HaveOccurred()) + Expect(result.err).ToNot(HaveOccurred()) - // DB should have been set - Expect(getDB() == expectedDB).Should(BeTrue()) + // from event + Expect(len(result.deployments)).To(Equal(1)) + d := result.deployments[0] - SetDB(saveDB) + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.BundleName).To(Equal(bundle1.Name)) + Expect(d.BundleURI).To(Equal(bundle1.URI)) + + // from db + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(1)) + d = deployments[0] + + Expect(d.ID).To(Equal(deploymentID)) + Expect(d.BundleName).To(Equal(bundle1.Name)) + Expect(d.BundleURI).To(Equal(bundle1.URI)) + close(done) }) It("should process unready on existing db startup event", func(done Done) { - saveDB := getDB() - deploymentID := "startup_test" - snapshot, dep := createSnapshotDeployment(deploymentID, "test_unready") + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + + dep := DataDeployment{ + ID: deploymentID, + DataScopeID: deploymentID, + BundleURI: bundle.URI, + BundleChecksum: bundle.Checksum, + BundleChecksumType: bundle.ChecksumType, + } + + // init without info == startup on existing DB + var snapshot = common.Snapshot{ + SnapshotInfo: "test", + Tables: []common.Table{}, + } db, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).ShouldNot(HaveOccurred()) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } err = InitDB(db) + if err != nil { + log.Panicf("Unable to initialize database: %v", err) + } + + tx, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) - insertDeploymentToDb(dep, db) + err = InsertDeployment(tx, dep) + Expect(err).ShouldNot(HaveOccurred()) + + err = tx.Commit() + Expect(err).ShouldNot(HaveOccurred()) var listener = make(chan deploymentsResult) addSubscriber <- listener @@ -82,15 +141,11 @@ d := result.deployments[0] Expect(d.ID).To(Equal(deploymentID)) - - SetDB(saveDB) close(done) }) It("should send deployment statuses on existing db startup event", func(done Done) { - saveDB := getDB() - successDep := DataDeployment{ ID: "success", LocalBundleURI: "x", @@ -134,7 +189,6 @@ Message: failDep.DeployErrorMessage, })) - SetDB(saveDB) close(done) })) @@ -149,10 +203,14 @@ } db, err := data.DBVersion(snapshot.SnapshotInfo) - Expect(err).NotTo(HaveOccurred()) + if err != nil { + log.Panicf("Unable to access database: %v", err) + } - err = InitDBFullColumns(db) - Expect(err).NotTo(HaveOccurred()) + err = InitDB(db) + if err != nil { + log.Panicf("Unable to initialize database: %v", err) + } tx, err := db.Begin() Expect(err).ShouldNot(HaveOccurred()) @@ -173,19 +231,37 @@ Context("ApigeeSync change event", func() { - It("inserting event should deliver the deployment to subscribers", func(done Done) { + It("add event should add a deployment", func(done Done) { deploymentID := "add_test_1" - event, dep := createChangeDeployment(deploymentID) + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) - // insert full deployment columns - tx, err := getDB().Begin() + uri.Path = "/bundles/1" + bundleUri := uri.String() + bundle := bundleConfigJson{ + Name: uri.Path, + URI: bundleUri, + ChecksumType: "crc32", + } + bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) + bundle1Json, err := json.Marshal(bundle) Expect(err).ShouldNot(HaveOccurred()) - err = InsertDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) + + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} + + var event = common.ChangeList{ + Changes: []common.Change{ + { + Operation: common.Insert, + Table: DEPLOYMENT_TABLE, + NewRow: row, + }, + }, + } var listener = make(chan deploymentsResult) addSubscriber <- listener @@ -203,39 +279,31 @@ d := deployments[0] Expect(d.ID).To(Equal(deploymentID)) - Expect(d.BundleName).To(Equal(dep.BundleName)) - Expect(d.BundleURI).To(Equal(dep.BundleURI)) + Expect(d.BundleName).To(Equal(bundle.Name)) + Expect(d.BundleURI).To(Equal(bundle.URI)) close(done) }) - It("delete event should deliver to subscribers", func(done Done) { + It("delete event should delete a deployment", func(done Done) { deploymentID := "delete_test_1" - // insert deployment - event, dep := createChangeDeployment(deploymentID) - - // insert full deployment columns tx, err := getDB().Begin() Expect(err).ShouldNot(HaveOccurred()) + dep := DataDeployment{ + ID: deploymentID, + LocalBundleURI: "whatever", + } err = InsertDeployment(tx, dep) Expect(err).ShouldNot(HaveOccurred()) err = tx.Commit() Expect(err).ShouldNot(HaveOccurred()) - listener := make(chan deploymentsResult) - addSubscriber <- listener - apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - // wait for event to propagate - result := <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - - // delete deployment - deletDeploymentFromDb(dep, getDB()) row := common.Row{} row["id"] = &common.ColumnVal{Value: deploymentID} - event = common.ChangeList{ + + var event = common.ChangeList{ Changes: []common.Change{ { Operation: common.Delete, @@ -245,99 +313,19 @@ }, } - listener = make(chan deploymentsResult) + var listener = make(chan deploymentsResult) addSubscriber <- listener + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) - result = <-listener - Expect(result.err).ShouldNot(HaveOccurred()) - Expect(len(result.deployments)).To(Equal(0)) + + <-listener + + deployments, err := getReadyDeployments() + Expect(err).ShouldNot(HaveOccurred()) + + Expect(len(deployments)).To(Equal(0)) + close(done) }) }) }) - -func createChangeDeployment(deploymentID string) (common.ChangeList, DataDeployment) { - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - bundle1Json, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - - row := common.Row{} - row["id"] = &common.ColumnVal{Value: deploymentID} - row["bundle_config_json"] = &common.ColumnVal{Value: string(bundle1Json)} - - changeList := common.ChangeList{ - Changes: []common.Change{ - { - Operation: common.Insert, - Table: DEPLOYMENT_TABLE, - NewRow: row, - }, - }, - } - dep, err := dataDeploymentFromRow(changeList.Changes[0].NewRow) - return changeList, dep -} - -func insertDeploymentToDb(dep DataDeployment, db apid.DB) { - tx, err := db.Begin() - Expect(err).ShouldNot(HaveOccurred()) - defer tx.Rollback() - err = InsertTestDeployment(tx, dep) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) -} - -func deletDeploymentFromDb(dep DataDeployment, db apid.DB) { - tx, err := db.Begin() - Expect(err).ShouldNot(HaveOccurred()) - defer tx.Rollback() - err = deleteDeployment(tx, dep.ID) - Expect(err).ShouldNot(HaveOccurred()) - err = tx.Commit() - Expect(err).ShouldNot(HaveOccurred()) -} - -func createSnapshotDeployment(deploymentID string, snapInfo string) (common.Snapshot, DataDeployment) { - uri, err := url.Parse(testServer.URL) - Expect(err).ShouldNot(HaveOccurred()) - - uri.Path = "/bundles/1" - bundleUri := uri.String() - bundle := bundleConfigJson{ - Name: uri.Path, - URI: bundleUri, - ChecksumType: "crc32", - } - bundle.Checksum = testGetChecksum(bundle.ChecksumType, bundleUri) - - jsonBytes, err := json.Marshal(bundle) - Expect(err).ShouldNot(HaveOccurred()) - fmt.Println("JSON :" + string(jsonBytes)) - - dep := DataDeployment{ - ID: deploymentID, - DataScopeID: deploymentID, - BundleURI: bundle.URI, - BundleChecksum: bundle.Checksum, - BundleChecksumType: bundle.ChecksumType, - BundleConfigJSON: string(jsonBytes), - } - - // init without info == startup on existing DB - var snapshot = common.Snapshot{ - SnapshotInfo: snapInfo, - Tables: []common.Table{}, - } - return snapshot, dep -}