blob: 4c48e589b2a8663c84c1b258e54073b57808e2ca [file] [log] [blame]
/*
Copyright 2016 The Transicator Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replication
import (
"database/sql"
"fmt"
"io"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"
log "github.com/Sirupsen/logrus"
"github.com/apigee-labs/transicator/common"
"github.com/apigee-labs/transicator/pgclient"
)
const (
// Make sure that we reply to the server at least this often
heartbeatTimeout = 10 * time.Second
// Make sure that we always reconnect eventually
maxReconnectDelay = time.Minute
// Starting point for the reconnect delay. The main loop doubles the delay
// after each failed connection attempt, capped at maxReconnectDelay, and
// returns to this value after a disconnect.
initialReconnectDelay = 100 * time.Millisecond
// Retry dropping the slot because sometimes it takes a few seconds
// for it to be actually dropped
dropRetries = 10
)
// replCommand is a control message delivered to the main replication loop
// over the Replicator's command channel.
type replCommand int
const (
// stopCmd is sent by Stop: stop replicating and keep the slot.
stopCmd replCommand = iota
// stopAndDropCmd is sent by StopAndDrop: stop replicating, then drop the slot.
stopAndDropCmd
// disconnectedCmd is sent by the read loop when reading fails or the
// server returns an error.
disconnectedCmd
// shutdownCmd is sent by the read loop when a keepalive message asks for
// an immediate reply, which this package treats as a server shutdown.
shutdownCmd
// acknowedgeCmd makes the main loop send the current LSN to the server
// right away. NOTE(review): no sender of this command is visible in this file.
acknowedgeCmd
)
//go:generate stringer -type State replication.go
/*
State represents whether the replicator is connected or reconnecting.
It is stored in the Replicator as an int32 and read and written atomically.
*/
type State int32
// Values of replication state
const (
// Connecting is set by Start and again whenever the connection is lost.
Connecting State = iota
// Running is set once a replication connection has been established.
Running
// Stopped is the state of a new Replicator and of one that has finished stopping.
Stopped
)
/*
A Replicator is a client for the logical replication protocol.
*/
type Replicator struct {
// slotName is the replication slot name, lower-cased by CreateReplicator.
slotName string
// rawConnectString is the connect URL exactly as the caller supplied it.
rawConnectString string
// connectString is the connect URL with "replication=database" added.
connectString string
// filter, when non-nil, is consulted for every decoded change; changes for
// which it returns false are not delivered.
filter func(c *common.Change) bool
// state holds a State value; access it only through State and setState.
state int32
// stopWaiter is created by Start and released when the main loop finishes
// stopping, which is what Stop and StopAndDrop wait for.
stopWaiter *sync.WaitGroup
// changeChan delivers decoded changes, and changes carrying an Error, to
// the client.
changeChan chan *common.Change
// updateChan carries LSNs passed to Acknowledge to the main loop.
updateChan chan uint64
// cmdChan carries control commands to the main loop.
cmdChan chan replCommand
// jsonMode is set once a message fails to decode as protobuf but decodes
// as JSON; subsequent messages are then decoded as JSON directly.
jsonMode bool
}
/*
CreateReplicator builds a Replicator in the Stopped state. Nothing is
contacted until the caller invokes Start.

"connect" is a postgres URL for the "pgclient" module; the parameter
"replication=database" is added to it before it is used.
"sn" is the replication slot to read from. It is lower-cased, and the slot
is created on connect if starting replication on it fails.

The returned error is currently always nil.
*/
func CreateReplicator(connect, sn string) (*Replicator, error) {
	replURL := addParam(connect, "replication", "database")
	log.Debugf("Connecting to the database at \"%s\"", replURL)

	r := new(Replicator)
	r.slotName = strings.ToLower(sn)
	r.rawConnectString = connect
	r.connectString = replURL
	r.state = int32(Stopped)
	// Buffered so that the read loop and callers of Acknowledge do not block
	// on every message.
	r.changeChan = make(chan *common.Change, 100)
	r.updateChan = make(chan uint64, 100)
	r.cmdChan = make(chan replCommand, 1)
	return r, nil
}
/*
addParam returns the URL "query" with the query parameter "key" set to
"val", replacing any value that key already had. Everything before the
first "?" is kept as is; the parameters after it are re-encoded, which
sorts them by key.

If the existing parameters cannot be parsed, "query" is returned unchanged
and the parameter is NOT added.
*/
func addParam(query, key, val string) string {
	// SplitN with a limit of 2 yields one piece (no "?") or two, never more.
	parts := strings.SplitN(query, "?", 2)

	queryVals := url.Values{}
	if len(parts) == 2 {
		parsed, err := url.ParseQuery(parts[1])
		if err != nil {
			// Leave a URL we cannot parse exactly as the caller wrote it.
			return query
		}
		queryVals = parsed
	}

	queryVals.Set(key, val)
	return fmt.Sprintf("%s?%s", parts[0], queryVals.Encode())
}
/*
SetChangeFilter installs a predicate that is evaluated for every decoded
change before it is placed on the Changes channel. A change is delivered
only when the predicate returns true; with no filter installed every change
is delivered.

The predicate runs on the goroutine that reads from the database, so it
must decide quickly and must not block. Looking for a particular field
value, for example in tests, is the typical use.
*/
func (r *Replicator) SetChangeFilter(f func(*common.Change) bool) {
	r.filter = f
}
/*
Start launches the replication loop in its own goroutine and returns
immediately, so it succeeds even when the database cannot be reached;
connecting and reconnecting happen inside the loop.

Calling Start on a Replicator that is not Stopped does nothing.
*/
func (r *Replicator) Start() {
	if r.State() != Stopped {
		return
	}

	r.setState(Connecting)
	// Stop and StopAndDrop wait on this group until the loop has finished.
	r.stopWaiter = new(sync.WaitGroup)
	r.stopWaiter.Add(1)
	go r.replLoop()
}
/*
DropSlot removes a logical replication slot by calling
pg_drop_replication_slot over a database/sql connection opened with the
"transicator" driver.

"connect" is the postgres URL handed to that driver.
"sn" is the slot name; it is lower-cased before use.

The error from opening the database or from running the statement is
returned as is.
*/
func DropSlot(connect, sn string) error {
	db, openErr := sql.Open("transicator", connect)
	if openErr != nil {
		return openErr
	}
	defer db.Close()

	const dropSQL = "select * from pg_drop_replication_slot($1)"
	_, execErr := db.Exec(dropSQL, strings.ToLower(sn))
	return execErr
}
/*
Changes returns the receive side of the Replicator's single change channel.
Decoded changes arrive on it, and when the read loop hits a read failure or
a server error it delivers a Change whose Error field is set.

Every call returns the same channel, so fanning changes out to several
consumers is the client's responsibility.
*/
func (r *Replicator) Changes() <-chan *common.Change {
	var out <-chan *common.Change = r.changeChan
	return out
}
/*
Stop asks the replication loop to stop and blocks until it has done so.
The replication slot is left in place; use DropSlot or StopAndDrop to
remove it. Calling Stop on a Replicator that is already Stopped does
nothing.

NOTE(review): nothing in this file closes the channel returned by Changes.
*/
func (r *Replicator) Stop() {
	if r.State() == Stopped {
		return
	}
	r.cmdChan <- stopCmd
	r.stopWaiter.Wait()
}
/*
StopAndDrop asks the replication loop to stop and then drop the replication
slot, and blocks until the loop has finished. A failure to drop the slot is
logged by the loop and is not reported to the caller. Calling StopAndDrop
on a Replicator that is already Stopped does nothing.
*/
func (r *Replicator) StopAndDrop() {
	if r.State() == Stopped {
		return
	}
	r.cmdChan <- stopAndDropCmd
	r.stopWaiter.Wait()
}
/*
State reports the replication state as of the moment of the call. The value
is read atomically.
*/
func (r *Replicator) State() State {
	current := atomic.LoadInt32(&r.state)
	return State(current)
}
/*
setState records a new replication state with an atomic store.
*/
func (r *Replicator) setState(s State) {
	next := int32(s)
	atomic.StoreInt32(&r.state, next)
}
/*
Acknowledge tells the replicator that the change at "lsn" has been
committed by the client. The LSN is queued for the main loop, which
reports it to the database when it is higher than any LSN acknowledged so
far. Acknowledging periodically lets the database discard transaction log
it would otherwise have to keep.

Changes older than an acknowledged LSN may still be delivered again after
a reconnect, so consumers must be prepared to recognise and ignore
duplicates.

The call blocks while the queue of pending acknowledgements is full.
*/
func (r *Replicator) Acknowledge(lsn uint64) {
	pending := r.updateChan
	pending <- lsn
}
/*
connect opens a connection to the database and starts logical replication
on the configured slot.

If the first attempt to start replication is answered with an error, the
slot is assumed to be missing: it is created and the start command is sent
once more. A second error is returned to the caller.

On success the open connection is returned, already in copy-both mode. On
any failure the connection is closed and a non-nil error is returned.
*/
func (r *Replicator) connect() (*pgclient.PgConnection, error) {
	// NOTE(review): the connect string is logged verbatim; if it can carry
	// credentials they will appear in debug logs.
	log.Debugf("Replication connecting to Postgres using %s", r.connectString)
	success := false
	conn, err := pgclient.Connect(r.connectString)
	if err != nil {
		return nil, err
	}
	// Every return below except the successful one must release the connection.
	defer func() {
		if !success {
			conn.Close()
		}
	}()

	slotCreated := false
	startMsg := pgclient.NewOutputMessage(pgclient.Query)
	startSQL :=
		fmt.Sprintf("START_REPLICATION SLOT %s LOGICAL 0/0 (protobuf)", r.slotName)
	log.Debugf("Sending SQL to start replication: %s\n", startSQL)
	startMsg.WriteString(startSQL)
	err = conn.WriteMessage(startMsg)
	if err != nil {
		return nil, err
	}

	// Read until we get a CopyBothResponse
	for {
		m, err := conn.ReadMessage()
		if err != nil {
			return nil, err
		}

		switch m.Type() {
		case pgclient.ErrorResponse:
			if slotCreated {
				// The slot exists now, so this error is a real failure.
				return nil, pgclient.ParseError(m)
			}
			// First error might be because the slot is not created, so
			// create it. Drain the rest of the failed response first, and
			// give up if the connection breaks while doing so.
			if err = consumeTillReady(conn); err != nil {
				return nil, err
			}
			log.Debugf("Creating new replication slot %s", r.slotName)
			_, _, err = conn.SimpleQuery(fmt.Sprintf(
				"CREATE_REPLICATION_SLOT %s LOGICAL transicator_output", r.slotName))
			if err != nil {
				return nil, err
			}
			slotCreated = true

			// Re-send start replication command.
			log.Debugf("Re-starting replication with %s", startSQL)
			err = conn.WriteMessage(startMsg)
			if err != nil {
				return nil, err
			}

		case pgclient.NoticeResponse:
			// A notice that fails to parse is logged as whatever was decoded.
			msg, _ := pgclient.ParseNotice(m)
			log.Infof("Info from server: %s", msg)

		case pgclient.CopyBothResponse:
			// We'll use this as a signal to move on
			parseCopyBoth(m)
			success = true
			return conn, nil

		default:
			return nil, fmt.Errorf("Unknown message from server: %s", m.Type())
		}
	}
}
/*
replLoop is the main loop, run in its own goroutine by Start. It owns the
connection: it connects and reconnects with a growing delay, forwards
acknowledged LSNs and periodic heartbeats to the server, and reacts to the
commands sent by Stop, StopAndDrop and the read loop. It returns only after
finishStop has been called.
*/
func (r *Replicator) replLoop() {
	var highLSN uint64
	var connected bool
	var stopping bool
	var dropping bool
	var connection *pgclient.PgConnection
	var err error

	connectDelay := initialReconnectDelay
	hbTimer := time.NewTimer(heartbeatTimeout)
	defer hbTimer.Stop()

	// First time through -- connect right away
	log.Debug("Starting replLoop")
	connectTimer := time.NewTimer(0)
	defer connectTimer.Stop()

	// handleDisconnect reacts to the end of the current connection. It
	// reports true when the loop was stopping on purpose and has now
	// finished, in which case the caller must return. Otherwise it
	// schedules a reconnect.
	handleDisconnect := func() bool {
		connected = false
		if stopping {
			// But it was on purpose...
			r.finishStop(dropping, connection)
			return true
		}
		log.Warning("Disconnected from Postgres.")
		r.setState(Connecting)
		connection.Close()
		connectDelay = initialReconnectDelay
		connectTimer.Reset(connectDelay)
		return false
	}

	for {
		select {
		// We were not connected, so attempt a reconnect.
		case <-connectTimer.C:
			connection, err = r.connect()
			if err == nil {
				connected = true
				r.setState(Running)
				log.Infof("Connected to Postgres using replication slot \"%s\"", r.slotName)
				go r.readLoop(connection)
			} else {
				// Back off exponentially, up to the maximum delay.
				connectDelay *= 2
				if connectDelay > maxReconnectDelay {
					connectDelay = maxReconnectDelay
				}
				log.Warningf("Error connecting to Postgres. Retrying in %v: %s",
					connectDelay, err)
				connectTimer.Reset(connectDelay)
			}

		// Client called "Acknowledge" to ask us to update the high LSN
		case newLSN := <-r.updateChan:
			log.Debugf("Got updated LSN %d", newLSN)
			if newLSN > highLSN {
				highLSN = newLSN
				// Send an update to the server
				if connected {
					r.updateLSN(highLSN, connection)
				}
				hbTimer.Reset(heartbeatTimeout)
			}

		// Periodic timeout to keep replication connection alive.
		case <-hbTimer.C:
			log.Debug("Heartbeat timer expired")
			if connected {
				r.updateLSN(highLSN, connection)
			}
			// Re-arm even while disconnected. Otherwise a timer that fires
			// during an outage is never restarted and heartbeats stop after
			// the reconnect.
			hbTimer.Reset(heartbeatTimeout)

		case cmd := <-r.cmdChan:
			log.Debugf("Got command %d", cmd)
			switch cmd {
			// Read loop exiting due to connection failure.
			case disconnectedCmd:
				if handleDisconnect() {
					return
				}

			// Server asked us to acknowledge right now
			case acknowedgeCmd:
				if connected {
					r.updateLSN(highLSN, connection)
				}

			// We think that the server wants us to disconnect
			case shutdownCmd:
				if connected {
					// Best effort: the connection is abandoned right after.
					cd := pgclient.NewOutputMessage(pgclient.CopyDoneOut)
					connection.WriteMessage(cd)
					// Handle the disconnect here rather than posting
					// disconnectedCmd to cmdChan. This goroutine is the
					// only reader of that channel, so a send from here
					// blocks forever if a Stop command is already queued.
					if handleDisconnect() {
						return
					}
				}

			// Client called stop with "delete" flag
			case stopAndDropCmd:
				if connected {
					// Closing makes the read loop fail and report
					// disconnectedCmd, which completes the stop.
					dropping = true
					stopping = true
					connection.Close()
				} else {
					r.finishStop(true, connection)
					return
				}

			// Client called Stop
			case stopCmd:
				if connected {
					stopping = true
					log.Infof("Closing connection to Postgres")
					connection.Close()
				} else {
					r.finishStop(false, connection)
					return
				}
			}
		}
	}
}
/*
finishStop completes a stop request: it marks the Replicator as Stopped,
drops the replication slot when "deleteSlot" is set, and then releases
whoever is waiting in Stop or StopAndDrop. A failure to drop the slot is
only logged. The "connection" argument is not used.
*/
func (r *Replicator) finishStop(
	deleteSlot bool,
	connection *pgclient.PgConnection) {

	r.setState(Stopped)
	log.Infof("Stopped replicating from slot \"%s\"", r.slotName)

	if deleteSlot {
		log.Infof("Dropping replication slot \"%s\"", r.slotName)
		if dropErr := r.dropSlot(); dropErr != nil {
			log.Warnf("Error dropping replication slot \"%s\": %s", r.slotName, dropErr)
		}
	}

	// Release the waiter last so that the caller sees the slot already dropped.
	r.stopWaiter.Done()
}
/*
dropSlot drops the replication slot using the replication protocol, in
the unlikely event that we are not able to use any other protocol.
This requires us to open a new connection in replication mode.

Since we do this right after closing the replication connection, the drop
is attempted up to dropRetries times on that new connection, one second
apart. The error of the last attempt is returned if none succeeds; a
failure to connect is returned immediately.
*/
func (r *Replicator) dropSlot() error {
	conn, err := pgclient.Connect(r.connectString)
	if err != nil {
		return err
	}
	defer conn.Close()

	dropSQL := fmt.Sprintf("DROP_REPLICATION_SLOT %s", r.slotName)
	for try := 0; try < dropRetries; try++ {
		if try > 0 {
			// Pause only between attempts, so that giving up after the
			// final failure is not delayed by a pointless sleep.
			time.Sleep(time.Second)
		}
		if _, err = conn.SimpleExec(dropSQL); err == nil {
			return nil
		}
	}
	return err
}
/*
readLoop runs in its own goroutine for the lifetime of one connection. It
reads replication output from the database and passes decoded changes on
to clients.

It ends in one of two ways. On a read failure or a server error it delivers
a Change carrying the error and reports disconnectedCmd to the main loop.
When a keepalive message asks for shutdown it reports shutdownCmd.
*/
func (r *Replicator) readLoop(connection *pgclient.PgConnection) {
	log.Debug("Starting to read from replication connection")

	// fail tells the client about the error first, then the main loop.
	fail := func(cause error) {
		r.changeChan <- &common.Change{Error: cause}
		r.cmdChan <- disconnectedCmd
	}

	for {
		msg, readErr := connection.ReadMessage()
		if readErr != nil {
			// A plain EOF is the normal result of closing the connection.
			if readErr != io.EOF {
				log.Warningf("Error reading from server: %s", readErr)
			}
			fail(readErr)
			return
		}

		log.Debugf("Received message type %s", msg.Type())

		switch msg.Type() {
		case pgclient.CopyData:
			copyMsg, parseErr := pgclient.ParseCopyData(msg)
			if parseErr != nil {
				log.Warningf("Received invalid CopyData message: %s", parseErr)
				continue
			}
			if r.handleCopyData(copyMsg) {
				r.cmdChan <- shutdownCmd
				return
			}

		case pgclient.ErrorResponse:
			serverErr := pgclient.ParseError(msg)
			log.Warningf("Server returned an error: %s", serverErr)
			fail(serverErr)
			return

		case pgclient.NoticeResponse:
			notice, _ := pgclient.ParseNotice(msg)
			log.Infof("Info from server: %s", notice)

		default:
			log.Warningf("Server received an unknown message %s", msg.Type())
		}
	}
}
/*
parseCopyBoth reads the fields of a CopyBothResponse message and logs them
at debug level. Nothing is returned; read errors are ignored because the
values are used for logging only.
*/
func parseCopyBoth(m *pgclient.InputMessage) {
	overallFormat, _ := m.ReadInt8()
	log.Debugf("Is binary = %d", overallFormat)

	columnCount, _ := m.ReadInt16()
	for col := 0; col < int(columnCount); col++ {
		columnFormat, _ := m.ReadInt16()
		log.Debugf("Column %d: binary = %d", col, columnFormat)
	}
}
/*
handleCopyData dispatches one message unwrapped from a CopyData envelope.
WAL data is decoded and delivered, a keepalive is inspected, and anything
else is logged and skipped.

It returns true when the read loop should exit, which only a keepalive can
request.
*/
func (r *Replicator) handleCopyData(m *pgclient.InputMessage) bool {
	kind := m.Type()
	if kind == pgclient.SenderKeepalive {
		return r.handleKeepalive(m)
	}

	if kind == pgclient.WALData {
		r.handleWALData(m)
	} else {
		log.Warningf("Received unknown WAL message %s", m.Type())
	}
	return false
}
/*
handleWALData decodes the change carried by one WAL data message and puts
it on the change channel, unless a filter is installed and rejects it.

The payload is decoded as protobuf first. If that fails but it decodes as
JSON, the Replicator switches to JSON for all later messages. A payload
that cannot be decoded at all is logged and dropped.
*/
func (r *Replicator) handleWALData(m *pgclient.InputMessage) {
	// Skip the header, which is not needed here.
	m.ReadInt64() // StartWAL
	m.ReadInt64() // end WAL
	m.ReadInt64() // Timestamp
	payload := m.ReadRemaining()

	var (
		change    *common.Change
		decodeErr error
	)
	if !r.jsonMode {
		change, decodeErr = common.UnmarshalChangeProto(payload)
		if decodeErr != nil {
			// Defensive code in case we have an old version of the output plugin
			// that does not understand the "protobuf" option.
			change, decodeErr = common.UnmarshalChange(payload)
			if decodeErr == nil {
				log.Warn("Error decoding protobuf -- looks like Postgres is sending JSON")
				r.jsonMode = true
			}
		}
	} else {
		change, decodeErr = common.UnmarshalChange(payload)
	}

	if decodeErr != nil {
		log.Warningf("Received invalid change %s: %s", string(payload), decodeErr)
		return
	}
	if r.filter != nil && !r.filter(change) {
		// The client is not interested in this change.
		return
	}
	r.changeChan <- change
}
/*
handleKeepalive inspects a keepalive message from the server. It returns
true when the message's "reply now" byte is non-zero, which this package
treats as a request to stop reading and shut the connection down, and false
otherwise.
*/
func (r *Replicator) handleKeepalive(m *pgclient.InputMessage) bool {
	m.ReadInt64() // end WAL
	m.ReadInt64() // Timestamp
	replyNow, _ := m.ReadByte()
	log.Debugf("Got heartbeat. Reply now = %d", replyNow)

	if replyNow == 0 {
		return false
	}

	// Postgres 9.5 does this on a graceful shutdown, and never exits unless
	// we use this as a trigger to stop replication and exit.
	// That is not what the documentation says -- the documentation says that
	// we should just send a heartbeat right away. But if we do that, then
	// we end up in a heartbeat loop and the database instance never exits.
	log.Info("Database requested immediate heartbeat response. Using this to trigger shutdown.")
	return true
}
/*
updateLSN sends a standby status update on "conn", reporting highLSN+1 as
the written, flushed and applied positions, together with the current time.
The final zero byte is the message's last field. NOTE(review): presumably
the "reply requested" flag of the protocol -- confirm against the protocol
documentation.

The update is best effort: a write failure is logged and otherwise ignored,
since a broken connection is detected and handled by the read loop.
*/
func (r *Replicator) updateLSN(highLSN uint64, conn *pgclient.PgConnection) {
	log.Debugf("Updating server with last LSN %d", highLSN)
	om := pgclient.NewOutputMessage(pgclient.CopyDataOut)
	om.WriteByte(byte(pgclient.StandbyStatusUpdate))
	om.WriteUint64(highLSN + 1)                         // last written to disk+1
	om.WriteUint64(highLSN + 1)                         // last flushed to disk+1
	om.WriteUint64(highLSN + 1)                         // last applied+1
	om.WriteInt64(common.TimeToPgTimestamp(time.Now())) // Timestamp, in microseconds
	om.WriteByte(0)
	if err := conn.WriteMessage(om); err != nil {
		log.Warningf("Error sending LSN update to server: %s", err)
	}
}
/*
consumeTillReady reads and discards messages from "c" until it sees a
ReadyForQuery message, then returns nil. A read error ends the wait and is
returned.
*/
func consumeTillReady(c *pgclient.PgConnection) error {
	for {
		msg, readErr := c.ReadMessage()
		switch {
		case readErr != nil:
			return readErr
		case msg.Type() == pgclient.ReadyForQuery:
			return nil
		}
	}
}