blob: c31c2f0428f28a0e190c909db9f39956b80c8e7f [file] [log] [blame]
/*
Copyright 2016 The Transicator Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshotserver
import (
"database/sql"
"errors"
"net"
"net/http"
"time"
"github.com/30x/goscaffold"
log "github.com/Sirupsen/logrus"
"github.com/apigee-labs/transicator/pgclient"
"github.com/julienschmidt/httprouter"
"github.com/spf13/viper"
)
const (
	// Names identifying the overall package and this particular application.
	packageName string = "transicator"
	appName     string = "snapshotserver"

	// defaultPGTimeout is the default timeout for individual Postgres
	// transactions.
	defaultPGTimeout = 30 * time.Second

	// defaultSelectorColumn is the column name used to tell scopes apart
	// unless configuration supplies a different one.
	defaultSelectorColumn = "_change_selector"

	// Temporary snapshot file settings. The empty default directory
	// presumably defers to the platform's temp location -- confirm against
	// the code that creates the files.
	defaultTempDir     = ""
	tempSnapshotPrefix = "transicatortmp"
	tempSnapshotName   = "snap"

	// maxRequestBodyLength is the largest request body accepted: 1 MB.
	maxRequestBodyLength = 1 << 20
)
// Package-level state shared by the server. Run assigns selectorColumn,
// tempSnapshotDir and mainDB when it starts up.
var (
	// selectorColumn names the database column that distinguishes one scope
	// from another.
	selectorColumn = defaultSelectorColumn

	// tempSnapshotDir is the directory in which temporary sqlite files are
	// placed.
	tempSnapshotDir = defaultTempDir

	// ErrUsage is the error handed back when the user supplies incorrect
	// command-line arguments.
	ErrUsage = errors.New("Invalid arguments")

	// mainDB is the database handle opened by Run and released by Close.
	mainDB *sql.DB
)
/*
Run configures and starts the snapshot server. It loads configuration via
getConfig, reads its settings from Viper, opens the Postgres database handle,
registers the HTTP routes, and hands the router to a goscaffold HTTP scaffold
through StartListen.

ErrUsage is returned when "pgURL" is empty or when both "port" and
"securePort" are negative. A database that cannot be pinged at startup is
not fatal: the failure is logged as a warning and startup continues.

The scaffold is returned together with whatever error StartListen produced.
Call Close to release the database handle when the server is no longer needed.

NOTE(review): this comment used to say that Run blocks until the server
stops. The code returns as soon as StartListen returns, which presumably
does not block -- confirm against the goscaffold documentation.
*/
func Run() (*goscaffold.HTTPScaffold, error) {
	err := getConfig()
	if err != nil {
		return nil, err
	}

	// Fetch config values from Viper
	localBindIPAddr := viper.GetString("localBindIpAddr")
	port := viper.GetInt("port")
	securePort := viper.GetInt("securePort")
	mgmtPort := viper.GetInt("mgmtPort")
	pgURL := viper.GetString("pgURL")
	key := viper.GetString("key")
	cert := viper.GetString("cert")
	debug := viper.GetBool("debug")
	selectorColumn = viper.GetString("selectorColumn")
	tempSnapshotDir = viper.GetString("tempdir")

	// A database URL and at least one listening port are mandatory.
	if pgURL == "" {
		return nil, ErrUsage
	}
	if port < 0 && securePort < 0 {
		return nil, ErrUsage
	}

	if debug {
		log.SetLevel(log.DebugLevel)
	}

	// NOTE(review): pgURL is logged verbatim; if it can carry credentials,
	// consider redacting them before logging.
	log.Infof("Connecting to Postgres DB %s\n", pgURL)
	mainDB, err = sql.Open("transicator", pgURL)
	if err != nil {
		// Keep the underlying cause so the failure can be diagnosed.
		return nil, errors.New("Unable to initialize database connection: " + err.Error())
	}

	// An unreachable database is tolerated at startup, but success must only
	// be reported when the ping actually worked.
	if err = mainDB.Ping(); err != nil {
		log.Warnf("Warning: Postgres DB Err: %v\n", err)
		log.Warn("Continuing anyway...")
	} else {
		log.Info("Connection to Postgres succeeded.\n")
	}

	// The driver-specific settings below need the concrete driver type.
	// Report a mismatch as an error instead of panicking.
	pgdriver, ok := mainDB.Driver().(*pgclient.PgDriver)
	if !ok {
		return nil, errors.New("Unexpected database driver type")
	}
	pgdriver.SetIsolationLevel("repeatable read")
	pgdriver.SetExtendedColumnNames(true)
	pgdriver.SetReadTimeout(defaultPGTimeout)

	// Every route goes through basicValidationHandler before reaching its
	// actual handler.
	router := httprouter.New()

	router.GET("/scopes/:apidclusterId",
		basicValidationHandler(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
			GetScopes(w, r, mainDB, p)
		}))

	router.GET("/snapshots",
		basicValidationHandler(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
			GenSnapshot(w, r)
		}))

	router.GET("/data",
		basicValidationHandler(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
			DownloadSnapshot(w, r, mainDB, p)
		}))

	scaf := goscaffold.CreateHTTPScaffold()

	// Only set a bind address when the configured value parses as an IP.
	ip := net.ParseIP(localBindIPAddr)
	if ip != nil {
		scaf.SetlocalBindIPAddressV4(ip)
	}

	scaf.SetInsecurePort(port)
	scaf.SetSecurePort(securePort)
	// A negative management port leaves the scaffold's setting untouched.
	if mgmtPort >= 0 {
		scaf.SetManagementPort(mgmtPort)
	}
	scaf.SetKeyFile(key)
	scaf.SetCertFile(cert)
	scaf.SetHealthPath("/health")
	scaf.SetReadyPath("/ready")
	scaf.SetHealthChecker(func() (goscaffold.HealthStatus, error) {
		return checkHealth(mainDB)
	})
	scaf.SetMarkdown("GET", "/markdown", nil)

	err = scaf.StartListen(router)
	return scaf, err
}
/*
Close releases the package-level database handle, if one has been opened.
It does nothing when no handle is set. Because Close has no error result,
a failure to close the database is logged as a warning rather than returned.
*/
func Close() {
	if mainDB == nil {
		return
	}
	if err := mainDB.Close(); err != nil {
		log.Warnf("Error closing Postgres DB: %v", err)
	}
}
// basicValidationHandler decorates h so that the request body is capped at
// maxRequestBodyLength bytes before h is invoked with the same arguments.
func basicValidationHandler(h httprouter.Handle) httprouter.Handle {
	limited := func(resp http.ResponseWriter, req *http.Request, params httprouter.Params) {
		// Swap in a size-limited reader for the body, then delegate.
		req.Body = http.MaxBytesReader(resp, req.Body, maxRequestBodyLength)
		h(resp, req, params)
	}
	return limited
}
// checkHealth probes the database by selecting the current time. It returns
// goscaffold.OK when the query and scan succeed; otherwise it logs a warning
// and returns goscaffold.NotReady together with the database error.
func checkHealth(db *sql.DB) (goscaffold.HealthStatus, error) {
	var ts string
	if err := db.QueryRow("select * from now()").Scan(&ts); err != nil {
		log.Warnf("Not ready: Database error: %s", err)
		// Report "Not ready" rather than "failed": the server should stop
		// receiving calls, but a restart is not needed, since Postgres can
		// be reached again once it is back.
		return goscaffold.NotReady, err
	}
	return goscaffold.OK, nil
}