blob: 2a8b492f16d8f7e5358caaeda7d24e8e91e3c2a6 [file] [log] [blame] [edit]
package apidApigeeSync
import (
"github.com/30x/apid-core"
"github.com/apigee-labs/transicator/common"
)
const (
LISTENER_TABLE_APID_CLUSTER = "edgex.apid_cluster"
LISTENER_TABLE_DATA_SCOPE = "edgex.data_scope"
)
type handler struct {
}
func (h *handler) String() string {
return "ApigeeSync"
}
func (h *handler) Handle(e apid.Event) {
if changeSet, ok := e.(*common.ChangeList); ok {
processChangeList(changeSet)
} else if snapShot, ok := e.(*common.Snapshot); ok {
processSnapshot(snapShot)
} else {
log.Debugf("Received invalid event. Ignoring. %v", e)
}
}
func processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
db, err := dataService.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Unable to access database: %v", err)
}
processSqliteSnapshot(db)
//update apid instance info
apidInfo.LastSnapshot = snapshot.SnapshotInfo
err = updateApidInstanceInfo()
if err != nil {
log.Panicf("Unable to update instance info: %v", err)
}
setDB(db)
log.Debugf("Snapshot processed: %s", snapshot.SnapshotInfo)
}
func processSqliteSnapshot(db apid.DB) {
var numApidClusters int
apidClusters, err := db.Query("SELECT COUNT(*) FROM edgex_apid_cluster")
if err != nil {
log.Panicf("Unable to read database: %s", err.Error())
}
apidClusters.Next()
err = apidClusters.Scan(&numApidClusters)
if err != nil {
log.Panicf("Unable to read database: %s", err.Error())
}
if numApidClusters != 1 {
log.Panic("Illegal state for apid_cluster. Must be a single row.")
}
_, err = db.Exec("ALTER TABLE edgex_apid_cluster ADD COLUMN last_sequence text DEFAULT ''")
if err != nil {
if err.Error() == "duplicate column name: last_sequence" {
return
} else {
log.Error("[[" + err.Error() + "]]")
log.Panicf("Unable to create last_sequence column on DB. Unrecoverable error ", err)
}
}
}
func processChangeList(changes *common.ChangeList) bool {
ok := false
tx, err := getDB().Begin()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
return ok
}
defer tx.Rollback()
log.Debugf("apigeeSyncEvent: %d changes", len(changes.Changes))
for _, change := range changes.Changes {
if change.Table == LISTENER_TABLE_APID_CLUSTER {
log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
}
switch change.Operation {
case common.Insert:
ok = insert(change.Table, []common.Row{change.NewRow}, tx)
case common.Update:
if change.Table == LISTENER_TABLE_DATA_SCOPE {
log.Panicf("illegal operation: %s for %s", change.Operation, change.Table)
}
ok = update(change.Table, []common.Row{change.OldRow}, []common.Row{change.NewRow}, tx)
case common.Delete:
ok = _delete(change.Table, []common.Row{change.OldRow}, tx)
}
if !ok {
log.Error("Sql Operation error. Operation rollbacked")
return ok
}
}
err = tx.Commit()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
return false
}
return ok
}