DBZ-258 Misc. improvements while reviewing the change;
* Removing superfluous config option * Making loggers static * JavaDoc fixes * Extracting hexStringToByteArray() to helper and adding test * Removing superfluous super() invocations
This commit is contained in:
parent
a78ef1a1d3
commit
73189892b3
@ -16,7 +16,6 @@
|
||||
import org.apache.kafka.common.config.ConfigDef.Type;
|
||||
import org.apache.kafka.common.config.ConfigDef.Width;
|
||||
import org.apache.kafka.common.config.ConfigValue;
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
|
||||
import io.debezium.config.Configuration;
|
||||
import io.debezium.config.EnumeratedValue;
|
||||
@ -340,24 +339,26 @@ public static TopicSelectionStrategy parse(String value) {
|
||||
}
|
||||
|
||||
public enum LogicalDecoder implements EnumeratedValue {
|
||||
DECODERBUFS("decoderbufs", PgProtoMessageDecoder.class),
|
||||
WAL2JSON("wal2json", Wal2JsonMessageDecoder.class);
|
||||
DECODERBUFS("decoderbufs") {
|
||||
@Override
|
||||
public MessageDecoder messageDecoder() {
|
||||
return new PgProtoMessageDecoder();
|
||||
}
|
||||
},
|
||||
WAL2JSON("wal2json") {
|
||||
@Override
|
||||
public MessageDecoder messageDecoder() {
|
||||
return new Wal2JsonMessageDecoder();
|
||||
}
|
||||
};
|
||||
|
||||
private final String decoderName;
|
||||
private final Class<? extends MessageDecoder> messageDecoder;
|
||||
|
||||
LogicalDecoder(String decoderName, Class<? extends MessageDecoder> messageDecoder) {
|
||||
LogicalDecoder(String decoderName) {
|
||||
this.decoderName = decoderName;
|
||||
this.messageDecoder = messageDecoder;
|
||||
}
|
||||
|
||||
public MessageDecoder messageDecoder() {
|
||||
try {
|
||||
return messageDecoder.newInstance();
|
||||
} catch (InstantiationException | IllegalAccessException e) {
|
||||
throw new ConnectException("Cannot instantiate decoding class '" + messageDecoder + "' for decoder plugin '" + getValue() + "'");
|
||||
}
|
||||
}
|
||||
public abstract MessageDecoder messageDecoder();
|
||||
|
||||
public static LogicalDecoder parse(String s) {
|
||||
return valueOf(s.trim().toUpperCase());
|
||||
@ -386,14 +387,6 @@ public String getValue() {
|
||||
.withImportance(Importance.MEDIUM)
|
||||
.withDescription("The name of the Postgres logical decoding plugin installed on the server. Defaults to '"+ LogicalDecoder.DECODERBUFS.getValue() + "'");
|
||||
|
||||
public static final Field PLUGIN_DECODING_CLASS = Field.create("plugin.decoding.class")
|
||||
.withDisplayName("Plugin decoder class")
|
||||
.withType(Type.CLASS)
|
||||
.withWidth(Width.MEDIUM)
|
||||
.withImportance(Importance.LOW)
|
||||
.withDescription("The Java class used to decode events coming from logical decoder plugin." +
|
||||
"If not provided then default class associated with requested plugin is ued.");
|
||||
|
||||
public static final Field SLOT_NAME = Field.create("slot.name")
|
||||
.withDisplayName("Slot")
|
||||
.withType(Type.STRING)
|
||||
@ -689,7 +682,7 @@ public String getValue() {
|
||||
/**
|
||||
* The set of {@link Field}s defined as part of this configuration.
|
||||
*/
|
||||
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, PLUGIN_DECODING_CLASS, SLOT_NAME, DROP_SLOT_ON_STOP,
|
||||
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP,
|
||||
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, SERVER_NAME,
|
||||
TOPIC_SELECTION_STRATEGY, MAX_BATCH_SIZE,
|
||||
MAX_QUEUE_SIZE, POLL_INTERVAL_MS, SCHEMA_WHITELIST,
|
||||
@ -833,7 +826,7 @@ protected boolean initialOnlySnapshot() {
|
||||
|
||||
protected static ConfigDef configDef() {
|
||||
ConfigDef config = new ConfigDef();
|
||||
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, PLUGIN_DECODING_CLASS, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
|
||||
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
|
||||
USER, PASSWORD, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
|
||||
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
|
||||
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
|
||||
|
@ -36,8 +36,9 @@
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* A {@link RecordsProducer} which creates {@link org.apache.kafka.connect.source.SourceRecord records} from a Postgres
|
||||
* streaming replication connection and {@link io.debezium.connector.postgresql.connection.ReplicationMessage messages}.
|
||||
* A {@link RecordsProducer} which creates {@link SourceRecord records} from a
|
||||
* Postgres streaming replication connection and {@link ReplicationMessage
|
||||
* messages}.
|
||||
*
|
||||
* @author Horia Chiorean (hchiorea@redhat.com)
|
||||
*/
|
||||
|
@ -32,7 +32,7 @@ public interface MessageDecoder {
|
||||
|
||||
/**
|
||||
* Allows MessageDecoder to configure options with which the replication stream is started.
|
||||
* See POstgreSQL command START_REPLICATION SLOT for more details.
|
||||
* See PostgreSQL command START_REPLICATION SLOT for more details.
|
||||
*
|
||||
* @param builder
|
||||
* @return the builder instance
|
||||
|
@ -10,7 +10,6 @@
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLWarning;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@ -49,7 +48,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
|
||||
*
|
||||
* @param config the JDBC configuration for the connection; may not be null
|
||||
* @param slotName the name of the DB slot for logical replication; may not be null
|
||||
* @param plugin the type of the server side plugin used for streaming changes; may not be null;
|
||||
* @param plugin decoder matching the server side plug-in used for streaming changes; may not be null
|
||||
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
|
||||
* @param statusUpdateIntervalMillis the number of milli-seconds at which the replication connection should periodically send status
|
||||
* updates to the server
|
||||
@ -256,7 +255,7 @@ protected static void defaultSettings(Configuration.Builder builder) {
|
||||
|
||||
protected static class ReplicationConnectionBuilder implements Builder {
|
||||
|
||||
private Configuration config;
|
||||
private final Configuration config;
|
||||
private String slotName = DEFAULT_SLOT_NAME;
|
||||
private PostgresConnectorConfig.LogicalDecoder plugin = PostgresConnectorConfig.LogicalDecoder.DECODERBUFS;
|
||||
private boolean dropSlotOnClose = DEFAULT_DROP_SLOT_ON_CLOSE;
|
||||
|
@ -11,8 +11,8 @@
|
||||
import io.debezium.connector.postgresql.RecordsStreamProducer.PgConnectionSupplier;
|
||||
|
||||
/**
|
||||
* An abstract represantion of replication message that is sent by PostgreSQL logical decoding plugin and
|
||||
* is processed by Debezium PostgreSQL connector.
|
||||
* An abstract representation of a replication message that is sent by a PostgreSQL logical decoding plugin and
|
||||
* is processed by the Debezium PostgreSQL connector.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
@ -38,7 +38,6 @@ public abstract class Column {
|
||||
private final Object type;
|
||||
|
||||
public Column(final String name, final Object type) {
|
||||
super();
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
}
|
||||
@ -83,6 +82,4 @@ public Object getType() {
|
||||
* @return Set of new values of table columns, null for DELETE
|
||||
*/
|
||||
public List<Column> getNewTupleList();
|
||||
|
||||
|
||||
}
|
||||
|
@ -32,7 +32,8 @@
|
||||
*
|
||||
*/
|
||||
class PgProtoColumn extends ReplicationMessage. Column {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PgProtoColumn.class);
|
||||
|
||||
private final PgProto.DatumMessage raw;
|
||||
|
||||
@ -157,11 +158,11 @@ public Object getValue(final PgConnectionSupplier connection) {
|
||||
return Arrays.asList((Object[])deserializedArray);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
logger.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e);
|
||||
LOGGER.warn("Unexpected exception trying to process PgArray column '{}'", datumMessage.getColumnName(), e);
|
||||
}
|
||||
return null;
|
||||
default: {
|
||||
logger.warn("processing column '{}' with unknown data type '{}' as byte array", datumMessage.getColumnName(),
|
||||
LOGGER.warn("processing column '{}' with unknown data type '{}' as byte array", datumMessage.getColumnName(),
|
||||
datumMessage.getColumnType());
|
||||
return datumMessage.hasDatumBytes()? datumMessage.getDatumBytes().toByteArray() : null;
|
||||
}
|
||||
|
@ -24,7 +24,7 @@
|
||||
import io.debezium.time.NanoTimestamp;
|
||||
|
||||
/**
|
||||
* Transformer for time/date related string representation in JSON message coming from plugin.
|
||||
* Transformer for time/date related string representations in JSON messages coming from the wal2json plugin.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
@ -41,7 +41,7 @@ public static DateTimeFormat get() {
|
||||
return new ISODateTimeFormat();
|
||||
}
|
||||
public static class ISODateTimeFormat implements DateTimeFormat {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ISODateTimeFormat.class);
|
||||
|
||||
private static final String TS_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
|
||||
private static final DateTimeFormatter TS_FORMAT = DateTimeFormatter.ofPattern(TS_FORMAT_PATTERN);
|
||||
@ -110,7 +110,7 @@ private <T> T format(final String pattern, final String s, final Supplier<T> val
|
||||
try {
|
||||
return value.get();
|
||||
} catch (final DateTimeParseException e) {
|
||||
logger.error("Cannot parse time/date value '{}', expected format '{}'", s, pattern);
|
||||
LOGGER.error("Cannot parse time/date value '{}', expected format '{}'", s, pattern);
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
}
|
||||
|
@ -24,15 +24,16 @@
|
||||
import io.debezium.connector.postgresql.connection.ReplicationMessage;
|
||||
import io.debezium.document.Value;
|
||||
import io.debezium.relational.Column;
|
||||
import io.debezium.util.Strings;
|
||||
|
||||
/**
|
||||
* Logical encapsulation of column changes sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</>
|
||||
* Logical encapsulation of column changes sent by the wal2json logical decoding plug-in.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
*/
|
||||
class Wal2JsonColumn extends ReplicationMessage.Column {
|
||||
private static final Logger logger = LoggerFactory.getLogger(Wal2JsonColumn.class);
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Wal2JsonColumn.class);
|
||||
|
||||
private final Value rawValue;
|
||||
|
||||
@ -47,7 +48,7 @@ public Wal2JsonColumn(final String name, final String type, final Value value) {
|
||||
* {@link PostgresValueConverter#converter(Column, Field)} instance to match whatever the Connect schema type expects.
|
||||
*
|
||||
* Note that the logic here is tightly coupled (i.e. dependent) on the wal2json plugin logic which writes the actual
|
||||
* Protobuf messages.
|
||||
* JSON messages.
|
||||
*
|
||||
* @param a supplier to get a connection to Postgres instance for array handling
|
||||
* @return the value; may be null
|
||||
@ -93,26 +94,26 @@ public Object getValue(final PgConnectionSupplier connection) {
|
||||
case "timetz":
|
||||
return rawValue.isNotNull() ? DateTimeFormat.get().timeWithTimeZone(rawValue.asString()) : null;
|
||||
case "bytea":
|
||||
return hexStringToByteArray();
|
||||
return Strings.hexStringToByteArray(rawValue.asString());
|
||||
case "point":
|
||||
try {
|
||||
return rawValue.isNotNull() ? new PGpoint(rawValue.asString()) : null;
|
||||
} catch (final SQLException e) {
|
||||
logger.error("Failed to parse point {}, {}", rawValue.asString(), e);
|
||||
LOGGER.error("Failed to parse point {}, {}", rawValue.asString(), e);
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
case "money":
|
||||
try {
|
||||
return rawValue.isNotNull() ? new PGmoney(rawValue.asString()).val : null;
|
||||
} catch (final SQLException e) {
|
||||
logger.error("Failed to parse money {}, {}", rawValue.asString(), e);
|
||||
LOGGER.error("Failed to parse money {}, {}", rawValue.asString(), e);
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
case "interval":
|
||||
try {
|
||||
return rawValue.isNotNull() ? new PGInterval(rawValue.asString()) : null;
|
||||
} catch (final SQLException e) {
|
||||
logger.error("Failed to parse point {}, {}", rawValue.asString(), e);
|
||||
LOGGER.error("Failed to parse point {}, {}", rawValue.asString(), e);
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
case "_int2":
|
||||
@ -150,24 +151,12 @@ public Object getValue(final PgConnectionSupplier connection) {
|
||||
return Arrays.asList((Object[])deserializedArray);
|
||||
}
|
||||
catch (SQLException e) {
|
||||
logger.warn("Unexpected exception trying to process PgArray column '{}'", getName(), e);
|
||||
LOGGER.warn("Unexpected exception trying to process PgArray column '{}'", getName(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
logger.warn("processing column '{}' with unknown data type '{}' as byte array", getName(),
|
||||
LOGGER.warn("processing column '{}' with unknown data type '{}' as byte array", getName(),
|
||||
getType());
|
||||
return rawValue.asBytes();
|
||||
}
|
||||
|
||||
private byte[] hexStringToByteArray() {
|
||||
if (rawValue.isNull()) {
|
||||
return null;
|
||||
}
|
||||
final String hex = rawValue.asString();
|
||||
final byte[] bytes = new byte[hex.length() / 2];
|
||||
for (int i = 0; i < bytes.length; i++) {
|
||||
bytes[i] = (byte)Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
}
|
||||
|
@ -10,6 +10,7 @@
|
||||
import java.sql.SQLException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.kafka.connect.errors.ConnectException;
|
||||
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -29,12 +30,10 @@
|
||||
*
|
||||
*/
|
||||
public class Wal2JsonMessageDecoder implements MessageDecoder {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Wal2JsonMessageDecoder.class);
|
||||
|
||||
private final DateTimeFormat dateTime = DateTimeFormat.get();
|
||||
public Wal2JsonMessageDecoder() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException {
|
||||
@ -45,7 +44,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
final byte[] source = buffer.array();
|
||||
final byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
|
||||
final Document message = DocumentReader.defaultReader().read(content);
|
||||
logger.debug("Message arrived for decoding {}", message);
|
||||
LOGGER.debug("Message arrived for decoding {}", message);
|
||||
final int txId = message.getInteger("xid");
|
||||
final String timestamp = message.getString("timestamp");
|
||||
final long commitTime = dateTime.systemTimestamp(timestamp);
|
||||
@ -54,7 +53,7 @@ public void processMessage(ByteBuffer buffer, ReplicationMessageProcessor proces
|
||||
processor.process(new Wal2JsonReplicationMessage(txId, commitTime, e.getValue().asDocument()));
|
||||
}
|
||||
} catch (final IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
throw new ConnectException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,12 +16,12 @@
|
||||
import io.debezium.document.Document;
|
||||
|
||||
/**
|
||||
* Replication message representing message sent by <a href="https://github.com/debezium/postgres-decoderbufs">Postgres Decoderbufs</>
|
||||
* Replication message representing message sent by the wal2json logical decoding plug-in.
|
||||
*
|
||||
* @author Jiri Pechanec
|
||||
*
|
||||
*/
|
||||
class Wal2JsonReplicationMessage implements ReplicationMessage {
|
||||
|
||||
private final int txId;
|
||||
private final long commitTime;
|
||||
private final Document rawMessage;
|
||||
@ -77,13 +77,17 @@ private List<ReplicationMessage.Column> transform(final Document data, final Str
|
||||
final Array columnNames = data.getArray(nameField);
|
||||
final Array columnTypes = data.getArray(typeField);
|
||||
final Array columnValues = data.getArray(valueField);
|
||||
|
||||
if (columnNames.size() != columnTypes.size() || columnNames.size() != columnValues.size()) {
|
||||
throw new ConnectException("Column related arrays do not have the same size");
|
||||
}
|
||||
final List<ReplicationMessage.Column> columns = new ArrayList<>();
|
||||
|
||||
final List<ReplicationMessage.Column> columns = new ArrayList<>(columnNames.size());
|
||||
|
||||
for (int i = 0; i < columnNames.size(); i++) {
|
||||
columns.add(new Wal2JsonColumn(columnNames.get(i).asString(), columnTypes.get(i).asString(), columnValues.get(i)));
|
||||
}
|
||||
|
||||
return columns;
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +124,6 @@ public void shouldValidateConfiguration() throws Exception {
|
||||
|
||||
// validate the non required fields
|
||||
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_NAME, LogicalDecoder.DECODERBUFS.getValue());
|
||||
validateField(validatedConfig, PostgresConnectorConfig.PLUGIN_DECODING_CLASS, null);
|
||||
validateField(validatedConfig, PostgresConnectorConfig.SLOT_NAME, ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
|
||||
validateField(validatedConfig, PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
|
||||
validateField(validatedConfig, PostgresConnectorConfig.PORT, PostgresConnectorConfig.DEFAULT_PORT);
|
||||
|
@ -822,6 +822,25 @@ public static String unquoteIdentifierPart(String identifierPart) {
|
||||
return identifierPart;
|
||||
}
|
||||
|
||||
/**
|
||||
* Restores a byte array that is encoded as a hex string.
|
||||
*/
|
||||
public static byte[] hexStringToByteArray(String hexString) {
|
||||
if (hexString == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
int length = hexString.length();
|
||||
|
||||
byte[] bytes = new byte[length / 2];
|
||||
for (int i = 0; i < length; i += 2) {
|
||||
bytes[i / 2] = (byte) ((Character.digit(hexString.charAt(i), 16) << 4)
|
||||
+ Character.digit(hexString.charAt(i+1), 16));
|
||||
}
|
||||
|
||||
return bytes;
|
||||
}
|
||||
|
||||
private static Character deriveQuotingChar(String identifierPart) {
|
||||
char first = identifierPart.charAt(0);
|
||||
char last = identifierPart.charAt(identifierPart.length() - 1);
|
||||
|
@ -251,6 +251,14 @@ public void unquoteIdentifierPartShouldSupportBackTicks() {
|
||||
assertThat(Strings.unquoteIdentifierPart("`Tab``le`")).isEqualTo("Tab`le");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hexStringToByteArrayShouldReturnCorrectByteArray() {
|
||||
assertThat(Strings.hexStringToByteArray(null)).isNull();
|
||||
assertThat(Strings.hexStringToByteArray("00")).isEqualTo(new byte[] { 0 });
|
||||
assertThat(Strings.hexStringToByteArray("010203")).isEqualTo(new byte[] { 1, 2, 3 });
|
||||
assertThat(Strings.hexStringToByteArray("CAFEBABE")).isEqualTo(new byte[] { -54, -2, -70, -66 });
|
||||
}
|
||||
|
||||
protected void assertReplacement(String before, Map<String, String> replacements, String after) {
|
||||
String result = Strings.replaceVariables(before, replacements::get);
|
||||
assertThat(result).isEqualTo(after);
|
||||
|
Loading…
Reference in New Issue
Block a user