// blob: c9d7d80070ad3cdd06f366fb6b6a54b71387ef35 [file] [log] [blame] [edit]
// Copyright 2017 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apiGatewayConfDeploy
import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync/atomic"
"time"
)
// blobStoreUri is the path template, relative to the blob server base URL,
// of a single blob resource. The "{blobId}" placeholder is replaced with the
// concrete blob id when the request URL is built.
const blobStoreUri = "/blobs/{blobId}"
// bundleManagerInterface is the set of blob download and cleanup operations
// that bundleManager (declared in this file) provides.
type bundleManagerInterface interface {
// initializeBundleDownloading starts the download workers.
initializeBundleDownloading()
// downloadBlobsWithCallback requests a download of every id in blobs and
// invokes callback once each of them has been attempted at least once.
downloadBlobsWithCallback(blobs []string, callback func())
// deleteBlobs requests deletion of the given blob ids.
deleteBlobs(blobIds []string)
// Close shuts the manager down; no further download requests are accepted.
Close()
}
// bundleManager coordinates blob downloads: it builds DownloadRequests for
// blob ids, queues them on downloadQueue, and owns the BundleDownloader
// workers that consume that queue.
type bundleManager struct {
// blobServerUrl is the blob server base URL copied into every DownloadRequest.
blobServerUrl string
// dbMan is used to record the local file location of a downloaded blob.
dbMan dbManagerInterface
// apiMan is not referenced by the code in this part of the file.
apiMan apiManagerInterface
// concurrentDownloads is the number of workers started by
// initializeBundleDownloading.
concurrentDownloads int
// markConfigFailedAfter is added to the request creation time to obtain the
// deadline after which a download request is reported as timed out.
markConfigFailedAfter time.Duration
// bundleRetryDelay is the initial delay of the retry back-off.
bundleRetryDelay time.Duration
// bundleCleanupDelay is not referenced by the code in this part of the file.
bundleCleanupDelay time.Duration
// downloadQueue carries pending requests to the workers; Close closes it.
downloadQueue chan *DownloadRequest
// isClosed is accessed atomically: 1 means closed, 0 means open.
isClosed *int32
// workers holds the downloaders created by initializeBundleDownloading.
workers []*BundleDownloader
// client is the HTTP client handed to every DownloadRequest.
client *http.Client
}
// blobServerResponse is the JSON document decoded from the blob server's
// reply to a signed-URL request. Within this file only SignedUrl is read.
type blobServerResponse struct {
Id string `json:"id"`
Kind string `json:"kind"`
Self string `json:"self"`
// SignedUrl is the URL from which the blob content is subsequently fetched.
SignedUrl string `json:"signedurl"`
SignedUrlExpiryTimestamp string `json:"signedurlexpirytimestamp"`
}
// initializeBundleDownloading marks the manager as open and spins up
// bm.concurrentDownloads workers, numbering them from 1. Each worker is
// stored in bm.workers before it is started.
func (bm *bundleManager) initializeBundleDownloading() {
	atomic.StoreInt32(bm.isClosed, 0)

	pool := make([]*BundleDownloader, bm.concurrentDownloads)
	bm.workers = pool

	for idx := range pool {
		downloader := &BundleDownloader{
			id:       idx + 1,
			workChan: make(chan *DownloadRequest),
			bm:       bm,
		}
		pool[idx] = downloader
		downloader.Start()
	}
}
// makeDownloadRequest builds the DownloadRequest for blobId on behalf of the
// batch b. It returns nil for an empty blobId. The request's deadline is
// "now + bm.markConfigFailedAfter" and its retry back-off starts at
// bm.bundleRetryDelay, doubling up to a ceiling of five minutes.
func (bm *bundleManager) makeDownloadRequest(blobId string, b *BunchDownloadRequest) *DownloadRequest {
	if len(blobId) == 0 {
		return nil
	}

	const backoffCeiling = 5 * time.Minute

	req := new(DownloadRequest)
	req.markFailedAt = time.Now().Add(bm.markConfigFailedAfter)
	req.backoffFunc = createBackoff(bm.bundleRetryDelay, backoffCeiling)
	req.blobServerURL = bm.blobServerUrl
	req.bm = bm
	req.blobId = blobId
	req.client = bm.client
	req.bunchRequest = b
	return req
}
// enqueueRequest hands r to the download workers. It blocks until
// downloadQueue accepts the request. Nil requests, and requests arriving
// after the manager has been closed, are dropped silently.
//
// Close() closes downloadQueue, and Go panics on a send to a closed channel,
// including a send that was already blocked when the close happened. Since
// senders run on their own goroutines (batch enqueue and delayed retries),
// that race is expected during shutdown; it is treated as "request dropped"
// instead of crashing the process.
func (bm *bundleManager) enqueueRequest(r *DownloadRequest) {
	if r == nil || atomic.LoadInt32(bm.isClosed) == 1 {
		return
	}

	defer func() {
		p := recover()
		if p == nil {
			return
		}
		// Only swallow the panic caused by shutdown; anything else is a
		// genuine bug and must keep propagating.
		if atomic.LoadInt32(bm.isClosed) != 1 {
			panic(p)
		}
		log.Debugf("download queue closed, dropping request blobId=%s", r.blobId)
	}()

	bm.downloadQueue <- r
}
// downloadBlobsWithCallback wraps blobs in a BunchDownloadRequest and starts
// it. callback is run (on its own goroutine) once every non-empty blob id has
// been attempted, or immediately when there is nothing to download.
func (bm *bundleManager) downloadBlobsWithCallback(blobs []string, callback func()) {
	var pending int32
	bunch := &BunchDownloadRequest{
		bm:             bm,
		blobs:          blobs,
		attemptCounter: &pending,
		callback:       callback,
	}
	bunch.download()
}
// Close marks the manager as closed, so that enqueueRequest drops requests
// made afterwards, and then closes downloadQueue, which ends the receive loop
// of every worker.
// NOTE(review): closing an already-closed channel panics, so Close must be
// called at most once per manager — confirm callers guarantee this.
func (bm *bundleManager) Close() {
atomic.StoreInt32(bm.isClosed, 1)
close(bm.downloadQueue)
}
// deleteBlobs launches one goroutine per blob id, each calling
// deleteBlobById. It does not wait for those goroutines to finish.
func (bm *bundleManager) deleteBlobs(blobs []string) {
	for i := 0; i < len(blobs); i++ {
		go bm.deleteBlobById(blobs[i])
	}
}
// deleteBlobById is the per-blob deletion hook invoked by deleteBlobs.
// It is currently a stub and does nothing.
// TODO add delete support
func (bm *bundleManager) deleteBlobById(blobId string) {
}
// BunchDownloadRequest is a batch of blob downloads that shares a single
// completion callback.
type BunchDownloadRequest struct {
// bm is the manager used to create and enqueue the per-blob requests.
bm *bundleManager
// blobs are the ids to download; download() removes empty ids from it.
blobs []string
// attemptCounter is the number of blobs not yet attempted. It is decremented
// atomically by downloadAttempted.
attemptCounter *int32
// callback, if non-nil, runs on its own goroutine once attemptCounter reaches
// zero (or immediately when the batch has no non-empty ids).
callback func()
}
// download starts the batch. Empty ids are filtered out of b.blobs first.
// If nothing remains and a callback is set, the callback is fired on a new
// goroutine and download returns. Otherwise the attempt counter is set to the
// number of remaining ids and one request per id is enqueued, each from its
// own goroutine so that download itself never blocks on the queue.
func (b *BunchDownloadRequest) download() {
	// Build a fresh slice rather than filtering in place, so the caller's
	// backing array is left untouched.
	var kept []string
	for _, candidate := range b.blobs {
		if candidate == "" {
			continue
		}
		kept = append(kept, candidate)
	}
	b.blobs = kept

	pending := len(kept)
	log.Debugf("Attempt to download blobs, len: %v", pending)

	if pending == 0 && b.callback != nil {
		go b.callback()
		return
	}

	// Plain store is sufficient here: no request goroutine exists yet.
	*b.attemptCounter = int32(pending)
	for i := 0; i < pending; i++ {
		go b.bm.enqueueRequest(b.bm.makeDownloadRequest(kept[i], b))
	}
}
// downloadAttempted records that one blob of the batch has had its first
// download attempt. When the last outstanding blob reports in, the callback
// (if any) is started on a new goroutine.
func (b *BunchDownloadRequest) downloadAttempted() {
	remaining := atomic.AddInt32(b.attemptCounter, -1)
	if remaining != 0 || b.callback == nil {
		return
	}
	go b.callback()
}
// DownloadRequest is the unit of work consumed by a BundleDownloader: the
// download of one blob, together with its retry and timeout state.
type DownloadRequest struct {
// bm is the owning manager; its dbMan stores the downloaded file location.
bm *bundleManager
// blobId identifies the blob to download.
blobId string
// backoffFunc sleeps for the current back-off delay; it is called before a
// failed request is re-enqueued.
backoffFunc func()
// markFailedAt is the deadline after which checkTimeout reports a timeout.
// The zero value disables the check.
markFailedAt time.Time
// blobServerURL is the base URL of the blob server.
blobServerURL string
// client is the HTTP client used for all requests of this download.
client *http.Client
// bunchRequest is the batch this request belongs to, or nil.
bunchRequest *BunchDownloadRequest
// attempted is set by markAttempted so the batch is notified only once.
attempted bool
}
// downloadBlob performs one download attempt for r.blobId.
//
// It returns a *timeoutError when the request's deadline has passed, the
// underlying error when the download or the database update fails, and nil
// on success. On every path the deferred markAttempted is told about the
// outcome, so the owning batch is notified after the first attempt.
//
// On success the blob is stored in a temp file whose name is recorded through
// dbMan.updateLocalFsLocation. On failure any temp file that was created is
// removed asynchronously.
func (r *DownloadRequest) downloadBlob() error {
	log.Debugf("starting bundle download attempt for blobId=%s", r.blobId)
	var err error
	defer r.markAttempted(&err)

	// checkTimeout clears r.markFailedAt when it fires, so remember the
	// deadline first; otherwise the error would always carry the zero time.
	deadline := r.markFailedAt
	if r.checkTimeout() {
		// Assign to err (not just return) so markAttempted sees the failure.
		err = &timeoutError{
			markFailedAt: deadline,
		}
		return err
	}

	cleanTempFile := func(file string) {
		if os.Remove(file) != nil {
			log.Warnf("Unable to remove temp file %s", file)
		}
	}

	downloadedFile, err := downloadFromURI(r.client, r.blobServerURL, r.blobId)
	if err != nil {
		log.Errorf("Unable to download blob file blobId=%s err:%v", r.blobId, err)
		// A partially written temp file may exist even though the download failed.
		if downloadedFile != "" {
			go cleanTempFile(downloadedFile)
		}
		return err
	}

	err = r.bm.dbMan.updateLocalFsLocation(r.blobId, downloadedFile)
	if err != nil {
		log.Errorf("updateLocalFsLocation failed: blobId=%s err:%v", r.blobId, err)
		if downloadedFile != "" {
			go cleanTempFile(downloadedFile)
		}
		return err
	}

	log.Debugf("blob downloaded and inserted: blobId=%s filename=%s", r.blobId, downloadedFile)
	return nil
}
// checkTimeout reports whether the request's deadline (markFailedAt) has
// passed. A zero markFailedAt means "no deadline" and never times out.
// When the deadline has passed, markFailedAt is reset to the zero value, so
// the timeout is reported only once per request.
func (r *DownloadRequest) checkTimeout() bool {
	if r.markFailedAt.IsZero() || !time.Now().After(r.markFailedAt) {
		return false
	}
	r.markFailedAt = time.Time{}
	// The format string needs a verb for blobId; without it the id is
	// rendered as a "%!(EXTRA ...)" suffix and go vet rejects the call.
	log.Debugf("bundle download timeout. blobId=%s", r.blobId)
	// TODO notify gateway of this failure
	return true
}
// markAttempted records the first download attempt of this request and, if
// the request belongs to a batch, notifies that batch. Subsequent calls are
// no-ops. errp points at the attempt's outcome, which is read here but not
// acted upon yet.
func (r *DownloadRequest) markAttempted(errp *error) {
	if r.attempted {
		return
	}
	r.attempted = true

	outcome := *errp
	if bunch := r.bunchRequest; bunch != nil {
		bunch.downloadAttempted()
	}
	if outcome != nil {
		//TODO: insert to DB as "attempted but unsuccessful"
	}
}
// getBlobFilePath returns the path under bundlePath whose final element is
// the standard base64 encoding of blobId.
// NOTE(review): the standard base64 alphabet contains '/', which path.Join
// treats as a separator, so some ids would map to a nested path — confirm
// whether blob ids can produce it before relying on a flat directory.
func getBlobFilePath(blobId string) string {
	encodedName := base64.StdEncoding.EncodeToString([]byte(blobId))
	return path.Join(bundlePath, encodedName)
}
// getSignedURL asks the blob server for the signed download URL of blobId.
//
// It issues an authenticated GET to "<blobServerURL>/blobs/<blobId>?action=GET",
// decodes the JSON reply into a blobServerResponse and returns its SignedUrl.
// An error is returned when the request fails, the body cannot be read, or
// the body is not valid JSON. An unparsable blobServerURL is treated as a
// configuration error and reported through log.Panicf.
func getSignedURL(client *http.Client, blobServerURL string, blobId string) (string, error) {
	blobUri, err := url.Parse(blobServerURL)
	if err != nil {
		// blobUri is nil when parsing fails, so report the raw configured
		// value instead; otherwise the message would not show what was wrong.
		log.Panicf("bad url value for config %s: %s", blobServerURL, err)
	}

	// Path holds the unescaped form; String() below takes care of escaping.
	blobUri.Path += strings.Replace(blobStoreUri, "{blobId}", blobId, 1)
	parameters := url.Values{}
	parameters.Add("action", "GET")
	blobUri.RawQuery = parameters.Encode()

	uri := blobUri.String()

	surl, err := getUriReaderWithAuth(client, uri)
	if err != nil {
		log.Errorf("Unable to get signed URL from BlobServer %s: %v", uri, err)
		return "", err
	}
	defer surl.Close()

	body, err := ioutil.ReadAll(surl)
	if err != nil {
		log.Errorf("Invalid response from BlobServer for {%s} error: {%v}", uri, err)
		return "", err
	}

	res := blobServerResponse{}
	err = json.Unmarshal(body, &res)
	if err != nil {
		log.Errorf("Invalid response from BlobServer for {%s} error: {%v}", uri, err)
		return "", err
	}

	return res.SignedUrl, nil
}
// downloadFromURI retrieves the signed URL for the blob and then downloads
// the resource behind that URL into a new temp file under bundlePath.
//
// It returns the temp file name and a nil error on success. If the temp file
// was already created when a later step failed, its name is still returned
// together with the error so the caller can remove the partial file; before
// that point tempFileName is empty.
//
// A failure to close the temp file is reported as an error as well, because
// the final write of the blob may only fail at close time.
func downloadFromURI(client *http.Client, blobServerURL string, blobId string) (tempFileName string, err error) {
	var tempFile *os.File

	uri, err := getSignedURL(client, blobServerURL, blobId)
	if err != nil {
		log.Errorf("Unable to get signed URL for blobId {%s}, error : {%v}", blobId, err)
		return tempFileName, err
	}

	tempFile, err = ioutil.TempFile(bundlePath, "blob")
	if err != nil {
		log.Errorf("Unable to create temp file: %v", err)
		return tempFileName, err
	}
	tempFileName = tempFile.Name()
	defer func() {
		// Keep an earlier error if there is one; otherwise surface the
		// close failure through the named result.
		if cerr := tempFile.Close(); cerr != nil && err == nil {
			log.Errorf("Unable to close temp file %s: %v", tempFileName, cerr)
			err = cerr
		}
	}()

	var confReader io.ReadCloser
	confReader, err = getUriReaderWithAuth(client, uri)
	if err != nil {
		log.Errorf("Unable to retrieve Blob %s: %v", uri, err)
		return tempFileName, err
	}
	defer confReader.Close()

	_, err = io.Copy(tempFile, confReader)
	if err != nil {
		log.Errorf("Unable to write Blob %s: %v", tempFileName, err)
		return tempFileName, err
	}

	log.Debugf("Blob %s downloaded to: %s", uri, tempFileName)
	return tempFileName, err
}
// getUriReaderWithAuth performs a GET on uriString with the bearer token in
// the Authorization header and returns the response body for the caller to
// read and close. Any status other than 200 is turned into an error, in
// which case the body is closed here.
func getUriReaderWithAuth(client *http.Client, uriString string) (io.ReadCloser, error) {
	request, err := http.NewRequest("GET", uriString, nil)
	if err != nil {
		return nil, err
	}
	request.Header.Add("Authorization", getBearerToken())

	response, err := client.Do(request)
	if err != nil {
		return nil, err
	}

	if response.StatusCode == http.StatusOK {
		return response.Body, nil
	}

	response.Body.Close()
	return nil, fmt.Errorf("GET uri %s failed with status %d", uriString, response.StatusCode)
}
// BundleDownloader is a download worker. Once started it processes the
// requests it receives from its manager's downloadQueue.
type BundleDownloader struct {
// id is the worker number, used in log messages only.
id int
// workChan is allocated per worker but is not read or written by the code
// visible in this file.
workChan chan *DownloadRequest
// bm is the owning manager, whose downloadQueue the worker consumes.
bm *bundleManager
}
// Start launches the worker goroutine. The goroutine takes requests off the
// manager's downloadQueue until that channel is closed. A request that fails
// with a timeout is dropped; a request that fails for any other reason is
// re-enqueued from a separate goroutine after its back-off delay, so the
// worker itself never sleeps.
func (w *BundleDownloader) Start() {
	// retry waits out the back-off and then puts the request back on the queue.
	retry := func(r *DownloadRequest, bm *bundleManager) {
		r.backoffFunc()
		bm.enqueueRequest(r)
	}

	go func() {
		log.Debugf("started bundle downloader %d", w.id)
		for req := range w.bm.downloadQueue {
			log.Debugf("starting download blobId=%s", req.blobId)
			err := req.downloadBlob()
			if err == nil {
				continue
			}
			// timeout
			if _, timedOut := err.(*timeoutError); timedOut {
				continue
			}
			go retry(req, w.bm)
		}
		log.Debugf("bundle downloader %d stopped", w.id)
	}()
}
// createBackoff returns a stateful function implementing a simple doubling
// back-off. Each call sleeps for the current delay, which starts at retryIn,
// and then doubles the delay for the next call, capped at maxBackOff.
func createBackoff(retryIn, maxBackOff time.Duration) func() {
	delay := retryIn
	return func() {
		log.Debugf("backoff called. will retry in %s.", delay)
		time.Sleep(delay)

		delay *= 2
		if delay > maxBackOff {
			delay = maxBackOff
		}
	}
}
// timeoutError is the error returned for a download request whose deadline
// has passed. Workers detect it by type and stop retrying the request.
type timeoutError struct {
	// markFailedAt is the deadline value reported in the error message.
	markFailedAt time.Time
}

// Error implements the error interface.
func (e *timeoutError) Error() string {
	deadline := e.markFailedAt
	return fmt.Sprintf("Timeout. markFailedAt=%v", deadline)
}