blob: 07540cb52af22470c80d3944b9be24e62988724a [file] [log] [blame]
/*
Copyright 2016 The Transicator Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <postgres.h>
#include <catalog/pg_type.h>
#include <catalog/pg_type.h>
#include <access/xlogdefs.h>
#include <access/xlog.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <utils/rel.h>
#include <replication/output_plugin.h>
#include <replication/logical.h>
#include <transicator.h>
#include <transicator.pb-c.h>
static size_t countColumns(TupleDesc tupdesc) {
int natt;
size_t cols = 0;
for (natt = 0; natt < tupdesc->natts; natt++)
{
Form_pg_attribute attr = tupdesc->attrs[natt];
if (!(attr->attisdropped) && !(attr->attnum < 0)) {
cols++;
}
}
return cols;
}
static void setValue(Oid typ, Oid outputtyp, Datum val, Common__ValuePb* cv) {
switch (typ) {
case BOOLOID:
cv->value_case = COMMON__VALUE_PB__VALUE_BOOL;
cv->bool_ = DatumGetBool(val);
break;
case INT2OID:
cv->value_case = COMMON__VALUE_PB__VALUE_INT;
cv->int_ = DatumGetInt16(val);
break;
case INT4OID:
case OIDOID:
cv->value_case = COMMON__VALUE_PB__VALUE_INT;
cv->int_ = DatumGetInt32(val);
break;
case INT8OID:
cv->value_case = COMMON__VALUE_PB__VALUE_INT;
cv->int_ = DatumGetInt64(val);
break;
case FLOAT4OID:
cv->value_case = COMMON__VALUE_PB__VALUE_DOUBLE;
cv->double_ = DatumGetFloat4(val);
break;
case FLOAT8OID:
cv->value_case = COMMON__VALUE_PB__VALUE_DOUBLE;
cv->double_ = DatumGetFloat8(val);
break;
case BYTEAOID:
/* Length of the byte array is the length plus header */
cv->value_case = COMMON__VALUE_PB__VALUE_BYTES;
cv->bytes.len = VARSIZE(val) - 4;
cv->bytes.data = (uint8_t*)VARDATA(val);
break;
case TIMESTAMPOID:
case TIMESTAMPTZOID:
cv->value_case = COMMON__VALUE_PB__VALUE_TIMESTAMP;
cv->timestamp = DatumGetInt64(val);
default:
/* Convert everything else into a string */
cv->value_case = COMMON__VALUE_PB__VALUE_STRING;
cv->string = OidOutputFunctionCall(outputtyp, val);
break;
}
}
static void tuple_to_proto(
StringInfo s, TupleDesc tupdesc,
HeapTuple tuple, Common__ColumnPb** cols)
{
int natt;
int cp = 0;
int colnum = 0;
for (natt = 0; natt < tupdesc->natts; natt++)
{
Common__ColumnPb* col;
Form_pg_attribute attr;
bool isnull;
Datum origval;
Datum finalval;
Datum typ;
attr = tupdesc->attrs[natt];
if ((attr->attisdropped) || (attr->attnum < 0)) {
continue;
}
col = (Common__ColumnPb*)palloc(sizeof(Common__ColumnPb));
common__column_pb__init(col);
// NOTE: You cannot use natt here as the array index, as it's counting
// the number of entries in tupdesc, but we've previously computed and
// allocated the cols array based on a count that doesn't include the
// attr's that are dropped or have a negative attnum. If you use
// natt to index into cols, you risk jumping off the end of the array
// and causing a pg crash.
cols[colnum++] = col;
typ = attr->atttypid;
col->name = NameStr(attr->attname);
col->type = typ;
col->has_type = 1;
/* get Datum from tuple */
origval = heap_getattr(tuple, natt + 1, tupdesc, &isnull);
if (!isnull) {
Oid typoutput; /* output function */
bool typisvarlena;
Common__ValuePb* cv = (Common__ValuePb*)palloc(sizeof(Common__ValuePb));
common__value_pb__init(cv);
col->value = cv;
/* query output function */
getTypeOutputInfo(typ,
&typoutput, &typisvarlena);
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
/* TODO What does this mean? */
cv->value_case = COMMON__VALUE_PB__VALUE_STRING;
cv->string = "\"unchanged-toast-datum\"";
} else if (!typisvarlena) {
setValue(typ, typoutput, origval, cv);
} else {
finalval = PointerGetDatum(PG_DETOAST_DATUM(origval));
setValue(typ, typoutput, finalval, cv);
}
}
cp++;
}
}
void transicatorOutputChangeProto(
struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
Relation relation,
ReorderBufferChange *change,
DecodingState* state) {
/*
* TODO use utils/palloc.h and utils/memutils.h to create and reset
* a memory context on every plugin run rather than alloc/free.
*/
Form_pg_class class_form;
TupleDesc tupdesc;
size_t numCols;
size_t packSize;
uint8_t* pack;
Common__ChangePb pb = COMMON__CHANGE_PB__INIT;
class_form = RelationGetForm(relation);
tupdesc = RelationGetDescr(relation);
pb.table =
quote_qualified_identifier(
get_namespace_name(
get_rel_namespace(RelationGetRelid(relation))),
NameStr(class_form->relname));
pb.sequence = NULL;
pb.changesequence = change->lsn;
pb.has_changesequence = 1;
pb.commitsequence = txn->final_lsn;
pb.has_commitsequence = 1;
pb.commitindex = state->index;
pb.has_commitindex = 1;
pb.transactionidepoch = convert_xid(txn->xid);
pb.has_transactionidepoch = 1;
state->index++;
switch (change->action)
{
case REORDER_BUFFER_CHANGE_INSERT:
pb.operation = 1;
if (change->data.tp.newtuple != NULL) {
numCols = countColumns(tupdesc);
pb.n_newcolumns = numCols;
pb.newcolumns = (Common__ColumnPb**)palloc(sizeof(Common__ColumnPb*) * numCols);
tuple_to_proto(ctx->out, tupdesc, &change->data.tp.newtuple->tuple, pb.newcolumns);
}
break;
case REORDER_BUFFER_CHANGE_UPDATE:
pb.operation = 2;
numCols = countColumns(tupdesc);
if (change->data.tp.oldtuple != NULL) {
pb.n_oldcolumns = numCols;
pb.oldcolumns = (Common__ColumnPb**)palloc(sizeof(Common__ColumnPb*) * numCols);
tuple_to_proto(ctx->out, tupdesc, &change->data.tp.oldtuple->tuple, pb.oldcolumns);
}
if (change->data.tp.newtuple != NULL) {
pb.n_newcolumns = numCols;
pb.newcolumns = (Common__ColumnPb**)palloc(sizeof(Common__ColumnPb*) * numCols);
tuple_to_proto(ctx->out, tupdesc, &change->data.tp.newtuple->tuple, pb.newcolumns);
}
break;
case REORDER_BUFFER_CHANGE_DELETE:
pb.operation = 3;
if (change->data.tp.oldtuple != NULL) {
numCols = countColumns(tupdesc);
pb.n_oldcolumns = numCols;
pb.oldcolumns = (Common__ColumnPb**)palloc(sizeof(Common__ColumnPb*) * numCols);
tuple_to_proto(ctx->out, tupdesc, &change->data.tp.oldtuple->tuple, pb.oldcolumns);
}
break;
default:
Assert(false);
}
packSize = common__change_pb__get_packed_size(&pb);
pack = (uint8_t*)palloc(sizeof(uint8_t) * packSize);
common__change_pb__pack(&pb, pack);
appendBinaryStringInfo(ctx->out, (char*)pack, packSize);
}