blob: fd0a619cffe553c7cf353439a01b166a9a73165a [file] [log] [blame]
/*
Copyright 2016 The Transicator Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"database/sql"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"testing"
"github.com/30x/goscaffold"
"github.com/Sirupsen/logrus"
"github.com/apigee-labs/transicator/common"
"github.com/apigee-labs/transicator/pgclient"
"github.com/apigee-labs/transicator/replication"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/reporters"
. "github.com/onsi/gomega"
)
const (
debugTests = false
replicationSlot = "change_server_test"
testDataDir = "./changeservertestdata"
)
var dbURL string
var baseURL string
var listener net.Listener
var testServer *server
var db *sql.DB
var insertStmt *sql.Stmt
var insertAlterativeSelectorStmt *sql.Stmt
func TestServer(t *testing.T) {
if debugTests {
logrus.SetLevel(logrus.DebugLevel)
}
dbURL = os.Getenv("TEST_PG_URL")
if dbURL == "" {
fmt.Println("Skipping tests because TEST_PG_URL not set")
fmt.Println("Format:")
fmt.Println(" postgres://user:password@host:port/database")
} else {
RegisterFailHandler(Fail)
junitReporter := reporters.NewJUnitReporter("../test-reports/changeserver.xml")
RunSpecsWithDefaultAndCustomReporters(t, "Changeserver suite", []Reporter{junitReporter})
}
}
var _ = BeforeSuite(func() {
var err error
db, err = sql.Open("transicator", dbURL)
Expect(err).Should(Succeed())
db.Driver().(*pgclient.PgDriver).SetIsolationLevel("repeatable read")
// Connect to the database and create our test table
executeSQL(testTableSQL)
executeSQL(testAlternateScopeTableSQL)
insertStmt, err = db.Prepare(
"insert into changeserver_test (sequence, _change_selector) values ($1, $2)")
Expect(err).Should(Succeed())
insertAlterativeSelectorStmt, err = db.Prepare("insert into changeserver_scope_test(sequence, _different_selector) values ($1, $2)")
Expect(err).Should(Succeed())
// Start the server, which will be ready to respond to API calls
// and which will also start replication with the database
mux := http.NewServeMux()
testServer, err = createChangeServer(mux, testDataDir, dbURL, replicationSlot, "")
Expect(err).Should(Succeed())
// For the tests, filter out changes for tables not part of these unit
// tests. Otherwise when we run many test suites in succession this suite
// fails due to the spurious changes.
testServer.repl.SetChangeFilter(func(c *common.Change) bool {
return c.Table == "public.changeserver_test"
})
testServer.start()
// Listen on an anonymous port
scaf := goscaffold.CreateHTTPScaffold()
ip := net.ParseIP("127.0.0.1")
scaf.SetlocalBindIPAddressV4(ip)
err = scaf.Open()
Expect(err).Should(Succeed())
baseURL = fmt.Sprintf("http://%s", scaf.InsecureAddress())
fmt.Fprintf(GinkgoWriter, "Base API URL: %s\n", baseURL)
// Poll until we are connected to PG and replication has begun
Eventually(func() replication.State {
return testServer.repl.State()
}, 30, 10).Should(Equal(replication.Running))
// Start listening for HTTP calls
go func() {
scaf.Listen(mux)
}()
})
var _ = AfterSuite(func() {
if listener != nil {
listener.Close()
}
if testServer != nil {
testServer.stop()
}
replication.DropSlot(dbURL, replicationSlot)
if insertStmt != nil {
insertStmt.Close()
}
// Drop the test data table
executeSQL(dropTableSQL)
executeSQL(dropAlternativeScopeTableSQL)
// And delete the storage
testServer.delete()
if db != nil {
db.Close()
}
})
func executeSQL(sql string) {
fmt.Fprintf(GinkgoWriter, "Executing SQL: \"%s\"\n", sql)
_, err := db.Exec(sql)
Expect(err).Should(Succeed())
}
func executeGet(url string) []byte {
resp, err := http.Get(url)
Expect(err).Should(Succeed())
defer resp.Body.Close()
bod, err := ioutil.ReadAll(resp.Body)
Expect(err).Should(Succeed())
return bod
}
const testTableSQL = `
create table changeserver_test (
sequence integer primary key,
_change_selector varchar
)`
const testAlternateScopeTableSQL = `
create table changeserver_scope_test (
sequence integer primary key,
_different_selector varchar
)
`
const dropTableSQL = "drop table changeserver_test"
const dropAlternativeScopeTableSQL = "drop table changeserver_scope_test"