blob: 32a1d942a551d14d5704e12a523350827cb462eb [file] [log] [blame] [edit]
package apidApigeeSync
import (
"database/sql"
"github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
)
type handler struct {
}
func (h *handler) String() string {
return "ApigeeSync"
}
func (h *handler) Handle(e apid.Event) {
res := true
db, err := data.DB()
if err != nil {
panic("Unable to access Sqlite DB")
}
txn, err := db.Begin()
if err != nil {
log.Error("Unable to create Sqlite transaction")
return
}
snapData, ok := e.(*common.Snapshot)
if ok {
res = processSnapshot(snapData, txn)
} else {
changeSet, ok := e.(*common.ChangeList)
if ok {
res = processChange(changeSet, txn)
} else {
log.Fatal("Received invalid event: %v", e)
}
}
if res == true {
txn.Commit()
} else {
txn.Rollback()
}
return
}
func processSnapshot(snapshot *common.Snapshot, txn *sql.Tx) bool {
log.Debugf("Process Snapshot data")
res := true
for _, payload := range snapshot.Tables {
switch payload.Name {
case "edgex.apid_config":
res = insertApidConfig(payload.Rows, txn, snapshot.SnapshotInfo)
case "edgex.apid_config_scope":
res = insertApidConfigScopes(payload.Rows, txn)
}
if res == false {
log.Error("Error encountered in Downloading Snapshot for ApidApigeeSync")
return res
}
}
return res
}
func processChange(changes *common.ChangeList, txn *sql.Tx) bool {
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
var rows []common.Row
res := true
for _, payload := range changes.Changes {
rows = nil
switch payload.Table {
case "edgex.apid_config_scope":
switch payload.Operation {
case common.Insert:
rows = append(rows, payload.NewRow)
res = insertApidConfigScopes(rows, txn)
}
}
if res == false {
log.Error("Sql Operation error. Operation rollbacked")
return res
}
}
return res
}
/*
* INSERT INTO APP_CREDENTIAL op
*/
func insertApidConfig(rows []common.Row, txn *sql.Tx, snapInfo string) bool {
var scope, id, name, orgAppName, createdBy, updatedBy, Description string
var updated, created int64
prep, err := txn.Prepare("INSERT INTO APID_CONFIG (id, _apid_scope, name, umbrella_org_app_name, created, created_by, updated, updated_by, snapshotInfo)VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9);")
if err != nil {
log.Error("INSERT APID_CONFIG Failed: ", err)
return false
}
defer prep.Close()
for _, ele := range rows {
ele.Get("id", &id)
ele.Get("_apid_scope", &scope)
ele.Get("name", &name)
ele.Get("umbrella_org_app_name", &orgAppName)
ele.Get("created", &created)
ele.Get("created_by", &createdBy)
ele.Get("updated", &updated)
ele.Get("updated_by", &updatedBy)
ele.Get("description", &Description)
s := txn.Stmt(prep)
_, err = s.Exec(
id,
scope,
name,
orgAppName,
created,
createdBy,
updated,
updatedBy,
snapInfo)
s.Close()
if err != nil {
log.Error("INSERT APID_CONFIG Failed: ", id, ", ", scope, ")", err)
return false
} else {
log.Info("INSERT APID_CONFIG Success: (", id, ", ", scope, ")")
}
}
return true
}
/*
* INSERT INTO APP_CREDENTIAL op
*/
func insertApidConfigScopes(rows []common.Row, txn *sql.Tx) bool {
var id, scopeId, apiConfigId, scope, createdBy, updatedBy string
var created, updated int64
prep, err := txn.Prepare("INSERT INTO APID_CONFIG_SCOPE (id, _apid_scope, apid_config_id, scope, created, created_by, updated, updated_by)VALUES($1,$2,$3,$4,$5,$6,$7,$8);")
if err != nil {
log.Error("INSERT APID_CONFIG_SCOPE Failed: ", err)
return false
}
defer prep.Close()
for _, ele := range rows {
ele.Get("id", &id)
ele.Get("_apid_scope", &scopeId)
ele.Get("apid_config_id", &apiConfigId)
ele.Get("scope", &scope)
ele.Get("created", &created)
ele.Get("created_by", &createdBy)
ele.Get("updated", &updated)
ele.Get("updated_by", &updatedBy)
s := txn.Stmt(prep)
_, err = s.Exec(
id,
scopeId,
apiConfigId,
scope,
created,
createdBy,
updated,
updatedBy)
s.Close()
if err != nil {
log.Error("INSERT APID_CONFIG_SCOPE Failed: ", id, ", ", scope, ")", err)
return false
} else {
log.Info("INSERT APID_CONFIG_SCOPE Success: (", id, ", ", scope, ")")
}
}
return true
}