| /* |
| Copyright 2016 The Transicator Authors |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| package snapshotserver |
| |
| import ( |
| "database/sql" |
| "errors" |
| "net" |
| "net/http" |
| "time" |
| |
| "github.com/30x/goscaffold" |
| log "github.com/Sirupsen/logrus" |
| "github.com/apigee-labs/transicator/pgclient" |
| "github.com/julienschmidt/httprouter" |
| "github.com/spf13/viper" |
| ) |
| |
| const ( |
| packageName string = "transicator" |
| appName string = "snapshotserver" |
| // Default timeout for individual Postgres transactions |
| defaultPGTimeout = 30 * time.Second |
| defaultSelectorColumn = "_change_selector" |
| defaultTempDir = "" |
| tempSnapshotPrefix = "transicatortmp" |
| tempSnapshotName = "snap" |
| maxRequestBodyLength = 1024 * 1024 // 1 MB |
| ) |
| |
| // selectorColumn is the name of the database column that distinguishes a scope |
| var selectorColumn = defaultSelectorColumn |
| |
| // tempSnapshotDir is where we'll put temporary sqlite files |
| var tempSnapshotDir = defaultTempDir |
| |
| // ErrUsage is returned when the user passes incorrect command-line arguments. |
| var ErrUsage = errors.New("Invalid arguments") |
| |
| var mainDB *sql.DB |
| |
| /* |
| Run starts the snapshot server. It will listen on an HTTP port as directed |
| by the Viper configuration. This method will block until either there |
| is an error, or the server is stopped, so a goroutine is recommended |
| if running this as part of a unit test. |
| */ |
| func Run() (*goscaffold.HTTPScaffold, error) { |
| err := getConfig() |
| if err != nil { |
| return nil, err |
| } |
| |
| // Fetch config values from Viper |
| localBindIpAddr := viper.GetString("localBindIpAddr") |
| port := viper.GetInt("port") |
| securePort := viper.GetInt("securePort") |
| mgmtPort := viper.GetInt("mgmtPort") |
| |
| pgURL := viper.GetString("pgURL") |
| key := viper.GetString("key") |
| cert := viper.GetString("cert") |
| |
| debug := viper.GetBool("debug") |
| selectorColumn = viper.GetString("selectorColumn") |
| tempSnapshotDir = viper.GetString("tempdir") |
| |
| if pgURL == "" { |
| return nil, ErrUsage |
| } |
| if port < 0 && securePort < 0 { |
| return nil, ErrUsage |
| } |
| |
| if debug { |
| log.SetLevel(log.DebugLevel) |
| } |
| |
| log.Infof("Connecting to Postgres DB %s\n", pgURL) |
| mainDB, err = sql.Open("transicator", pgURL) |
| if err != nil { |
| return nil, errors.New("Unable to initialize database connection") |
| } |
| err = mainDB.Ping() |
| if err != nil { |
| log.Warnf("Warning: Postgres DB Err: %v\n", err) |
| log.Warn("Continuing anyway...") |
| } |
| |
| log.Info("Connection to Postgres succeeded.\n") |
| pgdriver := mainDB.Driver().(*pgclient.PgDriver) |
| pgdriver.SetIsolationLevel("repeatable read") |
| pgdriver.SetExtendedColumnNames(true) |
| pgdriver.SetReadTimeout(defaultPGTimeout) |
| router := httprouter.New() |
| |
| router.GET("/scopes/:apidclusterId", |
| basicValidationHandler(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| GetScopes(w, r, mainDB, p) |
| })) |
| |
| router.GET("/snapshots", |
| basicValidationHandler(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| GenSnapshot(w, r) |
| })) |
| |
| router.GET("/data", |
| basicValidationHandler(func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| DownloadSnapshot(w, r, mainDB, p) |
| })) |
| |
| scaf := goscaffold.CreateHTTPScaffold() |
| ip := net.ParseIP(localBindIpAddr) |
| if ip != nil { |
| scaf.SetlocalBindIPAddressV4(ip) |
| } |
| scaf.SetInsecurePort(port) |
| scaf.SetSecurePort(securePort) |
| if mgmtPort >= 0 { |
| scaf.SetManagementPort(mgmtPort) |
| } |
| scaf.SetKeyFile(key) |
| scaf.SetCertFile(cert) |
| scaf.SetHealthPath("/health") |
| scaf.SetReadyPath("/ready") |
| scaf.SetHealthChecker(func() (goscaffold.HealthStatus, error) { |
| return checkHealth(mainDB) |
| }) |
| scaf.SetMarkdown("GET", "/markdown", nil) |
| |
| err = scaf.StartListen(router) |
| return scaf, err |
| } |
| |
| /* |
| Close closes the database and does other necessary cleanup. |
| */ |
| func Close() { |
| if mainDB != nil { |
| mainDB.Close() |
| } |
| } |
| |
| func basicValidationHandler(h httprouter.Handle) httprouter.Handle { |
| return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { |
| // Limit request body size to maxRequestBodyLength |
| r.Body = http.MaxBytesReader(w, r.Body, maxRequestBodyLength) |
| h(w, r, p) |
| } |
| } |
| |
| func checkHealth(db *sql.DB) (goscaffold.HealthStatus, error) { |
| row := db.QueryRow("select * from now()") |
| var now string |
| err := row.Scan(&now) |
| if err == nil { |
| return goscaffold.OK, nil |
| } |
| |
| log.Warnf("Not ready: Database error: %s", err) |
| // Return a "Not ready" status. That means that the server should not |
| // receive calls, but it does not need a restart. Don't return a "failed" |
| // status here that would cause a restart. We will be able to reach PG |
| // again when it's ready. |
| return goscaffold.NotReady, err |
| } |