DBZ-258 Changes after first review

This commit is contained in:
Jiri Pechanec 2017-09-27 07:44:35 +02:00 committed by Gunnar Morling
parent 0bc8129961
commit e47b4cb81c
18 changed files with 183 additions and 154 deletions

View File

@ -11,13 +11,12 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.common.config.ConfigValue; import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
@ -25,8 +24,9 @@
import io.debezium.connector.postgresql.connection.MessageDecoder; import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder; import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder;
import io.debezium.connector.postgresql.connection.wal2json.WAL2JSONMessageDecoder; import io.debezium.connector.postgresql.connection.wal2json.Wal2JsonMessageDecoder;
import io.debezium.jdbc.JdbcConfiguration; import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
/** /**
* The configuration properties for the {@link PostgresConnector} * The configuration properties for the {@link PostgresConnector}
@ -339,6 +339,36 @@ public static TopicSelectionStrategy parse(String value) {
} }
} }
public enum LogicalDecoder implements EnumeratedValue {
DECODERBUFS("decoderbufs", PgProtoMessageDecoder.class),
WAL2JSON("wal2json", Wal2JsonMessageDecoder.class);
private final String decoderName;
private final Class<? extends MessageDecoder> messageDecoder;
LogicalDecoder(String decoderName, Class<? extends MessageDecoder> messageDecoder) {
this.decoderName = decoderName;
this.messageDecoder = messageDecoder;
}
public MessageDecoder messageDecoder() {
try {
return messageDecoder.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ConnectException("Cannot instantiate decoding class '" + messageDecoder + "' for decoder plugin '" + getValue() + "'");
}
}
public static LogicalDecoder parse(String s) {
return valueOf(s.trim().toUpperCase());
}
@Override
public String getValue() {
return decoderName;
}
}
protected static final String DATABASE_CONFIG_PREFIX = "database."; protected static final String DATABASE_CONFIG_PREFIX = "database.";
protected static final int DEFAULT_PORT = 5432; protected static final int DEFAULT_PORT = 5432;
protected static final int DEFAULT_MAX_BATCH_SIZE = 10240; protected static final int DEFAULT_MAX_BATCH_SIZE = 10240;
@ -351,11 +381,10 @@ public static TopicSelectionStrategy parse(String value) {
public static final Field PLUGIN_NAME = Field.create("plugin.name") public static final Field PLUGIN_NAME = Field.create("plugin.name")
.withDisplayName("Plugin") .withDisplayName("Plugin")
.withType(Type.STRING) .withEnum(LogicalDecoder.class, LogicalDecoder.DECODERBUFS)
.withWidth(Width.MEDIUM) .withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM) .withImportance(Importance.MEDIUM)
.withDefault(ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME) .withDescription("The name of the Postgres logical decoding plugin installed on the server. Defaults to '"+ LogicalDecoder.DECODERBUFS.getValue() + "'");
.withDescription("The name of the Postgres logical decoding plugin installed on the server. Defaults to '"+ ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME + "'");
public static final Field PLUGIN_DECODING_CLASS = Field.create("plugin.decoding.class") public static final Field PLUGIN_DECODING_CLASS = Field.create("plugin.decoding.class")
.withDisplayName("Plugin decoder class") .withDisplayName("Plugin decoder class")
@ -704,29 +733,8 @@ protected String databaseName() {
return config.getString(DATABASE_NAME); return config.getString(DATABASE_NAME);
} }
protected String pluginName() { protected LogicalDecoder plugin() {
return config.getString(PLUGIN_NAME); return LogicalDecoder.parse(config.getString(PLUGIN_NAME));
}
protected MessageDecoder pluginDecoder() {
final MessageDecoder decoder = config.getInstance(PLUGIN_DECODING_CLASS, MessageDecoder.class);
return decoder != null ? decoder : createDefaultMessageDecoder(pluginName());
}
/**
* Provides default replication message decoder instance for a given plugin name
*
* @param pluginName
* @return an instance of decoder
*/
public static MessageDecoder createDefaultMessageDecoder(final String pluginName) {
switch (pluginName) {
case ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME:
return new PgProtoMessageDecoder();
case ReplicationConnection.Builder.WAL2JSON_PLUGIN_NAME:
return new WAL2JSONMessageDecoder();
}
throw new ConnectException("Undefined default plugin decoding class for decoder plugin '" + pluginName + "'");
} }
protected String slotName() { protected String slotName() {

View File

@ -74,8 +74,7 @@ protected void refreshSchema(boolean printReplicaIdentityInfo) throws SQLExcepti
protected ReplicationConnection createReplicationConnection() throws SQLException { protected ReplicationConnection createReplicationConnection() throws SQLException {
return ReplicationConnection.builder(config.jdbcConfig()) return ReplicationConnection.builder(config.jdbcConfig())
.withSlot(config.slotName()) .withSlot(config.slotName())
.withPlugin(config.pluginName()) .withPlugin(config.plugin())
.replicationMessageDecoder(config.pluginDecoder())
.dropSlotOnClose(config.dropSlotOnStop()) .dropSlotOnClose(config.dropSlotOnStop())
.statusUpdateIntervalMillis(config.statusUpdateIntervalMillis()) .statusUpdateIntervalMillis(config.statusUpdateIntervalMillis())
.build(); .build();

View File

@ -37,7 +37,7 @@
/** /**
* A {@link RecordsProducer} which creates {@link org.apache.kafka.connect.source.SourceRecord records} from a Postgres * A {@link RecordsProducer} which creates {@link org.apache.kafka.connect.source.SourceRecord records} from a Postgres
* streaming replication connection and {@link io.debezium.connector.postgresql.proto.PgProto messages}. * streaming replication connection and {@link io.debezium.connector.postgresql.connection.ReplicationMessage messages}.
* *
* @author Horia Chiorean (hchiorea@redhat.com) * @author Horia Chiorean (hchiorea@redhat.com)
*/ */
@ -107,10 +107,7 @@ private void streamChanges(Consumer<SourceRecord> consumer) {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
try { try {
// this will block until a message is available // this will block until a message is available
List<ReplicationMessage> messages = stream.read(); stream.read(x -> process(x, stream.lastReceivedLSN(), consumer));
for (final ReplicationMessage message: messages) {
process(message, stream.lastReceivedLSN(), consumer);
}
} catch (SQLException e) { } catch (SQLException e) {
Throwable cause = e.getCause(); Throwable cause = e.getCause();
if (cause != null && (cause instanceof IOException)) { if (cause != null && (cause instanceof IOException)) {

View File

@ -7,13 +7,15 @@
package io.debezium.connector.postgresql.connection; package io.debezium.connector.postgresql.connection;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.sql.SQLException;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
/** /**
* A class that is able to deserialize/decode binary representation of a batch of replication messages generated by * A class that is able to deserialize/decode binary representation of a batch of replication messages generated by
* logical decoding plugin. There is usually one implementation per decoding plugin. * logical decoding plugin. Clients provide a callback code for processing.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
* *
@ -21,12 +23,19 @@
public interface MessageDecoder { public interface MessageDecoder {
/** /**
* Deserializes binary representation of replication message into a logical Java class implementation * Process a message upon arrival from logical decoder
* *
* @param buffer - binary representation of replication message * @param buffer - binary representation of replication message
* @return a List of Java classes encapsulating decoded message * @param processor - message processing on arrival
*/ */
List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer); void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException;
/**
* Allows MessageDecoder to configure options with which the replication stream is started.
* See POstgreSQL command START_REPLICATION SLOT for more details.
*
* @param builder
* @return the builder instance
*/
ChainedLogicalStreamBuilder options(final ChainedLogicalStreamBuilder builder); ChainedLogicalStreamBuilder options(final ChainedLogicalStreamBuilder builder);
} }

View File

@ -10,8 +10,6 @@
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLWarning; import java.sql.SQLWarning;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -38,7 +36,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
private final String slotName; private final String slotName;
private final String pluginName; private final PostgresConnectorConfig.LogicalDecoder plugin;
private final boolean dropSlotOnClose; private final boolean dropSlotOnClose;
private final Configuration originalConfig; private final Configuration originalConfig;
private final Integer statusUpdateIntervalMillis; private final Integer statusUpdateIntervalMillis;
@ -51,25 +49,24 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
* *
* @param config the JDBC configuration for the connection; may not be null * @param config the JDBC configuration for the connection; may not be null
* @param slotName the name of the DB slot for logical replication; may not be null * @param slotName the name of the DB slot for logical replication; may not be null
* @param pluginName the name of the server side plugin used for streaming changes; may not be null; * @param plugin the type of the server side plugin used for streaming changes; may not be null;
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed * @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
* @param statusUpdateIntervalMillis the number of milli-seconds at which the replication connection should periodically send status * @param statusUpdateIntervalMillis the number of milli-seconds at which the replication connection should periodically send status
* updates to the server * updates to the server
*/ */
private PostgresReplicationConnection(Configuration config, private PostgresReplicationConnection(Configuration config,
String slotName, String slotName,
String pluginName, PostgresConnectorConfig.LogicalDecoder plugin,
boolean dropSlotOnClose, boolean dropSlotOnClose,
Integer statusUpdateIntervalMillis, Integer statusUpdateIntervalMillis) {
MessageDecoder messageDecoder) {
super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings); super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings);
this.originalConfig = config; this.originalConfig = config;
this.slotName = slotName; this.slotName = slotName;
this.pluginName = pluginName; this.plugin = plugin;
this.dropSlotOnClose = dropSlotOnClose; this.dropSlotOnClose = dropSlotOnClose;
this.statusUpdateIntervalMillis = statusUpdateIntervalMillis; this.statusUpdateIntervalMillis = statusUpdateIntervalMillis;
this.messageDecoder = messageDecoder; this.messageDecoder = plugin.messageDecoder();
try { try {
initReplicationSlot(); initReplicationSlot();
@ -81,25 +78,25 @@ private PostgresReplicationConnection(Configuration config,
protected void initReplicationSlot() throws SQLException { protected void initReplicationSlot() throws SQLException {
ServerInfo.ReplicationSlot slotInfo; ServerInfo.ReplicationSlot slotInfo;
try (PostgresConnection connection = new PostgresConnection(originalConfig)) { try (PostgresConnection connection = new PostgresConnection(originalConfig)) {
slotInfo = connection.readReplicationSlotInfo(slotName, pluginName); slotInfo = connection.readReplicationSlotInfo(slotName, plugin.getValue());
} }
boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo; boolean shouldCreateSlot = ServerInfo.ReplicationSlot.INVALID == slotInfo;
try { try {
if (shouldCreateSlot) { if (shouldCreateSlot) {
LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, pluginName); LOGGER.debug("Creating new replication slot '{}' for plugin '{}'", slotName, plugin);
// there's no info for this plugin and slot so create a new slot // there's no info for this plugin and slot so create a new slot
pgConnection().getReplicationAPI() pgConnection().getReplicationAPI()
.createReplicationSlot() .createReplicationSlot()
.logical() .logical()
.withSlotName(slotName) .withSlotName(slotName)
.withOutputPlugin(pluginName) .withOutputPlugin(plugin.getValue())
.make(); .make();
} else if (slotInfo.active()) { } else if (slotInfo.active()) {
LOGGER.error( LOGGER.error(
"A logical replication slot named '{}' for plugin '{}' and database '{}' is already active on the server." + "A logical replication slot named '{}' for plugin '{}' and database '{}' is already active on the server." +
"You cannot have multiple slots with the same name active for the same database", "You cannot have multiple slots with the same name active for the same database",
slotName, pluginName, database()); slotName, plugin.getValue(), database());
throw new IllegalStateException(); throw new IllegalStateException();
} }
@ -176,29 +173,28 @@ private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) t
private volatile LogSequenceNumber lastReceivedLSN; private volatile LogSequenceNumber lastReceivedLSN;
@Override @Override
public List<ReplicationMessage> read() throws SQLException { public void read(ReplicationMessageProcessor processor) throws SQLException {
ByteBuffer read = stream.read(); ByteBuffer read = stream.read();
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice // the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (lsnLong >= stream.getLastReceiveLSN().asLong()) { if (lsnLong >= stream.getLastReceiveLSN().asLong()) {
return Collections.emptyList(); return;
} }
return deserializeMessages(read); deserializeMessages(read, processor);
} }
@Override @Override
public List<ReplicationMessage> readPending() throws SQLException { public void readPending(ReplicationMessageProcessor processor) throws SQLException {
ByteBuffer read = stream.readPending(); ByteBuffer read = stream.readPending();
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice // the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (read == null || lsnLong >= stream.getLastReceiveLSN().asLong()) { if (read == null || lsnLong >= stream.getLastReceiveLSN().asLong()) {
return Collections.emptyList(); return;
} }
return deserializeMessages(read); deserializeMessages(read, processor);
} }
private List<ReplicationMessage> deserializeMessages(ByteBuffer buffer) { private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException {
final List<ReplicationMessage> msg = messageDecoder.deserializeMessage(buffer);
lastReceivedLSN = stream.getLastReceiveLSN(); lastReceivedLSN = stream.getLastReceiveLSN();
return msg; messageDecoder.processMessage(buffer, processor);
} }
@Override @Override
@ -262,8 +258,7 @@ protected static class ReplicationConnectionBuilder implements Builder {
private Configuration config; private Configuration config;
private String slotName = DEFAULT_SLOT_NAME; private String slotName = DEFAULT_SLOT_NAME;
private String pluginName = PROTOBUF_PLUGIN_NAME; private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
private MessageDecoder messageDecoder = PostgresConnectorConfig.createDefaultMessageDecoder(PROTOBUF_PLUGIN_NAME);
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE; private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
private Integer statusUpdateIntervalMillis; private Integer statusUpdateIntervalMillis;
@ -280,9 +275,9 @@ public ReplicationConnectionBuilder withSlot(final String slotName) {
} }
@Override @Override
public ReplicationConnectionBuilder withPlugin(final String pluginName) { public ReplicationConnectionBuilder withPlugin(final PostgresConnectorConfig.LogicalDecoder plugin) {
assert pluginName != null; assert plugin != null;
this.pluginName = pluginName; this.plugin = plugin;
return this; return this;
} }
@ -298,17 +293,10 @@ public ReplicationConnectionBuilder statusUpdateIntervalMillis(final Integer sta
return this; return this;
} }
@Override
public Builder replicationMessageDecoder(final MessageDecoder messageDecoder) {
this.messageDecoder = messageDecoder;
return this;
}
@Override @Override
public ReplicationConnection build() { public ReplicationConnection build() {
assert pluginName != null : "Decoding plugin name is not set"; assert plugin != null : "Decoding plugin name is not set";
assert messageDecoder != null : "Replication message decoder is not provided"; return new PostgresReplicationConnection(config, slotName, plugin, dropSlotOnClose, statusUpdateIntervalMillis);
return new PostgresReplicationConnection(config, slotName, pluginName, dropSlotOnClose, statusUpdateIntervalMillis, messageDecoder);
} }
} }
} }

View File

@ -13,6 +13,7 @@
import io.debezium.annotation.NotThreadSafe; import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
/** /**
* A Postgres logical streaming replication connection. Replication connections are established for a slot and a given plugin * A Postgres logical streaming replication connection. Replication connections are established for a slot and a given plugin
@ -88,8 +89,6 @@ interface Builder {
* Default replication settings * Default replication settings
*/ */
String DEFAULT_SLOT_NAME = "debezium"; String DEFAULT_SLOT_NAME = "debezium";
String PROTOBUF_PLUGIN_NAME = "decoderbufs";
String WAL2JSON_PLUGIN_NAME = "wal2json";
boolean DEFAULT_DROP_SLOT_ON_CLOSE = true; boolean DEFAULT_DROP_SLOT_ON_CLOSE = true;
/** /**
@ -102,13 +101,13 @@ interface Builder {
Builder withSlot(final String slotName); Builder withSlot(final String slotName);
/** /**
* Sets the name for the PG logical decoding plugin * Sets the instance for the PG logical decoding plugin
* *
* @param pluginName the name of the slot, may not be null. * @param pluginName the name of the slot, may not be null.
* @return this instance * @return this instance
* @see #PROTOBUF_PLUGIN_NAME * @see #PROTOBUF_PLUGIN_NAME
*/ */
Builder withPlugin(final String pluginName); Builder withPlugin(final PostgresConnectorConfig.LogicalDecoder plugin);
/** /**
* Whether or not to drop the replication slot once the replication connection closes * Whether or not to drop the replication slot once the replication connection closes
@ -127,8 +126,6 @@ interface Builder {
*/ */
Builder statusUpdateIntervalMillis(final Integer statusUpdateIntervalMillis); Builder statusUpdateIntervalMillis(final Integer statusUpdateIntervalMillis);
Builder replicationMessageDecoder(final MessageDecoder messageDecoder);
/** /**
* Creates a new {@link ReplicationConnection} instance * Creates a new {@link ReplicationConnection} instance
* @return a connection, never null * @return a connection, never null

View File

@ -84,4 +84,5 @@ public Object getType() {
*/ */
public List<Column> getNewTupleList(); public List<Column> getNewTupleList();
} }

View File

@ -7,7 +7,6 @@
package io.debezium.connector.postgresql.connection; package io.debezium.connector.postgresql.connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List;
import org.postgresql.replication.PGReplicationStream; import org.postgresql.replication.PGReplicationStream;
@ -17,30 +16,31 @@
* @author Horia Chiorean (hchiorea@redhat.com) * @author Horia Chiorean (hchiorea@redhat.com)
*/ */
public interface ReplicationStream extends AutoCloseable { public interface ReplicationStream extends AutoCloseable {
@FunctionalInterface
public interface ReplicationMessageProcessor {
void process(ReplicationMessage message) throws SQLException;
}
/** /**
* Blocks and waits for a Protobuf message to be sent over a replication connection. Once a message has been received, * Blocks and waits for a replication message to be sent over a replication connection. Once a message has been received,
* the value of the {@link #lastReceivedLSN() last received LSN} will also be updated accordingly. * the value of the {@link #lastReceivedLSN() last received LSN} will also be updated accordingly.
* *
* @return a {@link io.debezium.connector.postgresql.proto.PgProto.RowMessage} instance; this may return {@code null} if * @param processor - a callback to which the arrived message is passed
* the server sends back a message which has already been reported as consumed via the {@link #flushLSN()} method.
* @throws SQLException if anything unexpected fails * @throws SQLException if anything unexpected fails
* @see PGReplicationStream#read() * @see PGReplicationStream#read()
*/ */
List<ReplicationMessage> read() throws SQLException; void read(ReplicationMessageProcessor processor) throws SQLException;
/** /**
* Attempts to read a Protobuf message from a replication connection, returning that message if it's available or returning * Attempts to read a replication message from a replication connection, returning that message if it's available or returning
* {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLSN() last received LSN} * {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLSN() last received LSN}
* will also be updated accordingly. * will also be updated accordingly.
* *
* @return a {@link io.debezium.connector.postgresql.proto.PgProto.RowMessage} instance if a message is available and was * @param processor - a callback to which the arrived message is passed
* written by a server or {@code null} if nothing is available from the server or the server sends a message that has
* already been reported as consumed via the {@link #flushLSN()} method.
* @throws SQLException if anything unexpected fails * @throws SQLException if anything unexpected fails
* @see PGReplicationStream#readPending() * @see PGReplicationStream#readPending()
*/ */
List<ReplicationMessage> readPending() throws SQLException; void readPending(ReplicationMessageProcessor processor) throws SQLException;
/** /**
* Sends a message to the server informing it about that latest position in the WAL that this stream has read via * Sends a message to the server informing it about that latest position in the WAL that this stream has read via

View File

@ -6,21 +6,20 @@
package io.debezium.connector.postgresql.connection.pgproto; package io.debezium.connector.postgresql.connection.pgproto;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import io.debezium.connector.postgresql.connection.MessageDecoder; import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage; import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.connector.postgresql.proto.PgProto; import io.debezium.connector.postgresql.proto.PgProto;
/** /**
* ProtoBuf deserialization of message sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</>. * ProtoBuf deserialization of message sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</>.
* The message is encapsulated into a List as ProtBuf plugin sends only one message. * Only one message is delivered for processing.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
* *
@ -32,7 +31,7 @@ public PgProtoMessageDecoder() {
} }
@Override @Override
public List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer) { public void processMessage(final ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException {
try { try {
if (!buffer.hasArray()) { if (!buffer.hasArray()) {
throw new IllegalStateException( throw new IllegalStateException(
@ -40,7 +39,7 @@ public List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer) {
} }
byte[] source = buffer.array(); byte[] source = buffer.array();
byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length); byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
return Collections.singletonList(new PgProtoReplicationMessage(PgProto.RowMessage.parseFrom(content))); processor.process(new PgProtoReplicationMessage(PgProto.RowMessage.parseFrom(content)));
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -15,6 +15,7 @@
import org.postgresql.geometric.PGpoint; import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.PgArray; import org.postgresql.jdbc.PgArray;
import org.postgresql.util.PGInterval; import org.postgresql.util.PGInterval;
import org.postgresql.util.PGmoney;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -30,12 +31,12 @@
* @author Jiri Pechanec * @author Jiri Pechanec
* *
*/ */
class WAL2JSONColumn extends ReplicationMessage.Column { class Wal2JsonColumn extends ReplicationMessage.Column {
private static final Logger logger = LoggerFactory.getLogger(WAL2JSONColumn.class); private static final Logger logger = LoggerFactory.getLogger(Wal2JsonColumn.class);
private final Value rawValue; private final Value rawValue;
public WAL2JSONColumn(final String name, final String type, final Value value) { public Wal2JsonColumn(final String name, final String type, final Value value) {
super(name, type); super(name, type);
this.rawValue = value; this.rawValue = value;
} }
@ -92,15 +93,7 @@ public Object getValue(final PgConnectionSupplier connection) {
case "timetz": case "timetz":
return rawValue.isNotNull() ? DateTimeFormat.get().timeWithTimeZone(rawValue.asString()) : null; return rawValue.isNotNull() ? DateTimeFormat.get().timeWithTimeZone(rawValue.asString()) : null;
case "bytea": case "bytea":
if (rawValue.isNull()) { return hexStringToByteArray();
return null;
}
final String hex = rawValue.asString();
final byte[] bytes = new byte[hex.length() / 2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte)Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
}
return bytes;
case "point": case "point":
try { try {
return rawValue.isNotNull() ? new PGpoint(rawValue.asString()) : null; return rawValue.isNotNull() ? new PGpoint(rawValue.asString()) : null;
@ -109,7 +102,12 @@ public Object getValue(final PgConnectionSupplier connection) {
throw new ConnectException(e); throw new ConnectException(e);
} }
case "money": case "money":
return rawValue.isNotNull() ? Double.parseDouble(rawValue.asString().replace("$", "").replace(",", "")) : null; try {
return rawValue.isNotNull() ? new PGmoney(rawValue.asString()).val : null;
} catch (final SQLException e) {
logger.error("Failed to parse money {}, {}", rawValue.asString(), e);
throw new ConnectException(e);
}
case "interval": case "interval":
try { try {
return rawValue.isNotNull() ? new PGInterval(rawValue.asString()) : null; return rawValue.isNotNull() ? new PGInterval(rawValue.asString()) : null;
@ -160,4 +158,16 @@ public Object getValue(final PgConnectionSupplier connection) {
getType()); getType());
return rawValue.asBytes(); return rawValue.asBytes();
} }
private byte[] hexStringToByteArray() {
if (rawValue.isNull()) {
return null;
}
final String hex = rawValue.asString();
final byte[] bytes = new byte[hex.length() / 2];
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte)Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
}
return bytes;
}
} }

View File

@ -7,16 +7,15 @@
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder; import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.connection.MessageDecoder; import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage; import io.debezium.connector.postgresql.connection.ReplicationStream.ReplicationMessageProcessor;
import io.debezium.document.Array; import io.debezium.document.Array;
import io.debezium.document.Document; import io.debezium.document.Document;
import io.debezium.document.DocumentReader; import io.debezium.document.DocumentReader;
@ -24,36 +23,36 @@
/** /**
* JSON deserialization of a message sent by * JSON deserialization of a message sent by
* <a href="https://github.com/eulerto/wal2json">wal2json</a> logical decoding plugin. The plugin sends all * <a href="https://github.com/eulerto/wal2json">wal2json</a> logical decoding plugin. The plugin sends all
* changes in one transaction as a single batch. * changes in one transaction as a single batch and they are passed to processor one-by-one.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
* *
*/ */
public class WAL2JSONMessageDecoder implements MessageDecoder { public class Wal2JsonMessageDecoder implements MessageDecoder {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final DateTimeFormat dateTime = DateTimeFormat.get(); private final DateTimeFormat dateTime = DateTimeFormat.get();
public WAL2JSONMessageDecoder() { public Wal2JsonMessageDecoder() {
super(); super();
} }
@Override @Override
public List<ReplicationMessage> deserializeMessage(final ByteBuffer buffer) { public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException {
try { try {
if (!buffer.hasArray()) { if (!buffer.hasArray()) {
throw new IllegalStateException("Invalid buffer received from PG server during streaming replication"); throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
} }
byte[] source = buffer.array(); final byte[] source = buffer.array();
byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length); final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
final Document message = DocumentReader.defaultReader().read(content); final Document message = DocumentReader.defaultReader().read(content);
logger.debug("Message arrived for decoding {}", message); logger.debug("Message arrived for decoding {}", message);
final int txId = message.getInteger("xid"); final int txId = message.getInteger("xid");
final String timestamp = message.getString("timestamp"); final String timestamp = message.getString("timestamp");
final long commitTime = dateTime.systemTimestamp(timestamp); final long commitTime = dateTime.systemTimestamp(timestamp);
final Array changes = message.getArray("change"); final Array changes = message.getArray("change");
return changes.streamValues() for (Array.Entry e: changes) {
.map(x -> new WAL2JSONReplicationMessage(txId, commitTime, x.asDocument())) processor.process(new Wal2JsonReplicationMessage(txId, commitTime, e.getValue().asDocument()));
.collect(Collectors.toList()); }
} catch (final IOException e) { } catch (final IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }

View File

@ -21,12 +21,12 @@
* @author Jiri Pechanec * @author Jiri Pechanec
* *
*/ */
class WAL2JSONReplicationMessage implements ReplicationMessage { class Wal2JsonReplicationMessage implements ReplicationMessage {
private final int txId; private final int txId;
private final long commitTime; private final long commitTime;
private final Document rawMessage; private final Document rawMessage;
public WAL2JSONReplicationMessage(final int txId, final long commitTime, final Document rawMessage) { public Wal2JsonReplicationMessage(final int txId, final long commitTime, final Document rawMessage) {
this.txId = txId; this.txId = txId;
this.commitTime = commitTime; this.commitTime = commitTime;
this.rawMessage = rawMessage; this.rawMessage = rawMessage;
@ -59,7 +59,7 @@ public int getTransactionId() {
@Override @Override
public String getTable() { public String getTable() {
return String.format("\"%s\".\"%s\"", rawMessage.getString("schema"), rawMessage.getString("table")); return "\"" + rawMessage.getString("schema") + "\".\"" + rawMessage.getString("table") + "\"";
} }
@Override @Override
@ -82,7 +82,7 @@ private List<ReplicationMessage.Column> transform(final Document data, final Str
} }
final List<ReplicationMessage.Column> columns = new ArrayList<>(); final List<ReplicationMessage.Column> columns = new ArrayList<>();
for (int i = 0; i < columnNames.size(); i++) { for (int i = 0; i < columnNames.size(); i++) {
columns.add(new WAL2JSONColumn(columnNames.get(i).asString(), columnTypes.get(i).asString(), columnValues.get(i))); columns.add(new Wal2JsonColumn(columnNames.get(i).asString(), columnTypes.get(i).asString(), columnValues.get(i)));
} }
return columns; return columns;
} }

View File

@ -8,18 +8,35 @@
import java.util.function.Supplier; import java.util.function.Supplier;
import io.debezium.connector.postgresql.connection.ReplicationConnection; /**
* A class that contains assertions or expected values tailored to the behaviour of a concrete decoder plugin
*
* @author Jiri Pechanec
*
*/
public class DecoderDifferences { public class DecoderDifferences {
/**
* wal2json plugin does not send events for updates on tables that does not define primary key.
*
* @param expectedCount
* @param updatesWithoutPK
* @return modified count
*/
public static int updatesWithoutPK(final int expectedCount, final int updatesWithoutPK) { public static int updatesWithoutPK(final int expectedCount, final int updatesWithoutPK) {
return !ReplicationConnection.Builder.WAL2JSON_PLUGIN_NAME.equals(TestHelper.decoderPluginName()) ? expectedCount : expectedCount - updatesWithoutPK; return TestHelper.decoderPlugin() != PostgresConnectorConfig.LogicalDecoder.WAL2JSON ? expectedCount : expectedCount - updatesWithoutPK;
} }
/**
* wal2json plugin is not currently able to encode and parse quoted identifiers
*
* @author Jiri Pechanec
*
*/
public static class AreQuotedIdentifiersUnsupported implements Supplier<Boolean> { public static class AreQuotedIdentifiersUnsupported implements Supplier<Boolean> {
@Override @Override
public Boolean get() { public Boolean get() {
return ReplicationConnection.Builder.WAL2JSON_PLUGIN_NAME.equals(TestHelper.decoderPluginName()); return TestHelper.decoderPlugin() == PostgresConnectorConfig.LogicalDecoder.WAL2JSON;
} }
} }
} }

View File

@ -40,6 +40,7 @@
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.data.Envelope; import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord; import io.debezium.data.VerifyRecord;
@ -122,7 +123,7 @@ public void shouldValidateConfiguration() throws Exception {
assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.DATABASE_NAME, 1); assertConfigurationErrors(validatedConfig, PostgresConnectorConfig.DATABASE_NAME, 1);
// validate the non required fields // validate the non required fields
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME); validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.DECODERBUFS.getValue());
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_DECODING_CLASS, null); validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_DECODING_CLASS, null);
validateField(validatedConfig, PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME); validateField(validatedConfig, PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
validateField(validatedConfig, PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE); validateField(validatedConfig, PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);

View File

@ -32,6 +32,7 @@ public final class TestHelper {
protected static final String TEST_SERVER = "test_server"; protected static final String TEST_SERVER = "test_server";
protected static final String PK_FIELD = "pk"; protected static final String PK_FIELD = "pk";
private static final String TEST_PROPERTY_PREFIX = "debezium.test.";
private TestHelper() { private TestHelper() {
} }
@ -45,21 +46,20 @@ private TestHelper() {
* @throws SQLException if there is a problem obtaining a replication connection * @throws SQLException if there is a problem obtaining a replication connection
*/ */
public static ReplicationConnection createForReplication(String slotName, boolean dropOnClose) throws SQLException { public static ReplicationConnection createForReplication(String slotName, boolean dropOnClose) throws SQLException {
String pluginName = decoderPluginName(); final PostgresConnectorConfig.LogicalDecoder plugin = decoderPlugin();
return ReplicationConnection.builder(defaultJdbcConfig()) return ReplicationConnection.builder(defaultJdbcConfig())
.withPlugin(pluginName) .withPlugin(plugin)
.replicationMessageDecoder(PostgresConnectorConfig.createDefaultMessageDecoder(pluginName))
.withSlot(slotName) .withSlot(slotName)
.dropSlotOnClose(dropOnClose) .dropSlotOnClose(dropOnClose)
.build(); .build();
} }
/** /**
* @return * @return the decoder plugin used for testing and configured by system property
*/ */
static String decoderPluginName() { static PostgresConnectorConfig.LogicalDecoder decoderPlugin() {
final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name()); final String s = System.getProperty(PostgresConnectorConfig.PLUGIN_NAME.name());
return (s == null || s.length() == 0) ? ReplicationConnection.Builder.PROTOBUF_PLUGIN_NAME : s; return (s == null || s.length() == 0) ? PostgresConnectorConfig.LogicalDecoder.DECODERBUFS : PostgresConnectorConfig.LogicalDecoder.parse(s);
} }
/** /**
@ -132,9 +132,9 @@ protected static Configuration.Builder defaultConfig() {
builder.with(PostgresConnectorConfig.SERVER_NAME, TEST_SERVER) builder.with(PostgresConnectorConfig.SERVER_NAME, TEST_SERVER)
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, 100) .with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, 100)
.with(PostgresConnectorConfig.PLUGIN_NAME, decoderPluginName()) .with(PostgresConnectorConfig.PLUGIN_NAME, decoderPlugin())
.with(PostgresConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED); .with(PostgresConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLED);
final String testNetworkTimeout = System.getProperty("test.network.timeout"); final String testNetworkTimeout = System.getProperty(TEST_PROPERTY_PREFIX + "network.timeout");
if (testNetworkTimeout != null && testNetworkTimeout.length() != 0) { if (testNetworkTimeout != null && testNetworkTimeout.length() != 0) {
builder.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, Integer.parseInt(testNetworkTimeout)); builder.with(PostgresConnectorConfig.STATUS_UPDATE_INTERVAL_MS, Integer.parseInt(testNetworkTimeout));
} }
@ -157,10 +157,10 @@ protected static String topicName(String suffix) {
} }
protected static boolean shouldSSLConnectionFail() { protected static boolean shouldSSLConnectionFail() {
return Boolean.parseBoolean(System.getProperty("test.ssl.failonconnect", "true")); return Boolean.parseBoolean(System.getProperty(TEST_PROPERTY_PREFIX + "ssl.failonconnect", "true"));
} }
protected static int waitTimeForRecords() { protected static int waitTimeForRecords() {
return Integer.parseInt(System.getProperty("test.records.waittime", "2")); return Integer.parseInt(System.getProperty(TEST_PROPERTY_PREFIX + "records.waittime", "2"));
} }
} }

View File

@ -245,9 +245,13 @@ private List<ReplicationMessage> expectedMessagesFromStream(ReplicationStream st
CountDownLatch latch = new CountDownLatch(expectedMessages); CountDownLatch latch = new CountDownLatch(expectedMessages);
Metronome metronome = Metronome.sleeper(50, TimeUnit.MILLISECONDS, Clock.SYSTEM); Metronome metronome = Metronome.sleeper(50, TimeUnit.MILLISECONDS, Clock.SYSTEM);
Future<?> result = executorService.submit(() -> { Future<?> result = executorService.submit(() -> {
List<ReplicationMessage> message;
while (!Thread.interrupted()) { while (!Thread.interrupted()) {
while ((message = stream.readPending()) != null) { for(;;) {
List<ReplicationMessage> message = new ArrayList<>();
stream.readPending(x -> message.add(x));
if (message.isEmpty()) {
break;
}
actualMessages.addAll(message); actualMessages.addAll(message);
latch.countDown(); latch.countDown();
} }