blob: b203187ba372a3cfcca2a2a37d28c27e2fc974ad [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apiGatewayConfDeploy
import (
"os"
"github.com/apid/apid-core"
"github.com/apigee-labs/transicator/common"
)
const (
APIGEE_SYNC_EVENT = "ApigeeSync"
CONFIG_METADATA_TABLE = "metadata.runtime_entity_metadata"
)
func (h *apigeeSyncHandler) initListener(services apid.Services) {
services.Events().Listen(APIGEE_SYNC_EVENT, h)
}
func (h *apigeeSyncHandler) stopListener(services apid.Services) {
if !h.closed {
services.Events().StopListening(APIGEE_SYNC_EVENT, h)
h.closed = true
}
}
type apigeeSyncHandler struct {
dbMan dbManagerInterface
apiMan apiManagerInterface
bundleMan bundleManagerInterface
closed bool
}
func (h *apigeeSyncHandler) String() string {
return "gatewayDeploy"
}
func (h *apigeeSyncHandler) Handle(e apid.Event) {
if changeSet, ok := e.(*common.ChangeList); ok {
h.processChangeList(changeSet)
} else if snapData, ok := e.(*common.Snapshot); ok {
h.processSnapshot(snapData)
} else {
log.Debugf("Received invalid event. Ignoring. %v", e)
}
}
func (h *apigeeSyncHandler) processSnapshot(snapshot *common.Snapshot) {
log.Debugf("Snapshot received. Switching to DB version: %s", snapshot.SnapshotInfo)
h.dbMan.setDbVersion(snapshot.SnapshotInfo)
err := h.dbMan.initDb()
if err != nil {
log.Panicf("unable to init DB: %v", err)
}
if lsn := h.dbMan.getLSN(); lsn != "" {
// receive a new snapshot at runtime
if err = h.dbMan.updateLSN(lsn); err != nil {
log.Errorf("Unable to update LSN: %v", err)
}
} else { //apid just started
if err = h.dbMan.loadLsnFromDb(); err != nil {
log.Errorf("Unable to load LSN From Db: %v", err)
}
}
h.startupOnExistingDatabase()
//h.apiMan.InitAPI()
log.Debug("Snapshot processed")
}
func (h *apigeeSyncHandler) startupOnExistingDatabase() {
// start bundle downloads that didn't finish
go func() {
// create apid_blob_available table
blobIds, err := h.dbMan.getUnreadyBlobs()
if err != nil {
log.Panicf("unable to query database for unready configurations: %v", err)
}
log.Debugf("Queuing %d blob downloads", len(blobIds))
// initialize API endpoints only after 1 round of download attempts is made
h.bundleMan.downloadBlobsWithCallback(blobIds, func() {
h.apiMan.InitAPI()
h.apiMan.notifyNewChange()
})
}()
}
func (h *apigeeSyncHandler) processChangeList(changes *common.ChangeList) {
log.Debugf("Processing changes")
// changes have been applied to DB by apidApigeeSync
var insertedConfigs, updatedNewConfigs, updatedOldConfigs, deletedConfigs []*Configuration
isConfigChanged := false
for _, change := range changes.Changes {
switch change.Table {
case CONFIG_METADATA_TABLE:
isConfigChanged = true
switch change.Operation {
case common.Insert:
conf := configurationFromRow(change.NewRow)
insertedConfigs = append(insertedConfigs, &conf)
case common.Delete:
conf := configurationFromRow(change.OldRow)
deletedConfigs = append(deletedConfigs, &conf)
case common.Update:
confNew := configurationFromRow(change.NewRow)
confOld := configurationFromRow(change.OldRow)
updatedNewConfigs = append(updatedNewConfigs, &confNew)
updatedOldConfigs = append(updatedOldConfigs, &confOld)
default:
log.Errorf("unexpected operation: %s", change.Operation)
}
}
}
// delete old configs from FS
if len(deletedConfigs)+len(updatedOldConfigs) > 0 {
log.Debugf("will delete %d old blobs", len(deletedConfigs)+len(updatedOldConfigs))
//TODO delete blobs for deleted configs
blobIds := extractBlobsToDelete(append(deletedConfigs, updatedOldConfigs...))
go h.bundleMan.deleteBlobs(blobIds)
}
// download and expose new configs
if isConfigChanged {
h.dbMan.updateLSN(changes.LastSequence)
blobs := extractBlobsToDownload(append(insertedConfigs, updatedNewConfigs...))
h.bundleMan.downloadBlobsWithCallback(blobs, h.apiMan.notifyNewChange)
} else if h.dbMan.getLSN() == InitLSN {
h.dbMan.updateLSN(changes.LastSequence)
}
}
func extractBlobsToDownload(confs []*Configuration) (blobs []string) {
//TODO: do not include already-downloaded blobs
for _, conf := range confs {
if conf.BlobID != "" {
blobs = append(blobs, conf.BlobID)
}
if conf.BlobResourceID != "" {
blobs = append(blobs, conf.BlobResourceID)
}
}
return
}
func extractBlobsToDelete(confs []*Configuration) (blobs []string) {
//TODO: do not include already-downloaded blobs
for _, conf := range confs {
if conf.BlobID != "" {
blobs = append(blobs, conf.BlobID)
}
if conf.BlobResourceID != "" {
blobs = append(blobs, conf.BlobResourceID)
}
}
return
}
func configurationFromRow(row common.Row) (c Configuration) {
row.Get("id", &c.ID)
row.Get("organization_id", &c.OrgID)
row.Get("environment_id", &c.EnvID)
row.Get("bean_blob_id", &c.BlobID)
row.Get("resource_blob_id", &c.BlobResourceID)
row.Get("type", &c.Type)
row.Get("name", &c.Name)
row.Get("revision", &c.Revision)
row.Get("path", &c.Path)
row.Get("created_at", &c.Created)
row.Get("created_by", &c.CreatedBy)
row.Get("updated_at", &c.Updated)
row.Get("updated_by", &c.UpdatedBy)
return
}
// TODO delete from file system
func safeDelete(file string) {
if e := os.Remove(file); e != nil && !os.IsNotExist(e) {
log.Warnf("unable to delete file %s: %v", file, e)
}
}