rework for transicator messaging
diff --git a/api_test.go b/api_test.go index 142cd05..9e31afa 100644 --- a/api_test.go +++ b/api_test.go
@@ -102,6 +102,8 @@ deploymentID = "api_get_current_blocking2" go func() { + defer GinkgoRecover() + query := uri.Query() query.Add("block", "1") uri.RawQuery = query.Encode() @@ -264,19 +266,23 @@ uri.Path = "/bundle" bundleUri := uri.String() - manifest := bundleManifest{ - systemBundle{ - bundleUri, + dep := deployment{ + DeploymentId: depID, + System: bundle{ + URI: bundleUri, }, - []dependantBundle{ + Bundles: []bundle{ { - bundleUri, - "some-scope", + BundleId: "bun", + URI: bundleUri, + Scope: "some-scope", + Org: "org", + Env: "env", }, }, } - err = insertDeployment(depID, manifest) + err = insertDeployment(depID, dep) Expect(err).ShouldNot(HaveOccurred()) err = updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_READY, 0)
diff --git a/apidGatewayDeploy_suite_test.go b/apidGatewayDeploy_suite_test.go index 34d8c40..2bd9148 100644 --- a/apidGatewayDeploy_suite_test.go +++ b/apidGatewayDeploy_suite_test.go
@@ -23,12 +23,12 @@ config := apid.Config() - // todo: This will change after apidApigeeSync is fixed for scopes config.SetDefault("apigeesync_proxy_server_base", "X") - config.SetDefault("apigeesync_organization", "X") config.SetDefault("apigeesync_consumer_key", "X") config.SetDefault("apigeesync_consumer_secret", "X") - config.SetDefault("apigeesync_log_level", "panic") + config.SetDefault("apigeesync_snapshot_server_base", "X") + config.SetDefault("apigeesync_change_server_base", "X") + config.SetDefault("apigeesync_log_level", "info") var err error tmpDir, err = ioutil.TempDir("", "api_test")
diff --git a/cmd/apidGatewayDeploy/main.go b/cmd/apidGatewayDeploy/main.go index a8b11ef..ee93279 100644 --- a/cmd/apidGatewayDeploy/main.go +++ b/cmd/apidGatewayDeploy/main.go
@@ -4,10 +4,10 @@ "flag" "github.com/30x/apid" "github.com/30x/apid/factory" - "github.com/30x/apidApigeeSync" _ "github.com/30x/apidGatewayDeploy" "io/ioutil" - "time" + "github.com/30x/transicator/common" + "github.com/30x/apidGatewayDeploy" ) func main() { @@ -23,13 +23,6 @@ config := apid.Config() - // todo: This will change after apidApigeeSync is fixed for scopes - config.SetDefault("apigeesync_proxy_server_base", "X") - config.SetDefault("apigeesync_organization", "X") - config.SetDefault("apigeesync_consumer_key", "X") - config.SetDefault("apigeesync_consumer_secret", "X") - config.SetDefault("apigeesync_log_level", "panic") - // if manifest is specified, start with only the manifest using a temp dir var manifest []byte if manifestFile != "" { @@ -71,20 +64,17 @@ func insertTestRecord(manifest []byte) { - now := time.Now().Unix() - var event = apidApigeeSync.ChangeSet{} - event.Changes = []apidApigeeSync.ChangePayload{ + row := common.Row{} + row["id"] = &common.ColumnVal{Value: "deploymentID"} + row["body"] = &common.ColumnVal{Value: string(manifest)} + + var event = common.Snapshot{} + event.Tables = []common.Table{ { - Data: apidApigeeSync.DataPayload{ - EntityType: "deployment", - Operation: "create", - EntityIdentifier: "entityID", - PldCont: apidApigeeSync.Payload{ - CreatedAt: now, - Manifest: string(manifest), - }, - }, + Name: apiGatewayDeploy.MANIFEST_TABLE, + Rows: []common.Row{row}, }, } - apid.Events().Emit(apidApigeeSync.ApigeeSyncEventSelector, &event) + + apid.Events().Emit(apiGatewayDeploy.APIGEE_SYNC_EVENT, &event) }
diff --git a/data.go b/data.go index c3fb74b..69dda23 100644 --- a/data.go +++ b/data.go
@@ -2,7 +2,6 @@ import ( "database/sql" - "github.com/30x/apidApigeeSync" "time" ) @@ -93,14 +92,20 @@ } // currently only maintains 1 in the queue -func queueDeployment(payload apidApigeeSync.DataPayload) error { +func queueDeployment(deploymentID, manifestString string) error { - // todo: validate payload manifest + log.Debugf("queuing deployment %s: %s", deploymentID, manifestString) + + // validate manifest + _, err := parseManifest(manifestString) + if err != nil { + return err + } // maintains queue at 1 tx, err := db.Begin() if err != nil { - log.Debugf("INSERT gateway_deploy_queue failed: (%s)", payload.EntityIdentifier) + log.Debugf("INSERT gateway_deploy_queue failed: (%s)", deploymentID) return err } defer tx.Rollback() @@ -112,22 +117,22 @@ } _, err = tx.Exec("INSERT INTO gateway_deploy_queue (id, manifest, created_at) VALUES (?,?,?);", - payload.EntityIdentifier, - payload.PldCont.Manifest, - payload.PldCont.CreatedAt, + deploymentID, + manifestString, + dbTimeNow(), ) if err != nil { - log.Errorf("INSERT gateway_deploy_queue %s failed: %v", payload.EntityIdentifier, err) + log.Errorf("INSERT gateway_deploy_queue %s failed: %v", deploymentID, err) return err } err = tx.Commit() if err != nil { - log.Errorf("INSERT gateway_deploy_queue %s failed: %v", payload.EntityIdentifier, err) + log.Errorf("INSERT gateway_deploy_queue %s failed: %v", deploymentID, err) return err } - log.Debugf("INSERT gateway_deploy_queue success: (%s)", payload.EntityIdentifier) + log.Debugf("INSERT gateway_deploy_queue success: (%s)", deploymentID) return nil } @@ -155,7 +160,7 @@ return int64(time.Now().UnixNano()) } -func insertDeployment(depID string, manifest bundleManifest) error { +func insertDeployment(depID string, dep deployment) error { tx, err := db.Begin() if err != nil { @@ -179,21 +184,20 @@ _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + "(id, deployment_id, type, uri, status, created_at) " + "VALUES(?,?,?,?,?,?);", - "sys", depID, BUNDLE_TYPE_SYS, manifest.SysBun.URI, DEPLOYMENT_STATE_INPROG, timeNow) + "sys", depID, BUNDLE_TYPE_SYS, dep.System.URI, DEPLOYMENT_STATE_INPROG, timeNow) if err != nil { log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, "sys", err) return err } // todo: extra data? - for i, bun := range manifest.DepBun { - id := string(i) + for _, bun := range dep.Bundles { _, err = tx.Exec("INSERT INTO gateway_deploy_bundle " + "(id, deployment_id, scope, type, uri, status, created_at) " + "VALUES(?,?,?,?,?,?,?);", - id, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow) + bun.BundleId, depID, bun.Scope, BUNDLE_TYPE_DEP, bun.URI, DEPLOYMENT_STATE_INPROG, timeNow) if err != nil { - log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, id, err) + log.Errorf("INSERT gateway_deploy_bundle %s:%s failed: %v", depID, bun.BundleId, err) return err } } @@ -303,7 +307,6 @@ } else { fileUrl := getBundleFilePath(depID, uri) bd := bundle{ - AuthCode: bundleID, // todo: authCode? BundleId: bundleID, URI: fileUrl, }
diff --git a/deployments.go b/deployments.go index 3115217..f4a9f60 100644 --- a/deployments.go +++ b/deployments.go
@@ -10,6 +10,7 @@ "os" "encoding/base64" "path" + "errors" ) var ( @@ -33,16 +34,20 @@ DepBun []dependantBundle `json:"bundles"` } +// event bundle type bundle struct { - BundleId string `json:"bundleId"` - URI string `json:"uri"` - AuthCode string `json:"authCode,omitempty"` + BundleId string `json:"bundleId"` + URI string `json:"uri"` + Scope string `json:"scope"` + Org string `json:"org"` + Env string `json:"env"` } +// event deployment type deployment struct { DeploymentId string `json:"deploymentId"` - Bundles []bundle `json:"bundles"` System bundle `json:"system"` + Bundles []bundle `json:"bundles"` } type deploymentErrorDetail struct { @@ -96,7 +101,7 @@ // todo: retry on error? // check if already exists and skip -func prepareBundle(depID string, bun dependantBundle) error { +func prepareBundle(depID string, bun bundle) error { bundleFile := getBundleFilePath(depID, bun.URI) out, err := os.Create(bundleFile) @@ -132,7 +137,7 @@ // returns first bundle download error // all bundles will be attempted regardless of errors, in the future we could retry -func prepareDeployment(depID string, manifest bundleManifest) error { +func prepareDeployment(depID string, dep deployment) error { deploymentPath := getDeploymentFilesPath(depID) err := os.Mkdir(deploymentPath, 0700) @@ -143,15 +148,15 @@ // todo: any reason to put all this in a single transaction? - err = insertDeployment(depID, manifest) + err = insertDeployment(depID, dep) if err != nil { log.Errorf("Prepare deployment failed: %v", err) return err } // download bundles and store them locally - errors := make(chan error, len(manifest.DepBun)) - for i, bun := range manifest.DepBun { + errors := make(chan error, len(dep.Bundles)) + for i, bun := range dep.Bundles { go func() { err := prepareBundle(depID, bun) errors <- err @@ -166,7 +171,7 @@ } // fail fast on first error, otherwise wait for completion - for range manifest.DepBun { + for range dep.Bundles { err := <- errors if err != nil { updateDeploymentStatus(db, depID, DEPLOYMENT_STATE_ERR_APID, ERROR_CODE_TODO) @@ -186,10 +191,8 @@ return } - var manifest bundleManifest - err := json.Unmarshal([]byte(manifestString), &manifest) + manifest, err := parseManifest(manifestString) if err != nil { - log.Errorf("JSON decoding Manifest failed Err: %v", err) return } @@ -207,3 +210,33 @@ log.Debugf("Signaling new deployment ready: %s", depID) incoming <- depID } + +func parseManifest(manifestString string) (dep deployment, err error) { + err = json.Unmarshal([]byte(manifestString), &dep) + if err != nil { + log.Errorf("JSON decoding Manifest failed Err: %v", err) + return + } + + // todo: validate manifest... + if dep.System.URI == "" { + err = errors.New("system bundle uri is required") + return + } + for _, bun := range dep.Bundles { + if bun.BundleId == "" { + err = errors.New("bundle bundleID is required") + return + } + if bun.URI == "" { + err = errors.New("bundle uri is required") + return + } + if bun.Scope == "" { + err = errors.New("bundle scope is required") + return + } + } + + return +}
diff --git a/init.go b/init.go index 11d79a7..4b07bad 100644 --- a/init.go +++ b/init.go
@@ -55,6 +55,7 @@ initAPI(services) initListener(services) + // todo: in goroutine? serviceDeploymentQueue() log.Debug("end init")
diff --git a/listener.go b/listener.go index c06709c..7ae60af 100644 --- a/listener.go +++ b/listener.go
@@ -2,11 +2,16 @@ import ( "github.com/30x/apid" - "github.com/30x/apidApigeeSync" + "github.com/30x/transicator/common" +) + +const ( + APIGEE_SYNC_EVENT = "ApigeeSync" + MANIFEST_TABLE = "edgex.apid_config_manifest" ) func initListener(services apid.Services) { - services.Events().Listen(apidApigeeSync.ApigeeSyncEventSelector, &apigeeSyncHandler{}) + services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{}) } type apigeeSyncHandler struct { @@ -17,29 +22,72 @@ } func (h *apigeeSyncHandler) Handle(e apid.Event) { - changeSet, ok := e.(*apidApigeeSync.ChangeSet) - if !ok { - log.Errorf("Received non-ChangeSet event.") - return + + if changeSet, ok := e.(*common.ChangeList); ok { + processChangeList(changeSet) + } else if snapData, ok := e.(*common.Snapshot); ok { + processSnapshot(snapData) + } else { + log.Errorf("Received invalid event: %v", e) + } +} + +func processSnapshot(snapshot *common.Snapshot) { + + for _, table := range snapshot.Tables { + var err error + switch table.Name { + case MANIFEST_TABLE: + log.Debugf("Snapshot of %s with %d rows", table.Name, table.Rows) + // todo: should be 0 or 1 per system!! + row := table.Rows[len(table.Rows)-1] + err = processNewManifest(row) + } + if err != nil { + log.Panicf("Error processing Snapshot: %v", err) + } } - log.Debugf("apigeeSyncEvent: %d changes", len(changeSet.Changes)) + log.Debug("Snapshot processed") +} - for _, payload := range changeSet.Changes { +func processChangeList(changes *common.ChangeList) { + log.Debugf("Process %d changes", len(changes.Changes)) - if payload.Data.EntityType != "deployment" { - continue - } + for _, change := range changes.Changes { + log.Debugf("payload table: %s operation: %s", change.Table, change.Operation) - switch payload.Data.Operation { - case "create": - err := queueDeployment(payload.Data) - if err == nil { - serviceDeploymentQueue() - } else { - log.Errorf("unable to queue deployment") + var err error + switch change.Table { + case MANIFEST_TABLE: + switch change.Operation { + case common.Insert: + err = processNewManifest(change.NewRow) } } - + if err != nil { + log.Panicf("Error processing ChangeList: %v", err) + } } } + +func processNewManifest(row common.Row) error { + + var deploymentID, manifest string + err := row.Get("id", &deploymentID) + if err != nil { + return err + } + err = row.Get("body", &manifest) + if err != nil { + return err + } + + err = queueDeployment(deploymentID, manifest) + + if err == nil { + go serviceDeploymentQueue() + } + + return err +}
diff --git a/listener_test.go b/listener_test.go index 2e2ae47..d89e43b 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -3,16 +3,15 @@ import ( "encoding/json" "github.com/30x/apid" - . "github.com/30x/apidApigeeSync" // for direct access to Payload types . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "net/url" - "time" + "github.com/30x/transicator/common" ) var _ = Describe("listener", func() { - It("should store data from ApigeeSync in the database", func(done Done) { + It("should process ApigeeSync snapshot event", func(done Done) { deploymentID := "listener_test_1" @@ -21,6 +20,78 @@ uri.Path = "/bundle" bundleUri := uri.String() + dep := deployment{ + DeploymentId: deploymentID, + System: bundle{ + URI: bundleUri, + }, + Bundles: []bundle{ + { + BundleId: "bun", + URI: bundleUri, + Scope: "some-scope", + }, + }, + } + + depBytes, err := json.Marshal(dep) + Expect(err).ShouldNot(HaveOccurred()) + + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["body"] = &common.ColumnVal{Value: string(depBytes)} + + var event = common.Snapshot{} + event.Tables = []common.Table{ + { + Name: MANIFEST_TABLE, + Rows: []common.Row{row}, + }, + } + + h := &test_handler{ + "checkDatabase", + func(e apid.Event) { + defer GinkgoRecover() + + // ignore the first event, let standard listener process it + changeSet := e.(*common.Snapshot) + if len(changeSet.Tables) > 0 { + return + } + + // force queue to be emptied + serviceDeploymentQueue() + + depID, err := getCurrentDeploymentID() + Expect(err).ShouldNot(HaveOccurred()) + Expect(depID).Should(Equal(deploymentID)) + + dep, err := getDeployment(depID) + Expect(err).ShouldNot(HaveOccurred()) + + Expect(dep.System.URI).To(Equal(dep.System.URI)) + Expect(len(dep.Bundles)).To(Equal(len(dep.Bundles))) + Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri))) + + close(done) + }, + } + + apid.Events().Listen(APIGEE_SYNC_EVENT, h) + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) // for standard listener + apid.Events().Emit(APIGEE_SYNC_EVENT, &common.Snapshot{}) // for test listener + }) + + It("should process ApigeeSync change event", func(done Done) { + + deploymentID := "listener_test_2" + + uri, err := url.Parse(testServer.URL) + Expect(err).ShouldNot(HaveOccurred()) + uri.Path = "/bundle" + bundleUri := uri.String() + man := bundleManifest{ SysBun: systemBundle{ URI: bundleUri, @@ -28,6 +99,7 @@ DepBun: []dependantBundle{ { URI: bundleUri, + Scope: "some-scope", }, }, } @@ -35,32 +107,33 @@ Expect(err).ShouldNot(HaveOccurred()) manifest := string(manBytes) - now := time.Now().Unix() - var event = ChangeSet{} - event.Changes = []ChangePayload{ + row := common.Row{} + row["id"] = &common.ColumnVal{Value: deploymentID} + row["body"] = &common.ColumnVal{Value: manifest} + + var event = common.ChangeList{} + event.Changes = []common.Change{ { - Data: DataPayload{ - EntityType: "deployment", - Operation: "create", - EntityIdentifier: deploymentID, - PldCont: Payload{ - CreatedAt: now, - Manifest: manifest, - }, - }, + Operation: common.Insert, + Table: MANIFEST_TABLE, + NewRow: row, }, } h := &test_handler{ "checkDatabase", func(e apid.Event) { + defer GinkgoRecover() // ignore the first event, let standard listener process it - changeSet := e.(*ChangeSet) + changeSet := e.(*common.ChangeList) if len(changeSet.Changes) > 0 { return } + // force queue to be emptied + serviceDeploymentQueue() + depID, err := getCurrentDeploymentID() Expect(err).ShouldNot(HaveOccurred()) Expect(depID).Should(Equal(deploymentID)) @@ -72,29 +145,14 @@ Expect(len(dep.Bundles)).To(Equal(len(man.DepBun))) Expect(dep.Bundles[0].URI).To(Equal(getBundleFilePath(deploymentID, bundleUri))) - // todo: should do a lot more checking here... maybe call another api instead? - //var selectedManifest string - //var createdAt int64 - //err = db.QueryRow("SELECT manifest, created_at from bundle_deployment where id = ?", deploymentID). - // Scan(&selectedManifest, &createdAt) - //Expect(err).ShouldNot(HaveOccurred()) - // - //Expect(manifest).Should(Equal(selectedManifest)) - //Expect(createdAt).Should(Equal(now)) - - // clean up - //_, err = db.Exec("DELETE from bundle_deployment where id = ?", deploymentID) - //Expect(err).ShouldNot(HaveOccurred()) - close(done) }, } - apid.Events().Listen(ApigeeSyncEventSelector, h) - apid.Events().Emit(ApigeeSyncEventSelector, &event) // for standard listener - apid.Events().Emit(ApigeeSyncEventSelector, &ChangeSet{}) // for test listener + apid.Events().Listen(APIGEE_SYNC_EVENT, h) + apid.Events().Emit(APIGEE_SYNC_EVENT, &event) // for standard listener + apid.Events().Emit(APIGEE_SYNC_EVENT, &common.ChangeList{}) // for test listener }) - }) type test_handler struct {