blob: 3b5754249c3fb554d82df2cd35f6b26294c13a0b [file] [log] [blame]
package apiGatewayDeploy
import (
"database/sql"
"encoding/json"
"github.com/30x/apid"
"github.com/apigee-labs/transicator/common"
)
const (
APIGEE_SYNC_EVENT = "ApigeeSync"
DEPLOYMENT_TABLE = "edgex.deployment"
)
func initListener(services apid.Services) {
services.Events().Listen(APIGEE_SYNC_EVENT, &apigeeSyncHandler{})
}
type bundleConfigJson struct {
Name string `json:"name"`
URI string `json:"uri"`
ChecksumType string `json:"checksumType"`
Checksum string `json:"checksum"`
}
type apigeeSyncHandler struct {
}
func (h *apigeeSyncHandler) String() string {
return "gatewayDeploy"
}
func (h *apigeeSyncHandler) Handle(e apid.Event) {
if changeSet, ok := e.(*common.ChangeList); ok {
processChangeList(changeSet)
} else if snapData, ok := e.(*common.Snapshot); ok {
processSnapshot(snapData)
} else {
log.Errorf("Received invalid event. Ignoring. %v", e)
}
}
func processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
db, err := data.DBVersion(snapshot.SnapshotInfo)
if err != nil {
log.Panicf("Unable to access database: %v", err)
}
err = InitDB(db)
if err != nil {
log.Panicf("Unable to initialize database: %v", err)
}
tx, err := db.Begin()
if err != nil {
log.Panicf("Error starting transaction: %v", err)
}
defer tx.Rollback()
for _, table := range snapshot.Tables {
var err error
switch table.Name {
case DEPLOYMENT_TABLE:
log.Debugf("Snapshot of %s with %d rows", table.Name, len(table.Rows))
if len(table.Rows) == 0 {
return
}
for _, row := range table.Rows {
addDeployment(tx, row)
}
}
if err != nil {
log.Panicf("Error processing Snapshot: %v", err)
}
}
err = tx.Commit()
if err != nil {
log.Panicf("Error committing Snapshot change: %v", err)
}
SetDB(db)
log.Debug("Snapshot processed")
}
func processChangeList(changes *common.ChangeList) {
tx, err := getDB().Begin()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
defer tx.Rollback()
for _, change := range changes.Changes {
var err error
switch change.Table {
case DEPLOYMENT_TABLE:
switch change.Operation {
case common.Insert:
err = addDeployment(tx, change.NewRow)
case common.Delete:
var id string
err = change.OldRow.Get("id", &id)
if err == nil {
err = deleteDeployment(tx, id)
// todo: delete downloaded bundle file
}
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
}
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
}
err = tx.Commit()
if err != nil {
log.Panicf("Error processing ChangeList: %v", err)
}
}
func addDeployment(tx *sql.Tx, row common.Row) (err error) {
d := DataDeployment{}
err = row.Get("id", &d.ID)
if err != nil {
return
}
err = row.Get("bundle_config_id", &d.BundleConfigID)
if err != nil {
return
}
err = row.Get("apid_cluster_id", &d.ApidClusterID)
if err != nil {
return
}
err = row.Get("data_scope_id", &d.DataScopeID)
if err != nil {
return
}
err = row.Get("bundle_config_json", &d.BundleConfigJSON)
if err != nil {
return
}
err = row.Get("config_json", &d.ConfigJSON)
if err != nil {
return
}
err = row.Get("status", &d.Status)
if err != nil {
return
}
err = row.Get("created", &d.Created)
if err != nil {
return
}
err = row.Get("created_by", &d.CreatedBy)
if err != nil {
return
}
err = row.Get("updated", &d.Updated)
if err != nil {
return
}
err = row.Get("updated_by", &d.UpdatedBy)
if err != nil {
return
}
var bc bundleConfigJson
err = json.Unmarshal([]byte(d.BundleConfigJSON), &bc)
if err != nil {
log.Errorf("JSON decoding Manifest failed: %v", err)
return
}
d.BundleName = bc.Name
d.BundleURI = bc.URI
d.BundleChecksumType = bc.ChecksumType
d.BundleChecksum = bc.Checksum
err = InsertDeployment(tx, d)
if err != nil {
return
}
// todo: limit # concurrent downloads?
go downloadBundle(d)
return
}