Add Delete/Up support. Code refactor.
diff --git a/api.go b/api.go index 9bbce01..3361c74 100644 --- a/api.go +++ b/api.go
@@ -98,7 +98,7 @@ return errorResponse(reason, errorCode) } - sSql = "SELECT ap.api_resources, ap.environments, c.issued_at, c.app_status, a.callback_url, d.username, d.id FROM APP_CREDENTIAL AS c INNER JOIN APP AS a ON c.app_id = a.id INNER JOIN DEVELOPER AS d ON a.developer_id = d.id INNER JOIN APP_CREDENTIAL_APIPRODUCT_MAPPER as mp ON mp.appcred_id = c.id INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id WHERE (UPPER(d.status) = 'ACTIVE' AND mp.apiprdt_id = ap.id AND mp.app_id = a.id AND mp.appcred_id = c.id AND UPPER(mp.status) = 'APPROVED' AND UPPER(a.status) = 'APPROVED' AND UPPER(c.status) = 'APPROVED' AND c.id = '" + key + "' AND c._apid_scope = '" + org + "');" + sSql = "SELECT ap.api_resources, ap.environments, c.issued_at, c.status, a.callback_url, d.username, d.id FROM APP_CREDENTIAL AS c INNER JOIN APP AS a ON c.app_id = a.id INNER JOIN DEVELOPER AS d ON a.developer_id = d.id INNER JOIN APP_CREDENTIAL_APIPRODUCT_MAPPER as mp ON mp.appcred_id = c.id INNER JOIN API_PRODUCT as ap ON ap.id = mp.apiprdt_id WHERE (UPPER(d.status) = 'ACTIVE' AND mp.apiprdt_id = ap.id AND mp.app_id = a.id AND mp.appcred_id = c.id AND UPPER(mp.status) = 'APPROVED' AND UPPER(a.status) = 'APPROVED' AND c.id = '" + key + "' AND c._apid_scope = '" + org + "');" err = db.QueryRow(sSql).Scan(&resName, &resEnv, &issuedAt, &status, &redirectionURIs, &developerAppName, &developerId)
diff --git a/api_test.go b/api_test.go index e40ec96..50d1567 100644 --- a/api_test.go +++ b/api_test.go
@@ -39,8 +39,10 @@ db, err = apid.Data().DB() Expect(err).NotTo(HaveOccurred()) - insertTestData(db) - + txn, err := db.Begin() + Expect(err).ShouldNot(HaveOccurred()) + insertTestData(db, txn) + txn.Commit() server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == apiPath { handleRequest(w, req) @@ -157,7 +159,7 @@ }) }) -func insertTestData(db *sql.DB) { +func insertTestData(db *sql.DB, txn *sql.Tx) { for i := 0; i < 10; i++ { var rows []common.Row @@ -194,7 +196,7 @@ } srvItems["tenant_id"] = scv rows = append(rows, srvItems) - res := insertAPIproducts(rows, db) + res := insertAPIproducts(rows, db, txn) Expect(res).Should(BeTrue()) } @@ -246,7 +248,7 @@ srvItems["tenant_id"] = scv rows = append(rows, srvItems) - res := insertDevelopers(rows, db) + res := insertDevelopers(rows, db, txn) Expect(res).Should(BeTrue()) } @@ -295,7 +297,7 @@ } srvItems["tenant_id"] = scv rows = append(rows, srvItems) - res := insertApplications(rows, db) + res := insertApplications(rows, db, txn) Expect(res).Should(BeTrue()) } k = j @@ -336,7 +338,7 @@ } srvItems["tenant_id"] = scv rows = append(rows, srvItems) - res := insertCredentials(rows, db) + res := insertCredentials(rows, db, txn) Expect(res).Should(BeTrue()) } @@ -379,7 +381,7 @@ } srvItems["tenant_id"] = scv rows = append(rows, srvItems) - res := insertAPIProductMappers(rows, db) + res := insertAPIProductMappers(rows, db, txn) Expect(res).Should(BeTrue()) }
diff --git a/listener.go b/listener.go index ec646cf..8b35d85 100644 --- a/listener.go +++ b/listener.go
@@ -15,30 +15,41 @@ func (h *handler) Handle(e apid.Event) { - snapData, ok := e.(*common.Snapshot) - if ok { - processSnapshot(snapData) - } else { - changeSet, ok := e.(*common.ChangeList) - if ok { - processChange(changeSet) - } else { - log.Errorf("Received Invalid event. This shouldn't happen!") - } - } - return -} - -func processSnapshot(snapshot *common.Snapshot) { - res := true - log.Debugf("Process Snapshot data") - db, err := data.DB() if err != nil { panic("Unable to access Sqlite DB") } + txn, err := db.Begin() + if err != nil { + log.Error("Unable to create Sqlite transaction") + return + } + + snapData, ok := e.(*common.Snapshot) + if ok { + processSnapshot(snapData, db, txn) + } else { + changeSet, ok := e.(*common.ChangeList) + if ok { + processChange(changeSet, db, txn) + } else { + log.Errorf("Received Invalid event. This shouldn't happen!") + } + } + if res == true { + txn.Commit() + } else { + txn.Rollback() + } + return +} + +func processSnapshot(snapshot *common.Snapshot, db *sql.DB, txn *sql.Tx) bool { + + res := true + log.Debugf("Process Snapshot data") /* * Iterate the tables, and insert the rows, * Commit them in bulk. @@ -46,28 +57,29 @@ for _, payload := range snapshot.Tables { switch payload.Name { case "kms.developer": - res = insertDevelopers(payload.Rows, db) + res = insertDevelopers(payload.Rows, db, txn) case "kms.app": - res = insertApplications(payload.Rows, db) + res = insertApplications(payload.Rows, db, txn) case "kms.app_credential": - res = insertCredentials(payload.Rows, db) + res = insertCredentials(payload.Rows, db, txn) case "kms.api_product": - res = insertAPIproducts(payload.Rows, db) + res = insertAPIproducts(payload.Rows, db, txn) case "kms.app_credential_apiproduct_mapper": - res = insertAPIProductMappers(payload.Rows, db) + res = insertAPIProductMappers(payload.Rows, db, txn) } if res == false { log.Error("Error encountered in Downloading Snapshot for VerifyApiKey") - return + return false } } log.Debug("Downloading Snapshot for VerifyApiKey complete") + return true } /* * Performs bulk insert of credentials */ -func insertCredentials(rows []common.Row, db *sql.DB) bool { +func insertCredentials(rows []common.Row, db *sql.DB, txn *sql.Tx) bool { var scope, id, appId, consumerSecret, appstatus, status, tenantId string var issuedAt int64 @@ -77,8 +89,7 @@ log.Error("INSERT Cred Failed: ", err) return false } - - txn, err := db.Begin() + defer prep.Close() for _, ele := range rows { ele.Get("_apid_scope", &scope) ele.Get("id", &id) @@ -100,20 +111,18 @@ if err != nil { log.Error("INSERT CRED Failed: ", id, ", ", scope, ")", err) - txn.Rollback() return false } else { log.Debug("INSERT CRED Success: (", id, ", ", scope, ")") } } - txn.Commit() return true } /* * Performs Bulk insert of Applications */ -func insertApplications(rows []common.Row, db *sql.DB) bool { +func insertApplications(rows []common.Row, db *sql.DB, txn *sql.Tx) bool { var scope, EntityIdentifier, DeveloperId, CallbackUrl, Status, AppName, AppFamily, tenantId, CreatedBy, LastModifiedBy string var CreatedAt, LastModifiedAt int64 @@ -124,7 +133,7 @@ return false } - txn, err := db.Begin() + defer prep.Close() for _, ele := range rows { ele.Get("_apid_scope", &scope) @@ -156,13 +165,11 @@ if err != nil { log.Error("INSERT APP Failed: (", EntityIdentifier, ", ", tenantId, ")", err) - txn.Rollback() return false } else { log.Debug("INSERT APP Success: (", EntityIdentifier, ", ", tenantId, ")") } } - txn.Commit() return true } @@ -170,7 +177,7 @@ /* * Performs bulk insert of Developers */ -func insertDevelopers(rows []common.Row, db *sql.DB) bool { +func insertDevelopers(rows []common.Row, db *sql.DB, txn *sql.Tx) bool { var scope, EntityIdentifier, Email, Status, UserName, FirstName, LastName, tenantId, CreatedBy, LastModifiedBy, Username string var CreatedAt, LastModifiedAt int64 @@ -181,7 +188,7 @@ return false } - txn, err := db.Begin() + defer prep.Close() for _, ele := range rows { ele.Get("_apid_scope", &scope) @@ -213,20 +220,18 @@ if err != nil { log.Error("INSERT DEVELOPER Failed: (", EntityIdentifier, ", ", scope, ")", err) - txn.Rollback() return false } else { log.Debug("INSERT DEVELOPER Success: (", EntityIdentifier, ", ", scope, ")") } } - txn.Commit() return true } /* * Performs Bulk insert of API products */ -func insertAPIproducts(rows []common.Row, db *sql.DB) bool { +func insertAPIproducts(rows []common.Row, db *sql.DB, txn *sql.Tx) bool { var scope, apiProduct, res, env, tenantId string @@ -236,7 +241,7 @@ return false } - txn, err := db.Begin() + defer prep.Close() for _, ele := range rows { ele.Get("_apid_scope", &scope) @@ -254,20 +259,18 @@ if err != nil { log.Error("INSERT API_PRODUCT Failed: (", apiProduct, ", ", tenantId, ")", err) - txn.Rollback() return false } else { log.Debug("INSERT API_PRODUCT Success: (", apiProduct, ", ", tenantId, ")") } } - txn.Commit() return true } /* * Performs a bulk insert of all APP_CREDENTIAL_APIPRODUCT_MAPPER rows */ -func insertAPIProductMappers(rows []common.Row, db *sql.DB) bool { +func insertAPIProductMappers(rows []common.Row, db *sql.DB, txn *sql.Tx) bool { var ApiProduct, AppId, EntityIdentifier, tenantId, Scope, Status string @@ -277,7 +280,7 @@ return false } - txn, err := db.Begin() + defer prep.Close() for _, ele := range rows { ele.Get("apiprdt_id", &ApiProduct) @@ -311,7 +314,6 @@ ")", err) - txn.Rollback() return false } else { log.Debug("INSERT APP_CREDENTIAL_APIPRODUCT_MAPPER Success: (", @@ -324,145 +326,162 @@ ")") } } - txn.Commit() return true } -func processChange(changes *common.ChangeList) { +func processChange(changes *common.ChangeList, db *sql.DB, txn *sql.Tx) bool { + + var rows []common.Row + res := true log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes)) - var rows []common.Row - - db, err := data.DB() - if err != nil { - panic("Unable to access Sqlite DB") - } - for _, payload := range changes.Changes { rows = nil switch payload.Table { case "kms.developer": switch payload.Operation { - case 1: + case common.Insert: rows = append(rows, payload.NewRow) - insertDevelopers(rows, db) - case 2: - updateDeveloper(payload.NewRow, payload.OldRow, db) - case 3: - deleteDeveloper(payload.OldRow, db) + res = insertDevelopers(rows, db, txn) + + case common.Update: + res = deleteObject("DEVELOPER", payload.OldRow, db, txn) + rows = append(rows, payload.NewRow) + res = insertDevelopers(rows, db, txn) + + case common.Delete: + res = deleteObject("DEVELOPER", payload.OldRow, db, txn) } case "kms.app": switch payload.Operation { - case 1: + case common.Insert: rows = append(rows, payload.NewRow) - insertApplications(rows, db) - case 2: - updateApplication(payload.NewRow, payload.OldRow, db) - case 3: - deleteApplication(payload.OldRow, db) + res = insertApplications(rows, db, txn) + + case common.Update: + res = deleteObject("APP", payload.OldRow, db, txn) + rows = append(rows, payload.NewRow) + res = insertApplications(rows, db, txn) + + case common.Delete: + res = deleteObject("APP", payload.OldRow, db, txn) } case "kms.app_credential": switch payload.Operation { - case 1: + case common.Insert: rows = append(rows, payload.NewRow) - insertCredentials(rows, db) - case 2: - updateCredential(payload.NewRow, payload.OldRow, db) - case 3: - deleteCredential(payload.OldRow, db) + res = insertCredentials(rows, db, txn) + + case common.Update: + res = deleteObject("APP_CREDENTIAL", payload.OldRow, db, txn) + rows = append(rows, payload.NewRow) + res = insertCredentials(rows, db, txn) + + case common.Delete: + res = deleteObject("APP_CREDENTIAL", payload.OldRow, db, txn) } case "kms.api_product": switch payload.Operation { - case 1: + case common.Insert: rows = append(rows, payload.NewRow) - insertAPIproducts(rows, db) - case 2: - updateAPIproduct(payload.NewRow, payload.OldRow, db) - case 3: - deleteAPIproduct(payload.OldRow, db) + res = insertAPIproducts(rows, db, txn) + + case common.Update: + res = deleteObject("API_PRODUCT", payload.OldRow, db, txn) + rows = append(rows, payload.NewRow) + res = insertAPIproducts(rows, db, txn) + + case common.Delete: + res = deleteObject("API_PRODUCT", payload.OldRow, db, txn) } case "kms.app_credential_apiproduct_mapper": switch payload.Operation { - case 1: + case common.Insert: rows = append(rows, payload.NewRow) - insertAPIProductMappers(rows, db) - case 2: - updateAPIproductMapper(payload.NewRow, payload.OldRow, db) - case 3: - deleteAPIproductMapper(payload.OldRow, db) + res = insertAPIProductMappers(rows, db, txn) + + case common.Update: + res = deleteAPIproductMapper(payload.OldRow, db, txn) + rows = append(rows, payload.NewRow) + res = insertAPIProductMappers(rows, db, txn) + + case common.Delete: + res = deleteAPIproductMapper(payload.OldRow, db, txn) } } + if res == false { + log.Error("Sql Operation error. Operation rollbacked") + return false + } } -} - -/* - * DELETE APP - */ -func deleteApplication(ele common.Row, db *sql.DB) bool { return true } /* - * DELETE CRED + * DELETE OBJECT as passed in the input */ -func deleteCredential(ele common.Row, db *sql.DB) bool { - return true -} +func deleteObject(object string, ele common.Row, db *sql.DB, txn *sql.Tx) bool { -/* - * DELETE developer - */ -func deleteDeveloper(ele common.Row, db *sql.DB) bool { - return true -} + var scope, apiProduct string + ssql := "DELETE FROM " + object + " WHERE id = $1 AND _apid_scope = $2" + prep, err := db.Prepare(ssql) + if err != nil { + log.Error("DELETE ", object, " Failed: ", err) + return false + } + defer prep.Close() + ele.Get("_apid_scope", &scope) + ele.Get("id", &apiProduct) -/* - * DELETE API product - */ -func deleteAPIproduct(ele common.Row, db *sql.DB) bool { - return true + _, err = txn.Stmt(prep).Exec(apiProduct, scope) + if err != nil { + log.Error("DELETE ", object, " Failed: (", apiProduct, ", ", scope, ")", err) + return false + } else { + log.Debug("DELETE ", object, " Success: (", apiProduct, ", ", scope, ")") + return true + } + } /* * DELETE APIPRDT MAPPER */ -func deleteAPIproductMapper(ele common.Row, db *sql.DB) bool { - return true -} +func deleteAPIproductMapper(ele common.Row, db *sql.DB, txn *sql.Tx) bool { + var ApiProduct, AppId, EntityIdentifier, apid_scope string -/* - * UPDATE APP - */ -func updateApplication(ele common.Row, ele2 common.Row, db *sql.DB) bool { - return true -} + prep, err := db.Prepare("DELETE FROM APP_CREDENTIAL_APIPRODUCT_MAPPER WHERE apiprdt_id=$1 AND app_id=$2 AND appcred_id=$3 AND _apid_scope=$4;") + if err != nil { + log.Error("DELETE APP_CREDENTIAL_APIPRODUCT_MAPPER Failed: ", err) + return false + } -/* - * UPDATE CRED - */ -func updateCredential(ele common.Row, ele2 common.Row, db *sql.DB) bool { - return true -} + defer prep.Close() -/* - * UPDATE developer - */ -func updateDeveloper(ele common.Row, ele2 common.Row, db *sql.DB) bool { - return true -} + ele.Get("apiprdt_id", &ApiProduct) + ele.Get("app_id", &AppId) + ele.Get("appcred_id", &EntityIdentifier) + ele.Get("_apid_scope", &apid_scope) -/* - * UPDATE API product - */ -func updateAPIproduct(ele common.Row, ele2 common.Row, db *sql.DB) bool { - return true -} - -/* - * UPDATE APIPRDT MAPPER - */ -func updateAPIproductMapper(ele common.Row, ele2 common.Row, db *sql.DB) bool { - return true + _, err = txn.Stmt(prep).Exec(ApiProduct, AppId, EntityIdentifier, apid_scope) + if err != nil { + log.Error("DELETE APP_CREDENTIAL_APIPRODUCT_MAPPER Failed: (", + ApiProduct, ", ", + AppId, ", ", + EntityIdentifier, ", ", + apid_scope, + ")", + err) + return false + } else { + log.Debug("DELETE APP_CREDENTIAL_APIPRODUCT_MAPPER Success: (", + ApiProduct, ", ", + AppId, ", ", + EntityIdentifier, ", ", + apid_scope, + ")") + return true + } }
diff --git a/listener_test.go b/listener_test.go index 628c262..6a51136 100644 --- a/listener_test.go +++ b/listener_test.go
@@ -9,11 +9,14 @@ . "github.com/onsi/gomega" ) +var count int = 0 var _ = Describe("listener", func() { It("should store data from ApigeeSync in the database", func(done Done) { var event = common.ChangeList{} + var event2 = common.ChangeList{} + /* API Product */ srvItems := common.Row{} scv := &common.ColumnVal{ @@ -196,31 +199,56 @@ }, } + event2.Changes = []common.Change{ + { + Table: "kms.api_product", + OldRow: srvItems, + Operation: 3, + }, + { + Table: "kms.developer", + OldRow: devItems, + Operation: 3, + }, + { + Table: "kms.app", + OldRow: appItems, + Operation: 3, + }, + { + Table: "kms.app_credential", + OldRow: credItems, + Operation: 3, + }, + { + Table: "kms.app_credential_apiproduct_mapper", + OldRow: mpItems, + Operation: 3, + }, + } h := &test_handler{ - "checkDatabase", + "checkDatabase post Insertion", func(e apid.Event) { - // ignore the first event, let standard listener process it changeSet := e.(*common.ChangeList) if len(changeSet.Changes) > 0 { return } - processChange(changeSet) rsp, err := verifyAPIKey("ch_app_credential_0", "/test", "Env_0", "test_org0", "verify") Expect(err).ShouldNot(HaveOccurred()) - var respj kmsResponseSuccess json.Unmarshal(rsp, &respj) Expect(respj.Type).Should(Equal("APIKeyContext")) Expect(respj.RspInfo.Key).Should(Equal("ch_app_credential_0")) - close(done) }, } apid.Events().Listen(ApigeeSyncEventSelector, h) - apid.Events().Emit(ApigeeSyncEventSelector, &event) // for standard listener - apid.Events().Emit(ApigeeSyncEventSelector, &common.ChangeList{}) // for test listener + apid.Events().Emit(ApigeeSyncEventSelector, &event) + apid.Events().Emit(ApigeeSyncEventSelector, &event2) + apid.Events().Emit(ApigeeSyncEventSelector, &event) + apid.Events().Emit(ApigeeSyncEventSelector, &common.ChangeList{}) }) })