blob: 023314dbff354bae92cb3698dfd973db8178e49d [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apidApigeeSync
import (
"github.com/apid/apid-core/data"
"github.com/apigee-labs/transicator/common"
"net/http"
"os"
"fmt"
"io"
"io/ioutil"
"net/url"
"path"
"sync/atomic"
"time"
)
// bootstrapSnapshotName is the fixed snapshot id used for the boot snapshot
// instead of an id taken from the server response.
const bootstrapSnapshotName = "bootstrap"
// lengthSqliteFileName is the number of trailing characters cut from a DB path
// to obtain the directory that contains the DB file.
const lengthSqliteFileName = 7 // len("/sqlite")
const (
// headerSnapshotNumber is the response header read to obtain the snapshot id
// of a non-boot snapshot.
headerSnapshotNumber = "Transicator-Snapshot-TXID"
)
// apidSnapshotManager downloads snapshots from the snapshot server over HTTP
// and hands them to the DbManager. It embeds *offlineSnapshotManager, whose
// startOnDataSnapshot is used once a data snapshot has been downloaded.
type apidSnapshotManager struct {
*offlineSnapshotManager
// quitChan carries the quit signal for the downloading thread: close() sends
// on it and downloadSnapshot passes it to pollWithBackoff
quitChan chan bool
// finishChan is the channel returned by close(); it receives true for the
// first close and false for a close on an already closed manager
finishChan chan bool
// 0 for not closed, 1 for closed (accessed through sync/atomic)
isClosed *int32
// 1 while a boot/data snapshot download is running, 0 otherwise; close()
// waits for it to leave 1 so that it returns immediately if there's no
// downloading/processing snapshot
isDownloading *int32
tokenMan tokenManager
dbMan DbManager
client *http.Client
}
// createSnapShotManager builds an apidSnapshotManager around the given
// DbManager, tokenManager and HTTP client. The quit and finish channels are
// buffered with capacity 1, and both status flags start at 0
// (not closed, not downloading). The embedded offlineSnapshotManager shares
// the same DbManager.
func createSnapShotManager(dbMan DbManager, tokenMan tokenManager, client *http.Client) *apidSnapshotManager {
	manager := &apidSnapshotManager{
		offlineSnapshotManager: &offlineSnapshotManager{dbMan: dbMan},
		tokenMan:               tokenMan,
		dbMan:                  dbMan,
		client:                 client,
		// new(int32) yields a pointer to a zero-valued flag
		isClosed:      new(int32),
		isDownloading: new(int32),
		quitChan:      make(chan bool, 1),
		finishChan:    make(chan bool, 1),
	}
	return manager
}
// close shuts the snapshotManager down and returns finishChan.
//
// The closed flag is set with an atomic swap, so only the first caller takes
// the real shutdown path: it sends true on quitChan, polls (sleeping 1ms
// between checks) until the downloading flag is no longer 1, then sends true
// on finishChan. Any later call logs a warning and returns finishChan, onto
// which a background goroutine sends false.
//
// Receive from the returned channel (<-close()) to observe the outcome.
// Should only be called by pollChangeManager, because pollChangeManager is
// dependent on it.
func (s *apidSnapshotManager) close() <-chan bool {
	alreadyClosed := atomic.SwapInt32(s.isClosed, 1) == int32(1)
	if alreadyClosed {
		log.Warn("snapshotManager: close() called on a closed snapshotManager!")
		go func() {
			s.finishChan <- false
		}()
		return s.finishChan
	}

	// signal the download loop to quit
	s.quitChan <- true

	// wait until no download is in progress
	for {
		if atomic.LoadInt32(s.isDownloading) != int32(1) {
			break
		}
		time.Sleep(time.Millisecond)
	}

	s.finishChan <- true
	return s.finishChan
}
// downloadBootSnapshot retrieves the boot information (apid_config and
// apid_config_scope) by downloading a snapshot scoped to the cluster id and
// storing it through storeBootSnapshot.
//
// The downloading flag is claimed with an atomic swap; if it was already set
// the call goes to log.Panic, since only one download may run at a time. The
// flag is reset to 0 when the function returns.
func (s *apidSnapshotManager) downloadBootSnapshot() {
	alreadyDownloading := atomic.SwapInt32(s.isDownloading, 1) == int32(1)
	if alreadyDownloading {
		log.Panic("downloadBootSnapshot: only 1 thread can download snapshot at the same time!")
	}
	defer atomic.StoreInt32(s.isDownloading, int32(0))

	log.Debug("download Snapshot for boot data")

	bootSnapshot := new(common.Snapshot)
	s.downloadSnapshot(true, []string{apidInfo.ClusterID}, bootSnapshot)

	// plugins are not informed for the boot snapshot; they'll get the data snapshot
	s.storeBootSnapshot(bootSnapshot)
}
// storeBootSnapshot hands the boot snapshot to the DbManager (with the second
// processSnapshot argument set to false) and calls log.Panic with the error
// if processing fails.
func (s *apidSnapshotManager) storeBootSnapshot(snapshot *common.Snapshot) {
	err := s.dbMan.processSnapshot(snapshot, false)
	if err != nil {
		log.Panic(err)
	}
}
// downloadDataSnapshot looks up the scopes stored for the cluster id, adds
// the cluster id itself to that list, downloads a snapshot for all of them and
// then processes it via startOnDataSnapshot.
//
// The downloading flag is claimed with an atomic swap; if it was already set
// the call goes to log.Panic. The flag is reset to 0 on return. Returns the
// scope lookup error, or whatever startOnDataSnapshot returns.
func (s *apidSnapshotManager) downloadDataSnapshot() error {
	alreadyDownloading := atomic.SwapInt32(s.isDownloading, 1) == int32(1)
	if alreadyDownloading {
		log.Panic("downloadDataSnapshot: only 1 thread can download snapshot at the same time!")
	}
	defer atomic.StoreInt32(s.isDownloading, int32(0))

	log.Debug("download Snapshot for data scopes")

	clusterScopes, err := s.dbMan.findScopesForId(apidInfo.ClusterID)
	if err != nil {
		return err
	}
	allScopes := append(clusterScopes, apidInfo.ClusterID)

	dataSnapshot := new(common.Snapshot)
	s.downloadSnapshot(false, allScopes, dataSnapshot)

	return s.startOnDataSnapshot(dataSnapshot.SnapshotInfo)
}
// downloadSnapshot builds the snapshot request URI and runs the download
// through pollWithBackoff.
//
// The URI is the configured snapshot server base URI with "snapshots" joined
// onto its path and one "scope" query parameter per entry in scopes. isBoot
// and snapshot are forwarded to the download attempt; snapshot is the value
// the attempt fills in.
//
// This is a blocking method: retrying is delegated to pollWithBackoff, which
// is given s.quitChan and handleSnapshotServerError, and is intended to keep
// retrying with backoff until success.
//
// An unparsable base URI is treated as unrecoverable and goes to log.Panicf.
func (s *apidSnapshotManager) downloadSnapshot(isBoot bool, scopes []string, snapshot *common.Snapshot) {
	log.Debug("downloadSnapshot")

	snapshotUri, err := url.Parse(config.GetString(configSnapServerBaseURI))
	if err != nil {
		// url.Parse returns a nil *URL on failure, so name the config key here;
		// the error itself carries the offending value
		log.Panicf("bad url value for config %s: %s", configSnapServerBaseURI, err)
	}
	snapshotUri.Path = path.Join(snapshotUri.Path, "snapshots")

	v := url.Values{}
	for _, scope := range scopes {
		v.Add("scope", scope)
	}
	snapshotUri.RawQuery = v.Encode()

	uri := snapshotUri.String()
	log.Infof("Snapshot Download: %s", uri)

	// pollWithBackoff only accepts functions that take a single quit channel;
	// the extra parameters are captured in a closure
	attemptDownload := s.getAttemptDownloadClosure(isBoot, snapshot, uri)
	pollWithBackoff(s.quitChan, attemptDownload, handleSnapshotServerError)
}
// getAttemptDownloadClosure returns a function performing one download
// attempt against uri; the channel argument of the returned function is
// ignored.
//
// Each attempt issues a GET with the bearer token headers and the sqlite
// Accept header. A transport error is logged and returned. Any status other
// than 200 is logged together with the response body and yields
// expected200Error; a 401 additionally invalidates the token first. On 200 the
// body is passed to processSnapshotServerFileResponse, using the fixed
// bootstrap id when isBoot is set and the snapshot number response header
// otherwise.
//
// A request that cannot be built, or a configured snapshot protocol other
// than "sqlite", goes to log.Panic.
func (s *apidSnapshotManager) getAttemptDownloadClosure(isBoot bool, snapshot *common.Snapshot, uri string) func(chan bool) error {
	return func(_ chan bool) error {
		req, err := http.NewRequest(http.MethodGet, uri, nil)
		if err != nil {
			// should never happen, but if it does, it's unrecoverable anyway
			log.Panicf("Snapshotserver comm error: %v", err)
		}
		addHeaders(req, s.tokenMan.getBearerToken())

		if protocol := config.GetString(configSnapshotProtocol); protocol != "sqlite" {
			log.Panic("Only currently supported snashot protocol is sqlite")
		}
		req.Header.Set("Accept", "application/transicator+sqlite")

		// Issue the request to the snapshot server
		resp, err := s.client.Do(req)
		if err != nil {
			log.Errorf("Snapshotserver comm error: %v", err)
			return err
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			if resp.StatusCode == http.StatusUnauthorized {
				s.tokenMan.invalidateToken()
			}
			payload, _ := ioutil.ReadAll(resp.Body)
			log.Errorf("Snapshot server conn failed with resp code %d, body: %s", resp.StatusCode, string(payload))
			return expected200Error
		}

		// Bootstrap scope is a special case, that can occur only once. Its tid
		// is fixed so that it cannot clash with the tid of a later data scope.
		tid := bootstrapSnapshotName
		if !isBoot {
			tid = resp.Header.Get(headerSnapshotNumber)
		}

		// Decode the Snapshot server response
		if err = processSnapshotServerFileResponse(tid, resp.Body, snapshot); err != nil {
			log.Errorf("Snapshot server response Data not parsable: %v", err)
			return err
		}
		return nil
	}
}
// processSnapshotServerFileResponse streams a sqlite snapshot from body into
// the DB file that data.DBPath reports for "common/<dbId>" and, on success,
// records dbId in snapshot.SnapshotInfo.
//
// If dbId is the bootstrap snapshot name, the existing directory for it (if
// any) is removed first so an old bootstrap snapshot is replaced. The
// directory is then (re)created with mode 0700 and the file is written.
//
// Returns the first error hit while removing the old directory, creating the
// directory, creating the file, copying the body, or closing the file.
// snapshot is left untouched when an error is returned.
func processSnapshotServerFileResponse(dbId string, body io.Reader, snapshot *common.Snapshot) error {
	dbPath := data.DBPath("common/" + dbId)
	// dbPath includes the sqlite file name; cut it off to get the directory
	dbDir := dbPath[0 : len(dbPath)-lengthSqliteFileName]
	log.Infof("Attempting to stream the sqlite snapshot to %s", dbPath)

	// if other bootstrap snapshot exists, delete the old file
	if dbId == bootstrapSnapshotName {
		if _, err := os.Stat(dbDir); !os.IsNotExist(err) {
			if err = os.RemoveAll(dbDir); err != nil {
				log.Errorf("Failed to delete old bootstrap snapshot; %v", err)
				return err
			}
		}
	}

	// log the directory that is actually created, not the file path
	log.Infof("Creating directory with mkdirall %s", dbDir)
	if err := os.MkdirAll(dbDir, 0700); err != nil {
		log.Errorf("Error creating db path %s", err)
		// without the directory the file cannot be created; report this error
		return err
	}

	out, err := os.Create(dbPath)
	if err != nil {
		return err
	}

	// stream response to DB
	if _, err = io.Copy(out, body); err != nil {
		// the copy error is the one worth reporting; still release the handle
		out.Close()
		return err
	}
	// a failed Close can mean the data never reached disk, so it must not be
	// reported as success
	if err = out.Close(); err != nil {
		return err
	}

	snapshot.SnapshotInfo = dbId
	//TODO get timestamp from transicator. Not currently in response
	return nil
}
// handleSnapshotServerError logs err at error level. downloadSnapshot passes
// it to pollWithBackoff as the error handler.
func handleSnapshotServerError(err error) {
log.Errorf("Error connecting to snapshot server: %v", err)
}
// offlineSnapshotManager is the snapshot manager variant that does not
// download anything: its download methods panic or return an error, and it
// only processes a snapshot through dbMan in startOnDataSnapshot.
type offlineSnapshotManager struct {
// dbMan is the DbManager used by startOnDataSnapshot to process the snapshot
dbMan DbManager
}
// close has nothing to shut down for the offline manager: it returns a fresh
// buffered channel that already holds true, so a receive on it never blocks.
func (o *offlineSnapshotManager) close() <-chan bool {
	done := make(chan bool, 1)
	done <- true
	return done
}
// downloadBootSnapshot is not supported by offlineSnapshotManager; calling it
// goes straight to log.Panic.
func (o *offlineSnapshotManager) downloadBootSnapshot() {
log.Panic("downloadBootSnapshot called for offlineSnapshotManager")
}
// downloadDataSnapshot is not supported by offlineSnapshotManager; it always
// returns an error and does nothing else.
func (o *offlineSnapshotManager) downloadDataSnapshot() error {
return fmt.Errorf("downloadDataSnapshot called for offlineSnapshotManager")
}
// startOnDataSnapshot processes the snapshot named snapshotName through the
// DbManager (with the second processSnapshot argument set to true) and then
// emits it to plugins on ApigeeSyncEventSelector.
//
// Returns the processing error if there is one, a timeout error if the emit
// channel does not deliver within pluginTimeout, and nil otherwise.
func (o *offlineSnapshotManager) startOnDataSnapshot(snapshotName string) error {
	log.Infof("Processing snapshot: %s", snapshotName)

	snap := &common.Snapshot{SnapshotInfo: snapshotName}
	err := o.dbMan.processSnapshot(snap, true)
	if err != nil {
		return err
	}

	log.Info("Emitting Snapshot to plugins")

	// the timer is started before the emit, as in a select evaluating both in order
	timedOut := time.After(pluginTimeout)
	delivered := eventService.Emit(ApigeeSyncEventSelector, snap)

	select {
	case <-timedOut:
		return fmt.Errorf("timeout, plugins failed to respond to snapshot")
	case <-delivered:
		// the new snapshot has been processed
		return nil
	}
}