DBZ-258 Support for wal2json plugin

Jiri Pechanec 2017-09-15 06:20:54 +02:00 committed by Gunnar Morling
parent 003d8387ec
commit 0bc8129961
31 changed files with 1200 additions and 237 deletions

View File

@ -196,6 +196,7 @@
<database.user>${postgres.user}</database.user>
<database.password>${postgres.password}</database.password>
<database.dbname>${postgres.db.name}</database.dbname>
<plugin.name>${decoder.plugin.name}</plugin.name>
<skipLongRunningTests>${skipLongRunningTests}</skipLongRunningTests>
</systemPropertyVariables>
</configuration>
@ -280,5 +281,11 @@
<docker.skip>true</docker.skip>
</properties>
</profile>
<profile>
<id>wal2json-decoder</id>
<properties>
<decoder.plugin.name>wal2json</decoder.plugin.name>
</properties>
</profile>
</profiles>
</project>
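With this profile in place, the integration test suite can presumably be switched to the wal2json decoder from the command line; Maven then resolves decoder.plugin.name to wal2json and hands it to the tests through the plugin.name system property declared above (a sketch assuming standard Maven profile activation):

mvn verify -Pwal2json-decoder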

View File

@ -24,14 +24,14 @@ public final class PgOid extends Oid {
/**
* A list of PG types not known by the JDBC driver atm.
*/
protected static final int JSONB_JDBC_OID = 1111;
public static final int JSONB_JDBC_OID = 1111;
/**
* Internal PG types as returned by the plugin
*/
protected static final int JSONB_OID = 3802;
public static final int JSONB_OID = 3802;
protected static final int TSTZRANGE_OID = 3910;
public static final int TSTZRANGE_OID = 3910;
private PgOid() {
}

View File

@ -16,12 +16,16 @@
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.config.ConfigValue;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder;
import io.debezium.connector.postgresql.connection.wal2json.WAL2JSONMessageDecoder;
import io.debezium.jdbc.JdbcConfiguration;
/**
@ -350,8 +354,16 @@ public static TopicSelectionStrategy parse(String value) {
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withDefault(ReplicationConnection.Builder.DEFAULT_PLUGIN_NAME)
.withDescription("The name of the Postgres logical decoding plugin installed on the server. Defaults to 'decoderbufs'");
.withDefault(ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME)
.withDescription("The name of the Postgres logical decoding plugin installed on the server. Defaults to '"+ ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME + "'");
public static final Field PLUGIN_DECODING_CLASS = Field.create("plugin.decoding.class")
.withDisplayName("Plugin decoder class")
.withType(Type.CLASS)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("The Java class used to decode events coming from logical decoder plugin." +
"If not provided then default class associated with requested plugin is ued.");
public static final Field SLOT_NAME = Field.create("slot.name")
.withDisplayName("Slot")
@ -648,7 +660,7 @@ public static TopicSelectionStrategy parse(String value) {
/**
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP,
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, PLUGIN_DECODING_CLASS, SLOT_NAME, DROP_SLOT_ON_STOP,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, SERVER_NAME,
TOPIC_SELECTION_STRATEGY, MAX_BATCH_SIZE,
MAX_QUEUE_SIZE, POLL_INTERVAL_MS, SCHEMA_WHITELIST,
@ -696,6 +708,27 @@ protected String pluginName() {
return config.getString(PLUGIN_NAME);
}
protected MessageDecoder pluginDecoder() {
final MessageDecoder decoder = config.getInstance(PLUGIN_DECODING_CLASS, MessageDecoder.class);
return decoder != null ? decoder : createDefaultMessageDecoder(pluginName());
}
/**
* Provides the default replication message decoder instance for a given plugin name
*
* @param pluginName the name of the logical decoding plugin
* @return an instance of the decoder
*/
public static MessageDecoder createDefaultMessageDecoder(final String pluginName) {
switch (pluginName) {
case ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME:
return new PgProtoMessageDecoder();
case ReplicationConnection.Builder.WAL2JSON_PLUGIN_NAME:
return new WAL2JSONMessageDecoder();
}
throw new ConnectException("Undefined default plugin decoding class for decoder plugin '" + pluginName + "'");
}
protected String slotName() {
return config.getString(SLOT_NAME);
}
@ -792,7 +825,7 @@ protected boolean initialOnlySnapshot() {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, PLUGIN_DECODING_CLASS, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
USER, PASSWORD, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,

View File

@ -75,6 +75,7 @@ protected ReplicationConnection createReplicationConnection() throws SQLExceptio
return ReplicationConnection.builder(config.jdbcConfig())
.withSlot(config.slotName())
.withPlugin(config.pluginName())
.replicationMessageDecoder(config.pluginDecoder())
.dropSlotOnClose(config.dropSlotOnStop())
.statusUpdateIntervalMillis(config.statusUpdateIntervalMillis())
.build();

View File

@ -7,33 +7,26 @@
package io.debezium.connector.postgresql;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.postgresql.jdbc.PgConnection;
import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.data.Envelope;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@ -58,6 +51,11 @@ public class RecordsStreamProducer extends RecordsProducer {
private final AtomicReference<ReplicationStream> replicationStream;
private PgConnection typeResolverConnection = null;
@FunctionalInterface
public static interface PgConnectionSupplier {
PgConnection get() throws SQLException;
}
/**
* Creates new producer instance for the given task context
*
@ -109,8 +107,10 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
while (!Thread.currentThread().isInterrupted()) {
try {
// this will block until a message is available
PgProto.RowMessage message = stream.read();
process(message, stream.lastReceivedLSN(), consumer);
List<ReplicationMessage> messages = stream.read();
for (final ReplicationMessage message: messages) {
process(message, stream.lastReceivedLSN(), consumer);
}
} catch (SQLException e) {
Throwable cause = e.getCause();
if (cause != null && (cause instanceof IOException)) {
@ -202,7 +202,7 @@ private void closeConnections() {
}
}
private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord> consumer) throws SQLException {
private void process(ReplicationMessage message, Long lsn, Consumer<SourceRecord> consumer) throws SQLException {
if (message == null) {
// in some cases we can get null if PG gives us back a message earlier than the latest reported flushed LSN
return;
@ -227,7 +227,7 @@ private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord
logger.warn("ignoring message for table '{}' because it does not have a primary key defined", tableId);
}
PgProto.Op operation = message.getOp();
ReplicationMessage.Operation operation = message.getOperation();
switch (operation) {
case INSERT: {
Object[] row = columnValues(message.getNewTupleList(), tableId, true);
@ -235,8 +235,8 @@ private void process(PgProto.RowMessage message, Long lsn, Consumer<SourceRecord
break;
}
case UPDATE: {
Object[] oldRow = columnValues(message.getOldTupleList(), tableId, true);
Object[] newRow = columnValues(message.getNewTupleList(), tableId, true);
Object[] oldRow = columnValues(message.getOldTupleList(), tableId, true);
generateUpdateRecord(tableId, oldRow, newRow, consumer);
break;
}
@ -374,7 +374,7 @@ record = new SourceRecord(partition, offset, topicName, null, keySchema, key, nu
recordConsumer.accept(record);
}
private Object[] columnValues(List<PgProto.DatumMessage> messageList, TableId tableId, boolean refreshSchemaIfChanged)
private Object[] columnValues(List<ReplicationMessage.Column> messageList, TableId tableId, boolean refreshSchemaIfChanged)
throws SQLException {
if (messageList == null || messageList.isEmpty()) {
return null;
@ -392,18 +392,19 @@ private Object[] columnValues(List<PgProto.DatumMessage> messageList, TableId ta
// based on the schema columns, create the values on the same position as the columns
List<String> columnNames = table.columnNames();
Object[] values = new Object[messageList.size()];
// JSON does not deliver a list of all columns for REPLICA IDENTITY DEFAULT
Object[] values = new Object[messageList.size() < columnNames.size() ? columnNames.size() : messageList.size()];
messageList.forEach(message -> {
//DBZ-298 Quoted column names will be sent like that in messages, but stored unquoted in the column names
String columnName = Strings.unquoteIdentifierPart(message.getColumnName());
String columnName = Strings.unquoteIdentifierPart(message.getName());
int position = columnNames.indexOf(columnName);
assert position >= 0;
values[position] = extractValueFromMessage(message);
values[position] = message.getValue(this::typeResolverConnection);
});
return values;
}
private boolean schemaChanged(List<PgProto.DatumMessage> messageList, Table table) {
private boolean schemaChanged(List<ReplicationMessage.Column> messageList, Table table) {
List<String> columnNames = table.columnNames();
int messagesCount = messageList.size();
if (columnNames.size() != messagesCount) {
@ -415,14 +416,14 @@ private boolean schemaChanged(List<PgProto.DatumMessage> messageList, Table tabl
// go through the list of columns from the message to figure out if any of them are new or have changed their type based
// on what we have in the table metadata....
return messageList.stream().filter(message -> {
String columnName = message.getColumnName();
String columnName = message.getName();
Column column = table.columnWithName(columnName);
if (column == null) {
logger.debug("found new column '{}' present in the server message which is not part of the table metadata; refreshing table schema", columnName);
return true;
} else if (!schema().isType(column.typeName(), column.jdbcType())) {
logger.debug("detected new type for column '{}', old type was '{}', new type is '{}'; refreshing table schema", columnName, column.jdbcType(),
message.getColumnType());
message.getType());
return true;
}
return false;
@ -454,132 +455,6 @@ private TableSchema tableSchemaFor(TableId tableId) throws SQLException {
}
}
/**
* Converts the Protobuf value for a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage plugin message} to
* a Java value based on the type of the column from the message. This value will be converted later on if necessary by the
* {@link PostgresValueConverter#converter(Column, Field)} instance to match whatever the Connect schema type expects.
*
* Note that the logic here is tightly coupled (i.e. dependent) on the Postgres plugin logic which writes the actual
* Protobuf messages.
*
* @param datumMessage a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage} instance; never {@code null}
* @return the value; may be null
*/
protected Object extractValueFromMessage(PgProto.DatumMessage datumMessage) {
int columnType = (int) datumMessage.getColumnType();
switch (columnType) {
case PgOid.BOOL:
return datumMessage.hasDatumBool() ? datumMessage.getDatumBool() : null;
case PgOid.INT2:
case PgOid.INT4:
return datumMessage.hasDatumInt32() ? datumMessage.getDatumInt32() : null;
case PgOid.INT8:
case PgOid.OID:
case PgOid.MONEY:
return datumMessage.hasDatumInt64() ? datumMessage.getDatumInt64() : null;
case PgOid.FLOAT4:
return datumMessage.hasDatumFloat()? datumMessage.getDatumFloat() : null;
case PgOid.FLOAT8:
case PgOid.NUMERIC:
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null;
case PgOid.CHAR:
case PgOid.VARCHAR:
case PgOid.BPCHAR:
case PgOid.TEXT:
case PgOid.JSON:
case PgOid.JSONB_OID:
case PgOid.XML:
case PgOid.UUID:
case PgOid.BIT:
case PgOid.VARBIT:
return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null;
case PgOid.DATE:
return datumMessage.hasDatumInt32() ? (long) datumMessage.getDatumInt32() : null;
case PgOid.TIMESTAMP:
case PgOid.TIMESTAMPTZ:
case PgOid.TIME:
if (!datumMessage.hasDatumInt64()) {
return null;
}
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
// but we'll convert them to nanos which is the smallest unit
return TimeUnit.NANOSECONDS.convert(datumMessage.getDatumInt64(), TimeUnit.MICROSECONDS);
case PgOid.TIMETZ:
if (!datumMessage.hasDatumDouble()) {
return null;
}
// the value is sent as a double microseconds, convert to nano
return BigDecimal.valueOf(datumMessage.getDatumDouble() * 1000).longValue();
case PgOid.INTERVAL:
// these are sent as doubles by the plugin since their storage is larger than 8 bytes
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null;
// the plugin will send back a TZ formatted string
case PgOid.BYTEA:
return datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
case PgOid.POINT: {
PgProto.Point datumPoint = datumMessage.getDatumPoint();
return new PGpoint(datumPoint.getX(), datumPoint.getY());
}
case PgOid.TSTZRANGE_OID:
return datumMessage.hasDatumBytes() ? new String(datumMessage.getDatumBytes().toByteArray(), Charset.forName("UTF-8")) : null;
case PgOid.INT2_ARRAY:
case PgOid.INT4_ARRAY:
case PgOid.INT8_ARRAY:
case PgOid.TEXT_ARRAY:
case PgOid.NUMERIC_ARRAY:
case PgOid.FLOAT4_ARRAY:
case PgOid.FLOAT8_ARRAY:
case PgOid.BOOL_ARRAY:
case PgOid.DATE_ARRAY:
case PgOid.TIME_ARRAY:
case PgOid.TIMETZ_ARRAY:
case PgOid.TIMESTAMP_ARRAY:
case PgOid.TIMESTAMPTZ_ARRAY:
case PgOid.BYTEA_ARRAY:
case PgOid.VARCHAR_ARRAY:
case PgOid.OID_ARRAY:
case PgOid.BPCHAR_ARRAY:
case PgOid.MONEY_ARRAY:
case PgOid.NAME_ARRAY:
case PgOid.INTERVAL_ARRAY:
case PgOid.CHAR_ARRAY:
case PgOid.VARBIT_ARRAY:
case PgOid.UUID_ARRAY:
case PgOid.XML_ARRAY:
case PgOid.POINT_ARRAY:
case PgOid.JSONB_ARRAY:
case PgOid.JSON_ARRAY:
case PgOid.REF_CURSOR_ARRAY:
// Currently the logical decoding plugin sends unhandled types as a byte array containing the string
// representation (in Postgres) of the array value.
// The approach to decode this is sub-optimal but the only way to improve this is to update the plugin.
// Reasons for it being sub-optimal include:
// 1. It requires a Postgres JDBC connection to deserialize
// 2. The byte-array is a serialised string but we make the assumption its UTF-8 encoded (which it will
// be in most cases)
// 3. For larger arrays and especially 64-bit integers and the like it is less efficient sending string
// representations over the wire.
try {
byte[] data = datumMessage.hasDatumBytes()? datumMessage.getDatumBytes().toByteArray() : null;
if (data == null) return null;
String dataString = new String(data, Charset.forName("UTF-8"));
PgArray arrayData = new PgArray(typeResolverConnection(), columnType, dataString);
Object deserializedArray = arrayData.getArray();
return Arrays.asList((Object[])deserializedArray);
}
catch (SQLException e) {
logger.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e);
}
return null;
default: {
logger.warn("processing column '{}' with unknown data type '{}' as byte array", datumMessage.getColumnName(),
datumMessage.getColumnType());
return datumMessage.hasDatumBytes()? datumMessage.getDatumBytes().toByteArray() : null;
}
}
}
private synchronized PgConnection typeResolverConnection() throws SQLException {
if (typeResolverConnection == null) {
typeResolverConnection = (PgConnection)taskContext.createConnection().connection();

View File

@ -0,0 +1,32 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection;
import java.nio.ByteBuffer;
import java.util.List;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
/**
* A class that is able to deserialize/decode the binary representation of a batch of replication messages generated by
* a logical decoding plugin. There is usually one implementation per decoding plugin.
*
* @author Jiri Pechanec
*
*/
public interface MessageDecoder {
/**
* Deserializes the binary representation of a replication message into logical replication message objects
*
* @param buffer - the binary representation of a replication message
* @return a list of replication messages decoded from the buffer
*/
List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer);
ChainedLogicalStreamBuilder options(final ChainedLogicalStreamBuilder builder);
}
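As an illustration of this contract, a minimal sketch of a custom decoder that could be wired in through plugin.decoding.class; the package and class name are hypothetical and the implementation deliberately decodes nothing (the real implementations added in this commit are PgProtoMessageDecoder and WAL2JSONMessageDecoder):

package com.example.decoder; // hypothetical

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;

import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage;

public class NoOpMessageDecoder implements MessageDecoder {

    @Override
    public List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer) {
        // A real decoder would parse buffer.array() here and build ReplicationMessage instances;
        // this sketch simply drops the payload.
        return Collections.emptyList();
    }

    @Override
    public ChainedLogicalStreamBuilder options(final ChainedLogicalStreamBuilder builder) {
        // No extra replication slot options are needed for this sketch.
        return builder;
    }
}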

View File

@ -10,7 +10,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -21,10 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.InvalidProtocolBufferException;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
@ -42,6 +42,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private final boolean dropSlotOnClose;
private final Configuration originalConfig;
private final Integer statusUpdateIntervalMillis;
private final MessageDecoder messageDecoder;
private long defaultStartingPos;
@ -59,7 +60,8 @@ private PostgresReplicationConnection(Configuration config,
String slotName,
String pluginName,
boolean dropSlotOnClose,
Integer statusUpdateIntervalMillis) {
Integer statusUpdateIntervalMillis,
MessageDecoder messageDecoder) {
super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings);
this.originalConfig = config;
@ -67,6 +69,7 @@ private PostgresReplicationConnection(Configuration config,
this.pluginName = pluginName;
this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateIntervalMillis = statusUpdateIntervalMillis;
this.messageDecoder = messageDecoder;
try {
initReplicationSlot();
@ -157,6 +160,8 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) t
.logical()
.withSlotName(slotName)
.withStartPosition(lsn);
streamBuilder = messageDecoder.options(streamBuilder);
if (statusUpdateIntervalMillis != null && statusUpdateIntervalMillis > 0) {
streamBuilder.withStatusInterval(statusUpdateIntervalMillis, TimeUnit.MILLISECONDS);
}
@ -171,40 +176,29 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) t
private volatile LogSequenceNumber lastReceivedLSN;
@Override
public PgProto.RowMessage read() throws SQLException {
processWarnings(false);
public List<ReplicationMessage> read() throws SQLException {
ByteBuffer read = stream.read();
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (lsnLong >= stream.getLastReceiveLSN().asLong()) {
return null;
return Collections.emptyList();
}
return deserializeMessage(read);
return deserializeMessages(read);
}
@Override
public PgProto.RowMessage readPending() throws SQLException {
processWarnings(false);
public List<ReplicationMessage> readPending() throws SQLException {
ByteBuffer read = stream.readPending();
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (read == null || lsnLong >= stream.getLastReceiveLSN().asLong()) {
return null;
return Collections.emptyList();
}
return deserializeMessage(read);
return deserializeMessages(read);
}
private PgProto.RowMessage deserializeMessage(ByteBuffer buffer) {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException(
"Invalid buffer received from PG server during streaming replication");
}
byte[] source = buffer.array();
byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
lastReceivedLSN = stream.getLastReceiveLSN();
return PgProto.RowMessage.parseFrom(content);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
private List<ReplicationMessage> deserializeMessages(ByteBuffer buffer) {
final List<ReplicationMessage> msg = messageDecoder.deserializeMessage(buffer);
lastReceivedLSN = stream.getLastReceiveLSN();
return msg;
}
@Override
@ -268,7 +262,8 @@ protected static class ReplicationConnectionBuilder implements Builder {
private Configuration config;
private String slotName = DEFAULT_SLOT_NAME;
private String pluginName = DEFAULT_PLUGIN_NAME;
private String pluginName = PROTOBUF_PLUGIN_NAME;
private MessageDecoder messageDecoder = PostgresConnectorConfig.createDefaultMessageDecoder(PROTOBUF_PLUGIN_NAME);
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Integer statusUpdateIntervalMillis;
@ -303,9 +298,17 @@ public ReplicationConnectionBuilder statusUpdateIntervalMillis(final Integer sta
return this;
}
@Override
public Builder replicationMessageDecoder(final MessageDecoder messageDecoder) {
this.messageDecoder = messageDecoder;
return this;
}
@Override
public ReplicationConnection build() {
return new PostgresReplicationConnection(config, slotName, pluginName, dropSlotOnClose, statusUpdateIntervalMillis);
assert pluginName != null : "Decoding plugin name is not set";
assert messageDecoder != null : "Replication message decoder is not provided";
return new PostgresReplicationConnection(config, slotName, pluginName, dropSlotOnClose, statusUpdateIntervalMillis, messageDecoder);
}
}
}

View File

@ -88,7 +88,8 @@ interface Builder {
* Default replication settings
*/
String DEFAULT_SLOT_NAME = "debezium";
String DEFAULT_PLUGIN_NAME = "decoderbufs";
String PROTOBUF_PLUGIN_NAME = "decoderbufs";
String WAL2JSON_PLUGIN_NAME = "wal2json";
boolean DEFAULT_DROP_SLOT_ON_CLOSE = true;
/**
@ -105,7 +106,7 @@ interface Builder {
*
* @param pluginName the name of the slot, may not be null.
* @return this instance
* @see #DEFAULT_PLUGIN_NAME
* @see #PROTOBUF_PLUGIN_NAME
*/
Builder withPlugin(final String pluginName);
@ -126,6 +127,8 @@ interface Builder {
*/
Builder statusUpdateIntervalMillis(final Integer statusUpdateIntervalMillis);
Builder replicationMessageDecoder(final MessageDecoder messageDecoder);
/**
* Creates a new {@link ReplicationConnection} instance
* @return a connection, never null

View File

@ -0,0 +1,87 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection;
import java.util.List;
import io.debezium.connector.postgresql.RecordsStreamProducer.PgConnectionSupplier;
/**
* An abstract representation of a replication message that is sent by a PostgreSQL logical decoding plugin and
* is processed by the Debezium PostgreSQL connector.
*
* @author Jiri Pechanec
*
*/
public interface ReplicationMessage {
/**
*
* Data modification operation executed
*
*/
public enum Operation {
INSERT, UPDATE, DELETE
}
/**
*
* A representation of column value delivered as a part of replication message
*
*/
public abstract class Column {
private final String name;
private final Object type;
public Column(final String name, final Object type) {
super();
this.name = name;
this.type = type;
}
public String getName() {
return name;
}
public Object getType() {
return type;
}
public abstract Object getValue(final PgConnectionSupplier connection);
}
/**
* @return A data operation executed
*/
public Operation getOperation();
/**
* @return Transaction commit time for this change
*/
public long getCommitTime();
/**
* @return An id of transaction to which this change belongs
*/
public int getTransactionId();
/**
* @return Table changed
*/
public String getTable();
/**
* @return The original values of the table columns; null for INSERT
*/
public List<Column> getOldTupleList();
/**
* @return The new values of the table columns; null for DELETE
*/
public List<Column> getNewTupleList();
}

View File

@ -7,9 +7,9 @@
package io.debezium.connector.postgresql.connection;
import java.sql.SQLException;
import org.postgresql.replication.PGReplicationStream;
import java.util.List;
import io.debezium.connector.postgresql.proto.PgProto;
import org.postgresql.replication.PGReplicationStream;
/**
* A stream from which messages sent by a logical decoding plugin can be consumed over a replication connection.
@ -27,7 +27,7 @@ public interface ReplicationStream extends AutoCloseable {
* @throws SQLException if anything unexpected fails
* @see PGReplicationStream#read()
*/
PgProto.RowMessage read() throws SQLException;
List<ReplicationMessage> read() throws SQLException;
/**
* Attempts to read a Protobuf message from a replication connection, returning that message if it's available or returning
@ -40,7 +40,7 @@ public interface ReplicationStream extends AutoCloseable {
* @throws SQLException if anything unexpected fails
* @see PGReplicationStream#readPending()
*/
PgProto.RowMessage readPending() throws SQLException;
List<ReplicationMessage> readPending() throws SQLException;
/**
* Sends a message to the server informing it about that latest position in the WAL that this stream has read via

View File

@ -0,0 +1,170 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.pgproto;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PgOid;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.RecordsStreamProducer.PgConnectionSupplier;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.relational.Column;
/**
* Logical encapsulation of column changes sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</a>
*
* @author Jiri Pechanec
*
*/
class PgProtoColumn extends ReplicationMessage.Column {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final PgProto.DatumMessage raw;
public PgProtoColumn(final PgProto.DatumMessage raw) {
super(raw.getColumnName(), raw.getColumnType());
this.raw = raw;
}
/**
* Converts the Protobuf value for a {@link io.debezium.connector.postgresql.proto.PgProto.DatumMessage plugin message} to
* a Java value based on the type of the column from the message. This value will be converted later on if necessary by the
* {@link PostgresValueConverter#converter(Column, Field)} instance to match whatever the Connect schema type expects.
*
* Note that the logic here is tightly coupled (i.e. dependent) on the Postgres plugin logic which writes the actual
* Protobuf messages.
*
* @param connection a supplier of a connection to a Postgres instance, used for array handling
* @return the value; may be null
*/
@Override
public Object getValue(final PgConnectionSupplier connection) {
final PgProto.DatumMessage datumMessage = raw;
int columnType = (int) datumMessage.getColumnType();
switch (columnType) {
case PgOid.BOOL:
return datumMessage.hasDatumBool() ? datumMessage.getDatumBool() : null;
case PgOid.INT2:
case PgOid.INT4:
return datumMessage.hasDatumInt32() ? datumMessage.getDatumInt32() : null;
case PgOid.INT8:
case PgOid.OID:
case PgOid.MONEY:
return datumMessage.hasDatumInt64() ? datumMessage.getDatumInt64() : null;
case PgOid.FLOAT4:
return datumMessage.hasDatumFloat()? datumMessage.getDatumFloat() : null;
case PgOid.FLOAT8:
case PgOid.NUMERIC:
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null;
case PgOid.CHAR:
case PgOid.VARCHAR:
case PgOid.BPCHAR:
case PgOid.TEXT:
case PgOid.JSON:
case PgOid.JSONB_OID:
case PgOid.XML:
case PgOid.UUID:
case PgOid.BIT:
case PgOid.VARBIT:
return datumMessage.hasDatumString() ? datumMessage.getDatumString() : null;
case PgOid.DATE:
return datumMessage.hasDatumInt32() ? (long) datumMessage.getDatumInt32() : null;
case PgOid.TIMESTAMP:
case PgOid.TIMESTAMPTZ:
case PgOid.TIME:
if (!datumMessage.hasDatumInt64()) {
return null;
}
// these types are sent by the plugin as LONG - microseconds since Unix Epoch
// but we'll convert them to nanos which is the smallest unit
return TimeUnit.NANOSECONDS.convert(datumMessage.getDatumInt64(), TimeUnit.MICROSECONDS);
case PgOid.TIMETZ:
if (!datumMessage.hasDatumDouble()) {
return null;
}
// the value is sent as a double microseconds, convert to nano
return BigDecimal.valueOf(datumMessage.getDatumDouble() * 1000).longValue();
case PgOid.INTERVAL:
// these are sent as doubles by the plugin since their storage is larger than 8 bytes
return datumMessage.hasDatumDouble() ? datumMessage.getDatumDouble() : null;
// the plugin will send back a TZ formatted string
case PgOid.BYTEA:
return datumMessage.hasDatumBytes() ? datumMessage.getDatumBytes().toByteArray() : null;
case PgOid.POINT: {
PgProto.Point datumPoint = datumMessage.getDatumPoint();
return new PGpoint(datumPoint.getX(), datumPoint.getY());
}
case PgOid.TSTZRANGE_OID:
return datumMessage.hasDatumBytes() ? new String(datumMessage.getDatumBytes().toByteArray(), Charset.forName("UTF-8")) : null;
case PgOid.INT2_ARRAY:
case PgOid.INT4_ARRAY:
case PgOid.INT8_ARRAY:
case PgOid.TEXT_ARRAY:
case PgOid.NUMERIC_ARRAY:
case PgOid.FLOAT4_ARRAY:
case PgOid.FLOAT8_ARRAY:
case PgOid.BOOL_ARRAY:
case PgOid.DATE_ARRAY:
case PgOid.TIME_ARRAY:
case PgOid.TIMETZ_ARRAY:
case PgOid.TIMESTAMP_ARRAY:
case PgOid.TIMESTAMPTZ_ARRAY:
case PgOid.BYTEA_ARRAY:
case PgOid.VARCHAR_ARRAY:
case PgOid.OID_ARRAY:
case PgOid.BPCHAR_ARRAY:
case PgOid.MONEY_ARRAY:
case PgOid.NAME_ARRAY:
case PgOid.INTERVAL_ARRAY:
case PgOid.CHAR_ARRAY:
case PgOid.VARBIT_ARRAY:
case PgOid.UUID_ARRAY:
case PgOid.XML_ARRAY:
case PgOid.POINT_ARRAY:
case PgOid.JSONB_ARRAY:
case PgOid.JSON_ARRAY:
case PgOid.REF_CURSOR_ARRAY:
// Currently the logical decoding plugin sends unhandled types as a byte array containing the string
// representation (in Postgres) of the array value.
// The approach to decode this is sub-optimal but the only way to improve this is to update the plugin.
// Reasons for it being sub-optimal include:
// 1. It requires a Postgres JDBC connection to deserialize
// 2. The byte-array is a serialised string but we make the assumption its UTF-8 encoded (which it will
// be in most cases)
// 3. For larger arrays and especially 64-bit integers and the like it is less efficient sending string
// representations over the wire.
try {
byte[] data = datumMessage.hasDatumBytes()? datumMessage.getDatumBytes().toByteArray() : null;
if (data == null) return null;
String dataString = new String(data, Charset.forName("UTF-8"));
PgArray arrayData = new PgArray(connection.get(), columnType, dataString);
Object deserializedArray = arrayData.getArray();
return Arrays.asList((Object[])deserializedArray);
}
catch (SQLException e) {
logger.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e);
}
return null;
default: {
logger.warn("processing column '{}' with unknown data type '{}' as byte array", datumMessage.getColumnName(),
datumMessage.getColumnType());
return datumMessage.hasDatumBytes()? datumMessage.getDatumBytes().toByteArray() : null;
}
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.pgproto;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import com.google.protobuf.InvalidProtocolBufferException;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.proto.PgProto;
/**
* ProtoBuf deserialization of a message sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</a>.
* The message is wrapped in a List as the Protobuf plugin sends only one message per buffer.
*
* @author Jiri Pechanec
*
*/
public class PgProtoMessageDecoder implements MessageDecoder {
public PgProtoMessageDecoder() {
super();
}
@Override
public List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer) {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException(
"Invalid buffer received from PG server during streaming replication");
}
byte[] source = buffer.array();
byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
return Collections.singletonList(new PgProtoReplicationMessage(PgProto.RowMessage.parseFrom(content)));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Override
public ChainedLogicalStreamBuilder options(ChainedLogicalStreamBuilder builder) {
return builder;
}
}

View File

@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.pgproto;
import java.util.List;
import java.util.stream.Collectors;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.proto.PgProto;
/**
* Replication message representing a message sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</a>
*
* @author Jiri Pechanec
*
*/
class PgProtoReplicationMessage implements ReplicationMessage {
private final PgProto.RowMessage rawMessage;
public PgProtoReplicationMessage(final PgProto.RowMessage rawMessage) {
this.rawMessage = rawMessage;
}
@Override
public Operation getOperation() {
switch (rawMessage.getOp()) {
case INSERT:
return Operation.INSERT;
case UPDATE:
return Operation.UPDATE;
case DELETE:
return Operation.DELETE;
}
throw new IllegalArgumentException(
"Unknown operation '" + rawMessage.getOp() + "' in replication stream message");
}
@Override
public long getCommitTime() {
return rawMessage.getCommitTime();
}
@Override
public int getTransactionId() {
return rawMessage.getTransactionId();
}
@Override
public String getTable() {
return rawMessage.getTable();
}
@Override
public List<ReplicationMessage.Column> getOldTupleList() {
return transform(rawMessage.getOldTupleList());
}
@Override
public List<ReplicationMessage.Column> getNewTupleList() {
return transform(rawMessage.getNewTupleList());
}
private List<ReplicationMessage.Column> transform(List<PgProto.DatumMessage> messageList) {
return messageList.stream()
.map(PgProtoColumn::new)
.collect(Collectors.toList());
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.wal2json;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.util.function.Supplier;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.time.NanoTimestamp;
/**
* Transformer for time/date related string representations in JSON messages coming from the plugin.
*
* @author Jiri Pechanec
*
*/
public interface DateTimeFormat {
public long timestamp(final String s);
public long timestampWithTimeZone(final String s);
public long systemTimestamp(final String s);
public LocalDate date(final String s);
public LocalTime time(final String s);
public OffsetTime timeWithTimeZone(final String s);
public static DateTimeFormat get() {
return new ISODateTimeFormat();
}
public static class ISODateTimeFormat implements DateTimeFormat {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final String TS_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static final DateTimeFormatter TS_FORMAT = DateTimeFormatter.ofPattern(TS_FORMAT_PATTERN);
private static final String TS_TZ_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ssX";
private static final DateTimeFormatter TS_TZ_FORMAT = DateTimeFormatter.ofPattern(TS_TZ_FORMAT_PATTERN);
private static final String SYSTEM_TS_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSSX";
private static final DateTimeFormatter SYSTEM_TS_FORMAT = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
.appendOffset("+HH", "Z")
.toFormatter();
private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd";
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern(DATE_FORMAT_PATTERN);
private static final String TIME_FORMAT_PATTERN = "HH:mm:ss";
private static final DateTimeFormatter TIME_FORMAT = DateTimeFormatter.ofPattern(TIME_FORMAT_PATTERN);
private static final String TIME_TZ_FORMAT_PATTERN = "HH:mm:ssX";
private static final DateTimeFormatter TIME_TZ_FORMAT = DateTimeFormatter.ofPattern(TIME_TZ_FORMAT_PATTERN);
@Override
public long timestamp(final String s) {
return format(TS_FORMAT_PATTERN, s, () -> NanoTimestamp.toEpochNanos(LocalDateTime.parse(s, TS_FORMAT), null));
}
@Override
public long timestampWithTimeZone(final String s) {
return formatTZ(TS_TZ_FORMAT_PATTERN, TS_TZ_FORMAT, s);
}
@Override
public LocalDate date(final String s) {
return format(DATE_FORMAT_PATTERN, s, () -> LocalDate.parse(s, DATE_FORMAT));
}
@Override
public LocalTime time(final String s) {
return format(TIME_FORMAT_PATTERN, s, () -> LocalTime.parse(s, TIME_FORMAT));
}
@Override
public OffsetTime timeWithTimeZone(final String s) {
return format(TIME_TZ_FORMAT_PATTERN, s, () -> OffsetTime.parse(s, TIME_TZ_FORMAT)).withOffsetSameInstant(ZoneOffset.UTC);
}
@Override
public long systemTimestamp(final String s) {
return formatTZ(SYSTEM_TS_FORMAT_PATTERN, SYSTEM_TS_FORMAT, s);
}
private long formatTZ(final String pattern, final DateTimeFormatter formatter, final String s) {
return format(pattern, s, () -> {
final Instant ts = Instant.from(formatter.parse(s));
return ts.getEpochSecond() * 1_000_000_000 + ts.getNano();
});
}
private <T> T format(final String pattern, final String s, final Supplier<T> value) {
try {
return value.get();
} catch (final DateTimeParseException e) {
logger.error("Cannot parse time/date value '{}', expected format '{}'", s, pattern);
throw new ConnectException(e);
}
}
}
}
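A short usage sketch of the formats above, assuming the calling class lives in (or imports from) the same package; the sample strings follow the declared patterns and the values are illustrative only:

public class DateTimeFormatExample { // hypothetical scaffold

    public static void main(String[] args) {
        // "yyyy-MM-dd HH:mm:ss" -> nanoseconds since epoch
        long ts = DateTimeFormat.get().timestamp("2017-09-15 06:20:54");
        // "yyyy-MM-dd HH:mm:ss[.ffffff]X", the format used for wal2json commit timestamps
        long commitTs = DateTimeFormat.get().systemTimestamp("2017-09-15 06:20:54.123456+02");
        // "yyyy-MM-dd"
        java.time.LocalDate day = DateTimeFormat.get().date("2017-09-15");
        System.out.println(ts + " " + commitTs + " " + day);
    }
}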

View File

@ -0,0 +1,163 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.wal2json;
import java.sql.SQLException;
import java.util.Arrays;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.Oid;
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray;
import org.postgresql.util.PGInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.connector.postgresql.RecordsStreamProducer.PgConnectionSupplier;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.document.Value;
import io.debezium.relational.Column;
/**
* Logical encapsulation of a column value sent by the <a href="https://github.com/eulerto/wal2json">wal2json</a> logical decoding plugin
*
* @author Jiri Pechanec
*
*/
class WAL2JSONColumn extends ReplicationMessage.Column {
private static final Logger logger = LoggerFactory.getLogger(WAL2JSONColumn.class);
private final Value rawValue;
public WAL2JSONColumn(final String name, final String type, final Value value) {
super(name, type);
this.rawValue = value;
}
/**
* Converts the value (string representation) coming from the wal2json plugin to
* a Java value based on the type of the column from the message. This value will be converted later on if necessary by the
* {@link PostgresValueConverter#converter(Column, Field)} instance to match whatever the Connect schema type expects.
*
* Note that the logic here is tightly coupled (i.e. dependent) on the wal2json plugin logic which writes the actual
* JSON messages.
*
* @param connection a supplier of a connection to a Postgres instance, used for array handling
* @return the value; may be null
*/
@Override
public Object getValue(final PgConnectionSupplier connection) {
String columnType = (String)getType();
switch (columnType) {
case "bool":
return rawValue.isNotNull() ? rawValue.asBoolean() : null;
case "int2":
case "int4":
return rawValue.isNotNull() ? rawValue.asInteger() : null;
case "int8":
case "oid":
return rawValue.isNotNull() ? rawValue.asLong() : null;
case "float4":
return rawValue.isNotNull() ? rawValue.asFloat() : null;
case "float8":
return rawValue.isNotNull() ? rawValue.asDouble() : null;
case "numeric":
return rawValue.isNotNull() ? rawValue.asDouble() : null;
case "char":
case "varchar":
case "bpchar":
case "text":
case "json":
case "jsonb":
case "xml":
case "uuid":
case "bit":
case "varbit":
case "tstzrange":
return rawValue.isNotNull() ? rawValue.asString() : null;
case "date":
return rawValue.isNotNull() ? DateTimeFormat.get().date(rawValue.asString()) : null;
case "timestamp":
return rawValue.isNotNull() ? DateTimeFormat.get().timestamp(rawValue.asString()) : null;
case "timestamptz":
return rawValue.isNotNull() ? DateTimeFormat.get().timestampWithTimeZone(rawValue.asString()) : null;
case "time":
return rawValue.isNotNull() ? DateTimeFormat.get().time(rawValue.asString()) : null;
case "timetz":
return rawValue.isNotNull() ? DateTimeFormat.get().timeWithTimeZone(rawValue.asString()) : null;
case "bytea":
if (rawValue.isNull()) {
return null;
}
final String hex = rawValue.asString();
final byte[] bytes = new byte[hex.length() / 2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte)Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
}
return bytes;
case "point":
try {
return rawValue.isNotNull() ? new PGpoint(rawValue.asString()) : null;
} catch (final SQLException e) {
logger.error("Failed to parse point {}, {}", rawValue.asString(), e);
throw new ConnectException(e);
}
case "money":
return rawValue.isNotNull() ? Double.parseDouble(rawValue.asString().replace("$", "").replace(",", "")) : null;
case "interval":
try {
return rawValue.isNotNull() ? new PGInterval(rawValue.asString()) : null;
} catch (final SQLException e) {
logger.error("Failed to parse point {}, {}", rawValue.asString(), e);
throw new ConnectException(e);
}
case "_int2":
case "_int4":
case "_int8":
case "_text":
case "_numeric":
case "_float4":
case "_float8":
case "_bool":
case "_date":
case "_time":
case "_timetz":
case "_timestamp":
case "_timestamptz":
case "_bytea":
case "_varchar":
case "_oid":
case "_bpchar":
case "_money":
case "_name":
case "_interval":
case "_char":
case "_varbit":
case "_uuid":
case "_xml":
case "_point":
case "_jsonb":
case "_json":
case "_ref_cursor":
try {
final String dataString = rawValue.asString();
PgArray arrayData = new PgArray(connection.get(), Oid.valueOf(columnType.substring(1) + "_array"), dataString);
Object deserializedArray = arrayData.getArray();
return Arrays.asList((Object[])deserializedArray);
}
catch (SQLException e) {
logger.warn("Unexpected exception trying to process PgArray column '{}'", getName(), e);
}
return null;
}
logger.warn("processing column '{}' with unknown data type '{}' as byte array", getName(),
getType());
return rawValue.asBytes();
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.wal2json;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
/**
* JSON deserialization of a message sent by
* <a href="https://github.com/eulerto/wal2json">wal2json</a> logical decoding plugin. The plugin sends all
* changes in one transaction as a single batch.
*
* @author Jiri Pechanec
*
*/
public class WAL2JSONMessageDecoder implements MessageDecoder {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final DateTimeFormat dateTime = DateTimeFormat.get();
public WAL2JSONMessageDecoder() {
super();
}
@Override
public List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer) {
try {
if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
}
byte[] source = buffer.array();
byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
final Document message = DocumentReader.defaultReader().read(content);
logger.debug("Message arrived for decoding {}", message);
final int txId = message.getInteger("xid");
final String timestamp = message.getString("timestamp");
final long commitTime = dateTime.systemTimestamp(timestamp);
final Array changes = message.getArray("change");
return changes.streamValues()
.map(x -> new WAL2JSONReplicationMessage(txId, commitTime, x.asDocument()))
.collect(Collectors.toList());
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
@Override
public ChainedLogicalStreamBuilder options(ChainedLogicalStreamBuilder builder) {
return builder
.withSlotOption("pretty-print", 1)
.withSlotOption("write-in-chunks", 0)
.withSlotOption("include-xids", 1)
.withSlotOption("include-timestamp", 1);
}
}
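For reference, the decoder above expects a wal2json change set roughly of this shape; the field names mirror the getters used in deserializeMessage and WAL2JSONReplicationMessage, while the concrete values are purely illustrative:

{
  "xid": 563,
  "timestamp": "2017-09-15 06:20:54.123456+02",
  "change": [
    {
      "kind": "update",
      "schema": "public",
      "table": "customers",
      "columnnames": ["id", "name"],
      "columntypes": ["int4", "varchar"],
      "columnvalues": [1, "Anne"],
      "oldkeys": { "keynames": ["id"], "keytypes": ["int4"], "keyvalues": [1] }
    }
  ]
}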

View File

@ -0,0 +1,89 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.connection.wal2json;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.document.Array;
import io.debezium.document.Document;
/**
* Replication message representing a message sent by the <a href="https://github.com/eulerto/wal2json">wal2json</a> logical decoding plugin
*
* @author Jiri Pechanec
*
*/
class WAL2JSONReplicationMessage implements ReplicationMessage {
private final int txId;
private final long commitTime;
private final Document rawMessage;
public WAL2JSONReplicationMessage(final int txId, final long commitTime, final Document rawMessage) {
this.txId = txId;
this.commitTime = commitTime;
this.rawMessage = rawMessage;
}
@Override
public Operation getOperation() {
final String operation = rawMessage.getString("kind");
switch (operation) {
case "insert":
return Operation.INSERT;
case "update":
return Operation.UPDATE;
case "delete":
return Operation.DELETE;
}
throw new IllegalArgumentException(
"Unknown operation '" + operation + "' in replication stream message");
}
@Override
public long getCommitTime() {
return commitTime;
}
@Override
public int getTransactionId() {
return txId;
}
@Override
public String getTable() {
return String.format("\"%s\".\"%s\"", rawMessage.getString("schema"), rawMessage.getString("table"));
}
@Override
public List<ReplicationMessage.Column> getOldTupleList() {
final Document oldkeys = rawMessage.getDocument("oldkeys");
return oldkeys != null ? transform(oldkeys, "keynames", "keytypes", "keyvalues") : null;
}
@Override
public List<ReplicationMessage.Column> getNewTupleList() {
return transform(rawMessage, "columnnames", "columntypes", "columnvalues");
}
private List<ReplicationMessage.Column> transform(final Document data, final String nameField, final String typeField, final String valueField) {
final Array columnNames = data.getArray(nameField);
final Array columnTypes = data.getArray(typeField);
final Array columnValues = data.getArray(valueField);
if (columnNames.size() != columnTypes.size() || columnNames.size() != columnValues.size()) {
throw new ConnectException("Column related arrays do not have the same size");
}
final List<ReplicationMessage.Column> columns = new ArrayList<>();
for (int i = 0; i < columnNames.size(); i++) {
columns.add(new WAL2JSONColumn(columnNames.get(i).asString(), columnTypes.get(i).asString(), columnValues.get(i)));
}
return columns;
}
}

View File

@ -30,6 +30,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -293,6 +294,7 @@ protected static class SchemaAndValueField {
private final Object schema;
private final Object value;
private final String fieldName;
private Supplier<Boolean> assertValueOnlyIf = null;
public SchemaAndValueField(String fieldName, Object schema, Object value) {
this.schema = schema;
@ -300,12 +302,21 @@ public SchemaAndValueField(String fieldName, Object schema, Object value) {
this.fieldName = fieldName;
}
public SchemaAndValueField assertValueOnlyIf(final Supplier<Boolean> predicate) {
assertValueOnlyIf = predicate;
return this;
}
protected void assertFor(Struct content) {
assertSchema(content);
assertValue(content);
}
private void assertValue(Struct content) {
if (assertValueOnlyIf != null && !assertValueOnlyIf.get()) {
return;
}
if (value == null) {
assertNull(fieldName + " is present in the actual content", content.get(fieldName));
return;

View File

@ -0,0 +1,25 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import java.util.function.Supplier;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
public class DecoderDifferences {
public static int updatesWithoutPK(final int expectedCount, final int updatesWithoutPK) {
return !ReplicationConnection.Builder.WAL2JSON_PLUGIN_NAME.equals(TestHelper.decoderPluginName()) ? expectedCount : expectedCount - updatesWithoutPK;
}
public static class AreQuotedIdentifiersUnsupported implements Supplier<Boolean> {
@Override
public Boolean get() {
return ReplicationConnection.Builder.WAL2JSON_PLUGIN_NAME.equals(TestHelper.decoderPluginName());
}
}
}

View File

@ -122,7 +122,8 @@ public void shouldValidateConfiguration() throws Exception {
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.DATABASE_NAME, 1);
// validate the non required fields
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, ReplicationConnection.Builder.DEFAULT_PLUGIN_NAME);
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME);
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_DECODING_CLASS, null);
validateField(validatedConfig, PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
validateField(validatedConfig, PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
validateField(validatedConfig, PostgresConnectorConfig.PORT, PostgresConnectorConfig.DEFAULT_PORT);
@ -157,14 +158,22 @@ public void shouldSupportSSLParameters() throws Exception {
Configuration config = TestHelper.defaultConfig().with(PostgresConnectorConfig.SSL_MODE,
PostgresConnectorConfig.SecureConnectionMode.REQUIRED).build();
start(PostgresConnector.class, config, (success, msg, error) -> {
// we expect the task to fail at startup when we're printing the server info
assertThat(success).isFalse();
assertThat(error).isInstanceOf(ConnectException.class);
Throwable cause = error.getCause();
assertThat(cause).isInstanceOf(SQLException.class);
assertThat(PSQLState.CONNECTION_REJECTED).isEqualTo(new PSQLState(((SQLException)cause).getSQLState()));
if (TestHelper.shouldSSLConnectionFail()) {
// we expect the task to fail at startup when we're printing the server info
assertThat(success).isFalse();
assertThat(error).isInstanceOf(ConnectException.class);
Throwable cause = error.getCause();
assertThat(cause).isInstanceOf(SQLException.class);
assertThat(PSQLState.CONNECTION_REJECTED).isEqualTo(new PSQLState(((SQLException)cause).getSQLState()));
}
});
assertConnectorNotRunning();
if (TestHelper.shouldSSLConnectionFail()) {
assertConnectorNotRunning();
} else {
assertConnectorIsRunning();
Thread.sleep(10000);
stopConnector();
}
}
@Test

View File

@ -64,7 +64,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
//then start the producer and validate all records are there
snapshotProducer.start(consumer);
consumer.await(2, TimeUnit.SECONDS);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
Map<String, List<SchemaAndValueField>> expectedValuesByTableName = super.schemaAndValuesByTableName();
consumer.process(record -> assertReadRecord(record, expectedValuesByTableName));
@ -93,13 +93,13 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
snapshotProducer.start(consumer);
// first make sure we get the initial records from both schemas...
consumer.await(2, TimeUnit.SECONDS);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
consumer.clear();
// then insert some more data and check that we get it back
TestHelper.execute(insertStmt);
consumer.expects(2);
consumer.await(2, TimeUnit.SECONDS);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
SourceRecord first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 2);
@ -120,7 +120,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
consumer = testConsumer(expectedRecordsCount, "s1", "s2");
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER), true);
snapshotProducer.start(consumer);
consumer.await(2, TimeUnit.SECONDS);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
AtomicInteger counter = new AtomicInteger(0);
consumer.process(record -> {
@ -135,7 +135,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
TestHelper.execute(insertStmt);
consumer.expects(2);
consumer.await(2, TimeUnit.SECONDS);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
first = consumer.remove();
VerifyRecord.isValidInsert(first, PK_FIELD, 4);
assertRecordOffset(first, false, false);

View File

@ -20,10 +20,14 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.relational.TableId;
/**
@ -37,6 +41,9 @@ public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
private RecordsStreamProducer recordsProducer;
private TestConsumer consumer;
@Rule
public TestRule conditionalFail = new ConditionalFail();
@Before
public void before() throws Exception {
TestHelper.dropAllSchemas();
@ -102,6 +109,7 @@ public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Except
}
@Test
@ShouldFailWhen(DecoderDifferences.AreQuotedIdentifiersUnsupported.class)
public void shouldReceiveChangesForInsertsWithQuotedNames() throws Exception {
TestHelper.executeDDL("postgres_create_tables.ddl");
@ -348,6 +356,6 @@ private SourceRecord assertRecordInserted(String expectedTopicName, String pkCol
private void executeAndWait(String statements) throws Exception {
TestHelper.execute(statements);
consumer.await(2, TimeUnit.SECONDS);
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
}
}

View File

@ -17,6 +17,7 @@
import java.util.stream.Collectors;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SecureConnectionMode;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.jdbc.JdbcConfiguration;
@ -44,12 +45,23 @@ private TestHelper() {
* @throws SQLException if there is a problem obtaining a replication connection
*/
public static ReplicationConnection createForReplication(String slotName, boolean dropOnClose) throws SQLException {
String pluginName = decoderPluginName();
return ReplicationConnection.builder(defaultJdbcConfig())
.withPlugin(pluginName)
.replicationMessageDecoder(PostgresConnectorConfig.createDefaultMessageDecoder(pluginName))
.withSlot(slotName)
.dropSlotOnClose(dropOnClose)
.build();
}
/**
 * @return the name of the logical decoding plugin under test, taken from the system property named by
 * {@code PostgresConnectorConfig.PLUGIN_NAME}, or the default protobuf plugin when the property is not set
 */
static String decoderPluginName() {
final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name());
return (s == null || s.length() == 0) ? ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME : s;
}
/**
* Obtain a default DB connection.
*
@ -117,9 +129,16 @@ protected static Configuration.Builder defaultConfig() {
JdbcConfiguration jdbcConfiguration = defaultJdbcConfig();
Configuration.Builder builder = Configuration.create();
jdbcConfiguration.forEach((field, value) -> builder.with(PostgresConnectorConfig.DATABASE_CONFIG_PREFIX + field, value));
return builder.with(PostgresConnectorConfig.SERVER_NAME, TEST_SERVER)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, 100);
builder.with(PostgresConnectorConfig.SERVER_NAME, TEST_SERVER)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, 100)
.with(PostgresConnectorConfig.PLUGIN_NAME, decoderPluginName())
.with(PostgresConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED);
final String testNetworkTimeout = System.getProperty("test.network.timeout");
if (testNetworkTimeout != null && testNetworkTimeout.length() != 0) {
builder.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, Integer.parseInt(testNetworkTimeout));
}
return builder;
}
protected static void executeDDL(String ddlFile) throws Exception {
@ -136,4 +155,12 @@ protected static void executeDDL(String ddlFile) throws Exception {
protected static String topicName(String suffix) {
return TestHelper.TEST_SERVER + "." + suffix;
}
protected static boolean shouldSSLConnectionFail() {
return Boolean.parseBoolean(System.getProperty("test.ssl.failonconnect", "true"));
}
protected static int waitTimeForRecords() {
return Integer.parseInt(System.getProperty("test.records.waittime", "2"));
}
}

View File

@ -23,8 +23,8 @@
import org.junit.Before;
import org.junit.Test;
import io.debezium.connector.postgresql.DecoderDifferences;
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.connector.postgresql.proto.PgProto;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
@ -77,7 +77,7 @@ public void shouldReceiveAndDecodeIndividualChanges() throws Exception {
// create a replication connection which should be dropped once it's closed
try (ReplicationConnection connection = TestHelper.createForReplication("test", true)) {
ReplicationStream stream = connection.startStreaming(); // this creates the replication slot
int expectedMessages = insertLargeTestData();
int expectedMessages = DecoderDifferences.updatesWithoutPK(insertLargeTestData(), 1);
expectedMessagesFromStream(stream, expectedMessages);
}
}
@ -237,18 +237,18 @@ private int startInsertStop(String slotName, Consumer<ReplicationStream> streamP
return expectedMessageCount;
}
private List<PgProto.RowMessage> expectedMessagesFromStream(ReplicationStream stream,
private List<ReplicationMessage> expectedMessagesFromStream(ReplicationStream stream,
int expectedMessages) throws Exception {
List<PgProto.RowMessage> actualMessages = new ArrayList<>();
List<ReplicationMessage> actualMessages = new ArrayList<>();
ExecutorService executorService = Executors.newSingleThreadExecutor();
CountDownLatch latch = new CountDownLatch(expectedMessages);
Metronome metronome = Metronome.sleeper(50, TimeUnit.MILLISECONDS, Clock.SYSTEM);
Future<?> result = executorService.submit(() -> {
PgProto.RowMessage message;
List<ReplicationMessage> message;
while (!Thread.interrupted()) {
while ((message = stream.readPending()) != null) {
actualMessages.add(message);
actualMessages.addAll(message);
latch.countDown();
}
metronome.pause();

View File

@ -210,6 +210,7 @@ public Float asFloat() {
public Double asDouble() {
if (value instanceof Double) return (Double) value;
if (value instanceof Float) return new Double(((Float) value).doubleValue());
if (value instanceof Number) return ((Number)value).doubleValue();
return null;
}

View File

@ -233,6 +233,10 @@ protected Function<Object[], Struct> createValueGenerator(Schema schema, TableId
Column col = columns.get(i);
LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:",
tableId, col.name(), col.typeName(), row, e);
} catch (final Exception e) {
Column col = columns.get(i);
LOGGER.error("Failed to properly convert data value for '{}.{}' of type {} for row {}:",
tableId, col.name(), col.typeName(), row, e);
}
} else if (traceMessage.getAndSet(false)) {
Column col = columns.get(i);

View File

@ -0,0 +1,49 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import java.lang.annotation.Annotation;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
/**
* A base {@link TestRule} that allows easy writing of test rules based on method annotations.
* @author Jiri Pechanec
*
*/
public abstract class AnnotationBasedTestRule implements TestRule {
protected static Statement emptyStatement(final String reason, final Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
StringBuilder messageBuilder = new StringBuilder(description.testCount());
messageBuilder.append("Skipped ").append(description.toString());
if (reason != null && !reason.trim().isEmpty()) {
messageBuilder.append(" because: ").append(reason);
}
System.out.println(messageBuilder.toString());
}
};
}
protected <T extends Annotation> T hasAnnotation(Description description, Class<T> annotationClass) {
T annotation = description.getAnnotation(annotationClass);
if (annotation != null) {
return annotation;
} else if (description.isTest() && description.getTestClass().isAnnotationPresent(annotationClass)) {
return description.getTestClass().getAnnotation(annotationClass);
}
return null;
}
public AnnotationBasedTestRule() {
super();
}
}
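For illustration, a subclass of AnnotationBasedTestRule only needs to look up its annotation with hasAnnotation and return emptyStatement when the test should be replaced by a no-op; SkipTestRule and ConditionalFail below follow this pattern. A minimal sketch, using a hypothetical @SkipAlways marker annotation defined just for the example:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.junit.runner.Description;
import org.junit.runners.model.Statement;

// Hypothetical marker annotation, defined only for this sketch
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface SkipAlways {
}

// Minimal rule built on AnnotationBasedTestRule: any test carrying @SkipAlways is skipped
class SkipAlwaysRule extends AnnotationBasedTestRule {
    @Override
    public Statement apply(Statement base, Description description) {
        SkipAlways annotation = hasAnnotation(description, SkipAlways.class);
        if (annotation == null) {
            return base; // no annotation present, run the test unchanged
        }
        // replace the test with a statement that only logs the skip
        return emptyStatement("@SkipAlways is present", description);
    }
}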

View File

@ -0,0 +1,54 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
/**
* JUnit rule that inspects the presence of the {@link ShouldFailWhen} annotation on a test method.
* If it finds the annotation, it modifies the pass/fail result of the test depending on the condition
* defined in the annotation.
*
* @author Jiri Pechanec
*/
public class ConditionalFail extends AnnotationBasedTestRule {
@Override
public Statement apply(final Statement base, final Description description) {
final ShouldFailWhen conditionClass = hasAnnotation(description, ShouldFailWhen.class);
if (conditionClass == null) {
return base;
}
try {
Supplier<Boolean> condition = conditionClass.value().newInstance();
return new Statement() {
@Override
public void evaluate() throws Throwable {
Throwable failure = null;
try {
base.evaluate();
}
catch (final Throwable t) {
failure = t;
}
if (condition.get() && failure == null) {
Assert.fail("Expected failing test for " + description);
} else if (condition.get() && failure != null) {
System.out.println("Ignored failure for " + description);
} else if (failure != null) {
throw failure;
}
}
};
} catch (final InstantiationException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
}
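The condition passed to @ShouldFailWhen can be any Supplier&lt;Boolean&gt; with a no-argument constructor, since ConditionalFail instantiates it via newInstance(); DecoderDifferences.AreQuotedIdentifiersUnsupported used in the streaming test above has this shape. A hedged sketch of such a condition, assuming the decoder plugin is exposed to the tests through the plugin.name system property:

import java.util.function.Supplier;

// Hypothetical condition: evaluates to true when the test run targets the wal2json decoder
public class Wal2JsonInUse implements Supplier<Boolean> {
    @Override
    public Boolean get() {
        return "wal2json".equals(System.getProperty("plugin.name"));
    }
}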

View File

@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.junit;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.function.Supplier;
/**
* Marker annotation that defines whether a test failure should be reported, based upon a condition passed to the annotation.
* If the condition evaluates to <code>true</code> and the test fails, the failure is ignored; if the test does not fail,
* a failure is reported.
* If the condition evaluates to <code>false</code> then the test failure is handled in a usual way.
*
* @author Jiri Pechanec
*
*/
@Retention(RUNTIME)
@Target(METHOD)
public @interface ShouldFailWhen {
Class<? extends Supplier<Boolean>> value();
}
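Tying the pieces together, a minimal sketch of how a test class wires the rule and the annotation (mirroring the wiring added to RecordsStreamProducerIT above; the test name and the AlwaysTrue condition are stand-ins for this example): a failure of the annotated test is tolerated while the condition holds, and an unexpected pass is reported as a failure.

import java.util.function.Supplier;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen;

public class ExampleDecoderIT {

    // Register the rule so that @ShouldFailWhen annotations are honoured
    @Rule
    public TestRule conditionalFail = new ConditionalFail();

    // While AlwaysTrue holds, a failure here is ignored and a pass is flagged as unexpected
    @Test
    @ShouldFailWhen(AlwaysTrue.class)
    public void featureNotSupportedByCurrentDecoder() throws Exception {
        // test body exercising the unsupported feature
    }

    // Stand-in condition used only for this sketch
    public static class AlwaysTrue implements Supplier<Boolean> {
        @Override
        public Boolean get() {
            return Boolean.TRUE;
        }
    }
}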

View File

@ -5,8 +5,6 @@
*/
package io.debezium.junit;
import java.lang.annotation.Annotation;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@ -17,7 +15,7 @@
*
* @author Horia Chiorean
*/
public class SkipTestRule implements TestRule {
public class SkipTestRule extends AnnotationBasedTestRule {
@Override
public Statement apply( Statement base,
@ -45,28 +43,4 @@ public Statement apply( Statement base,
return base;
}
private <T extends Annotation> T hasAnnotation( Description description, Class<T> annotationClass ) {
T annotation = description.getAnnotation(annotationClass);
if (annotation != null) {
return annotation;
} else if (description.isTest() && description.getTestClass().isAnnotationPresent(annotationClass)) {
return description.getTestClass().getAnnotation(annotationClass);
}
return null;
}
private static Statement emptyStatement( final String reason, final Description description ) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
StringBuilder messageBuilder = new StringBuilder(description.testCount());
messageBuilder.append("Skipped ").append(description.toString());
if (reason != null && !reason.trim().isEmpty()) {
messageBuilder.append(" because: ").append(reason);
}
System.out.println(messageBuilder.toString());
}
};
}
}

View File

@ -371,6 +371,7 @@ protected int consumeRecords(int numberOfRecords, Consumer<SourceRecord> recordC
print(record);
}
} else {
// TODO Add support for cases when records are returned in more than one batch
return recordsConsumed;
}
}