DBZ-1082 Add new custom recovery mode, more metadata

This commit does a few things:

- Refactors snapshot modes to be encapsulated by an interface and
to use only that interface when determining whether to snapshot and
which `RecordProducer` implementation to instantiate
- Refactors the configuration of the existing snapshot modes to tie
each mode to its corresponding implementation
- Adds a new snapshot.mode, `custom`, and a new configuration option
for specifying a custom implementation to be loaded by the class
loader (see the configuration sketch after this list)
- Changes the visibility of some classes to allow for custom snapshot
modes to get enough context to make an informed choice
- Adds some metadata about slots (the catalog_xmin) to give a fuller
picture of slot state, which can be useful when implementing snapshot
modes (this tracking is configurable, as it can add some overhead)
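
For illustration only (this sketch is not part of the commit), the new
options might be wired together with Debezium's `Configuration` builder
roughly as follows; `com.example.CustomSnapshotter` and the 10-second
xmin interval are hypothetical placeholders:

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;

public class CustomSnapshotConfigExample {
    public static Configuration build() {
        return Configuration.create()
                // select the new custom snapshot mode
                .with(PostgresConnectorConfig.SNAPSHOT_MODE,
                        PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue())
                // fully qualified name of a class implementing the Snapshotter SPI
                // (com.example.CustomSnapshotter is a hypothetical placeholder)
                .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, "com.example.CustomSnapshotter")
                // opt in to periodic catalog_xmin tracking; 0 (the default) disables it
                .with(PostgresConnectorConfig.XMIN_FETCH_INTERVAL, 10_000L)
                .build();
    }
}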

Together, these changes give end users much broader flexibility to
implement a snapshot mode that can do more advanced snapshots, such as
partial recovery, or partial snapshots of tables where not all records
are needed.

This could also be seen as superseding the
`snapshot.select.statement.overrides` option, since users can now
dynamically build queries based on the state of the slot and the
offsets consumed.
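
As a rough sketch of that idea (again, not part of the diff), a custom
snapshotter could derive its decisions and queries from the new SPI
types; the class name, the catalog_xmin heuristic, and the xmin-based
WHERE clause below are all hypothetical:

import java.util.Optional;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;

// Hypothetical snapshotter: re-snapshots when the slot's catalog_xmin has
// moved past the last xmin recorded in the offsets, and builds its queries
// dynamically instead of relying on static per-table overrides.
public class PartialRecoverySnapshotter implements Snapshotter {

    private OffsetState offset;
    private SlotState slot;

    @Override
    public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
        this.offset = sourceInfo; // null on first start (no offsets yet)
        this.slot = slotState;    // null if the slot does not exist yet
    }

    @Override
    public boolean shouldSnapshot() {
        if (offset == null) {
            return true; // no previous offsets: behave like an initial snapshot
        }
        // illustrative heuristic: re-snapshot if the slot advanced past our last xmin
        return slot != null
                && offset.lastSeenXmin() != null
                && slot.slotCatalogXmin() != null
                && slot.slotCatalogXmin() > offset.lastSeenXmin();
    }

    @Override
    public boolean shouldStream() {
        return true;
    }

    @Override
    public Optional<String> buildSnapshotQuery(TableId tableId) {
        // queries can be built per table and per state; returning empty skips the table
        if (offset != null && offset.lastSeenTxId() != null) {
            return Optional.of("SELECT * FROM " + tableId.toDoubleQuotedString()
                    + " WHERE xmin::text::bigint >= " + offset.lastSeenTxId());
        }
        return Optional.of("SELECT * FROM " + tableId.toDoubleQuotedString());
    }
}

Because init(...) hands the implementation both the stored offsets and
the live slot state, the snapshot decision and the per-table queries can
be computed at startup rather than fixed in configuration.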
Addison Higham 2019-02-21 13:54:33 -07:00 committed by Gunnar Morling
parent b11f1b8d05
commit 8e21139ca3
26 changed files with 979 additions and 185 deletions

View File

@ -6,19 +6,26 @@
package io.debezium.connector.postgresql;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigValue;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.postgresql.connection.MessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder;
@ -95,34 +102,52 @@ public static HStoreHandlingMode parse(String value,String defaultValue){
}
/**
* The set of predefined SnapshotMode options or aliases.
* The set of predefined Snapshotter options or aliases.
*/
public enum SnapshotMode implements EnumeratedValue {
/**
* Always perform a snapshot when starting.
*/
ALWAYS("always"),
ALWAYS("always", (c) -> new AlwaysSnapshotter()),
/**
* Perform a snapshot only upon initial startup of a connector.
*/
INITIAL("initial"),
INITIAL("initial", (c) -> new InitialSnapshotter()),
/**
* Never perform a snapshot and only receive logical changes.
*/
NEVER("never"),
NEVER("never", (c) -> new NeverSnapshotter()),
/**
* Perform a snapshot and then stop before attempting to receive any logical changes.
*/
INITIAL_ONLY("initial_only");
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
CUSTOM("custom", (c) -> {
return c.getInstance(SNAPSHOT_MODE_CLASS, Snapshotter.class);
});
@FunctionalInterface
public interface SnapshotterBuilder {
Snapshotter buildSnapshotter(Configuration config);
}
private final String value;
private final SnapshotterBuilder builderFunc;
SnapshotMode(String value) {
SnapshotMode(String value, SnapshotterBuilder buildSnapshotter) {
this.value = value;
this.builderFunc = buildSnapshotter;
}
public Snapshotter getSnapshotter(Configuration config) {
return builderFunc.buildSnapshotter(config);
}
@Override
@ -667,8 +692,24 @@ public static SchemaRefreshMode parse(String value) {
+ "Options include: "
+ "'always' to specify that the connector run a snapshot each time it starts up; "
+ "'initial' (the default) to specify the connector can run a snapshot only when no offsets are available for the logical server name; "
+ "'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes; and"
+ "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the last position (LSN) recorded by the server");
+ "'initial_only' same as 'initial' except the connector should stop after completing the snapshot and before it would normally start emitting changes;"
+ "'never' to specify the connector should never run a snapshot and that upon first startup the connector should read from the last position (LSN) recorded by the server; and"
+ "'custom' to specify a custom class with 'snapshot.custom_class' which will be loaded and used to determine the snapshot, see docs for more details.");
public static final Field SNAPSHOT_MODE_CLASS = Field.create("snapshot.custom.class")
.withDisplayName("Snapshot Mode Custom Class")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
// guard against the option being absent (null) as well as empty
final String snapshotClass = config.getString(field);
if (config.getString(SNAPSHOT_MODE).toLowerCase().equals("custom") && (snapshotClass == null || snapshotClass.isEmpty())) {
output.accept(field, "", "snapshot.custom.class cannot be empty when snapshot.mode 'custom' is defined");
return 1;
}
return 0;
})
.withDescription("When 'snapshot.mode' is set as custom, this setting must be set to specify a fully qualified class name to load (via the default class loader)."
+ "This class must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot and how to build queries.");
public static final Field SNAPSHOT_LOCK_TIMEOUT_MS = Field.create("snapshot.lock.timeout.ms")
.withDisplayName("Snapshot lock timeout (ms)")
@ -750,6 +791,19 @@ public static SchemaRefreshMode parse(String value) {
"This setting can improve connector performance significantly if there are frequently-updated tables that " +
"have TOASTed data that are rarely part of these updates. However, it is possible for the in-memory schema to " +
"become outdated if TOASTable columns are dropped from the table.");
public static final Field XMIN_FETCH_INTERVAL = Field.create("xmin.fetch.interval.ms")
.withDisplayName("Xmin fetch interval (ms)")
.withType(Type.LONG)
.withWidth(Width.SHORT)
.withDefault(0L)
.withImportance(Importance.MEDIUM)
.withDescription("Specify how often (in ms) the xmin will be fetched from the replication slot. " +
"This xmin value is exposed by the slot which gives a lower bound of where a new replication slot could start from. " +
"The lower the value, the more likely this value is to be the current 'true' value, but the bigger the performance cost. " +
"The bigger the value, the less likely this value is to be the current 'true' value, but the lower the performance penalty. " +
"The default is set to 0 ms, which disables tracking xmin.")
.withValidation(Field::isNonNegativeLong);
/**
* The set of {@link Field}s defined as part of this configuration.
*/
@ -767,7 +821,8 @@ public static SchemaRefreshMode parse(String value) {
SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD,
SSL_ROOT_CERT, SSL_CLIENT_KEY, SNAPSHOT_LOCK_TIMEOUT_MS, ROWS_FETCH_SIZE, SSL_SOCKET_FACTORY,
STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, INCLUDE_UNKNOWN_DATATYPES,
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, SCHEMA_REFRESH_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE);
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, SCHEMA_REFRESH_MODE, CommonConnectorConfig.TOMBSTONES_ON_DELETE,
XMIN_FETCH_INTERVAL, SNAPSHOT_MODE_CLASS);
private final Configuration config;
private final TemporalPrecisionMode temporalPrecisionMode;
@ -890,23 +945,15 @@ protected long snapshotLockTimeoutMillis() {
return config.getLong(PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS);
}
protected boolean snapshotNeverAllowed() {
return SnapshotMode.NEVER == this.snapshotMode;
protected Snapshotter getSnapshotter() {
return this.snapshotMode.getSnapshotter(config);
}
protected boolean alwaysTakeSnapshot() {
return SnapshotMode.ALWAYS == this.snapshotMode;
}
protected boolean initialOnlySnapshot() {
return SnapshotMode.INITIAL_ONLY == this.snapshotMode;
}
protected String snapshotSelectOverrides() {
public String snapshotSelectOverrides() {
return config.getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE);
}
protected String snapshotSelectOverrideForTable(String table) {
public String snapshotSelectOverrideForTable(String table) {
return config.getString(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + table);
}
@ -914,11 +961,15 @@ protected boolean skipRefreshSchemaOnMissingToastableData() {
return SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST == this.schemaRefreshMode;
}
protected Duration xminFetchInterval() {
return Duration.ofMillis(config.getLong(PostgresConnectorConfig.XMIN_FETCH_INTERVAL));
}
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
USER, PASSWORD, ON_CONNECT_STATEMENTS, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
DROP_SLOT_ON_STOP, STREAM_PARAMS, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
DROP_SLOT_ON_STOP, STREAM_PARAMS, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE, XMIN_FETCH_INTERVAL, SNAPSHOT_MODE_CLASS);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
CommonConnectorConfig.TOMBSTONES_ON_DELETE, Heartbeat.HEARTBEAT_INTERVAL,

View File

@ -13,6 +13,10 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
@ -23,8 +27,6 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.LoggingContext;
/**
@ -70,6 +72,12 @@ public void start(Configuration config) {
databaseCharset = connection.getDatabaseCharset();
}
Snapshotter snapshotter = connectorConfig.getSnapshotter();
if (snapshotter == null) {
logger.error("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
throw new ConnectException("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
}
// create the task context and schema...
TopicSelector<TableId> topicSelector = PostgresTopicSelector.create(connectorConfig);
PostgresSchema schema = new PostgresSchema(connectorConfig, typeRegistry, databaseCharset, topicSelector);
@ -80,41 +88,27 @@ public void start(Configuration config) {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
//Print out the server information
SlotState slotInfo = null;
try (PostgresConnection connection = taskContext.createConnection()) {
logger.info(connection.serverInfo().toString());
slotInfo = connection.getReplicationSlotInfo(connectorConfig.slotName(), connectorConfig.plugin().getPostgresPluginName());
}
catch (SQLException e) {
logger.warn("unable to load info of replication slot, debezium will try to create the slot");
}
if (existingOffset == null) {
logger.info("No previous offset found");
if (connectorConfig.snapshotNeverAllowed()) {
logger.info("Snapshots are not allowed as per configuration, starting streaming logical changes only");
producer = new RecordsStreamProducer(taskContext, sourceInfo);
} else {
// otherwise we always want to take a snapshot at startup
createSnapshotProducer(taskContext, sourceInfo, connectorConfig.initialOnlySnapshot());
}
} else {
sourceInfo.load(existingOffset);
logger.info("Found previous offset {}", sourceInfo);
if (sourceInfo.isSnapshotInEffect()) {
if (connectorConfig.snapshotNeverAllowed()) {
// No snapshots are allowed
String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
throw new ConnectException(msg);
} else {
logger.info("Found previous incomplete snapshot");
createSnapshotProducer(taskContext, sourceInfo, connectorConfig.initialOnlySnapshot());
}
} else if (connectorConfig.alwaysTakeSnapshot()) {
logger.info("Taking a new snapshot as per configuration");
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, true);
} else {
logger.info(
"Previous snapshot has completed successfully, streaming logical changes from last known position");
producer = new RecordsStreamProducer(taskContext, sourceInfo);
}
// if we have no initial offset, indicate that to Snapshotter by passing null
snapshotter.init(connectorConfig, null, slotInfo);
}
else {
logger.info("Found previous offset {}", sourceInfo);
sourceInfo.load(existingOffset);
snapshotter.init(connectorConfig, sourceInfo.asOffsetState(), slotInfo);
}
createRecordProducer(taskContext, sourceInfo, snapshotter);
changeEventQueue = new ChangeEventQueue.Builder<ChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
@ -125,20 +119,29 @@ public void start(Configuration config) {
producer.start(changeEventQueue::enqueue, changeEventQueue::producerFailure);
running.compareAndSet(false, true);
} catch (SQLException e) {
throw new ConnectException(e);
} finally {
}
finally {
previousContext.restore();
}
}
private void createSnapshotProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, boolean initialOnlySnapshot) {
if (initialOnlySnapshot) {
logger.info("Taking only a snapshot of the DB without streaming any changes afterwards...");
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, false);
} else {
logger.info("Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...");
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, true);
private void createRecordProducer(PostgresTaskContext taskContext, SourceInfo sourceInfo, Snapshotter snapshotter) {
if (snapshotter.shouldSnapshot()) {
if (snapshotter.shouldStream()) {
logger.info("Taking a new snapshot of the DB and streaming logical changes once the snapshot is finished...");
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, snapshotter);
}
else {
logger.info("Taking only a snapshot of the DB without streaming any changes afterwards...");
producer = new RecordsSnapshotProducer(taskContext, sourceInfo, snapshotter);
}
}
else if (snapshotter.shouldStream()) {
logger.info("Not attempting to take a snapshot, immediately starting to stream logical changes...");
producer = new RecordsStreamProducer(taskContext, sourceInfo);
}
else {
throw new ConnectException("Snapshotter neither is snapshotting or streaming, invalid!");
}
}

View File

@ -13,8 +13,13 @@
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The context of a {@link PostgresConnectorTask}. This deals with most of the brunt of reading various configuration options
@ -25,14 +30,22 @@
@ThreadSafe
public class PostgresTaskContext extends CdcSourceTaskContext {
protected final static Logger LOGGER = LoggerFactory.getLogger(PostgresTaskContext.class);
private final PostgresConnectorConfig config;
private final TopicSelector<TableId> topicSelector;
private final PostgresSchema schema;
private ElapsedTimeStrategy refreshXmin;
private Long lastXmin;
protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicSelector<TableId> topicSelector) {
super("Postgres", config.getLogicalName(), Collections::emptySet);
this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {
this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis());
}
this.topicSelector = topicSelector;
assert schema != null;
this.schema = schema;
@ -56,6 +69,37 @@ protected void refreshSchema(boolean printReplicaIdentityInfo) throws SQLExcepti
}
}
protected Long getSlotXmin() throws SQLException {
// when xmin fetch is set to 0, we don't track it, to avoid the performance cost of querying the
// slot periodically
if (config.xminFetchInterval().toMillis() <= 0) {
return null;
}
assert(this.refreshXmin != null);
if (this.refreshXmin.hasElapsed()) {
lastXmin = getCurrentSlotInfo().slotCatalogXmin();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Fetched new xmin from slot of {}", lastXmin);
}
}
else {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("reusing xmin value of {}", lastXmin);
}
}
return lastXmin;
}
protected SlotState getCurrentSlotInfo() throws SQLException {
SlotState slotInfo;
try (final PostgresConnection connection = createConnection()) {
slotInfo = connection.getReplicationSlotInfo(config.slotName(), config.plugin().getPostgresPluginName());
}
return slotInfo;
}
protected ReplicationConnection createReplicationConnection() throws SQLException {
return ReplicationConnection.builder(config.jdbcConfig())
.withSlot(config.slotName())

View File

@ -14,8 +14,6 @@
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@ -24,6 +22,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import io.debezium.connector.postgresql.spi.Snapshotter;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
@ -62,14 +61,16 @@ public class RecordsSnapshotProducer extends RecordsProducer {
private final Optional<RecordsStreamProducer> streamProducer;
private final AtomicReference<SourceRecord> currentRecord;
private final Snapshotter snapshotter;
public RecordsSnapshotProducer(PostgresTaskContext taskContext,
SourceInfo sourceInfo,
boolean continueStreamingAfterCompletion) {
Snapshotter snapshotter) {
super(taskContext, sourceInfo);
executorService = Threads.newSingleThreadExecutor(PostgresConnector.class, taskContext.config().getLogicalName(), CONTEXT_NAME);
currentRecord = new AtomicReference<>();
if (continueStreamingAfterCompletion) {
this.snapshotter = snapshotter;
if (snapshotter.shouldStream()) {
// we need to create the stream producer here to make sure it creates the replication connection;
// otherwise we can't stream back changes happening while the snapshot is taking place
streamProducer = Optional.of(new RecordsStreamProducer(taskContext, sourceInfo));
@ -207,26 +208,30 @@ private void takeSnapshot(BlockingConsumer<ChangeEvent> consumer) {
// and mark the start of the snapshot
sourceInfo.startSnapshot();
sourceInfo.update(xlogStart, clock().currentTime(), txId, null);
// use the old xmin, as we don't want to update it if in xmin recovery
sourceInfo.update(xlogStart, clock().currentTime(), txId, null, sourceInfo.xmin());
logger.info("Step 3: reading and exporting the contents of each table");
AtomicInteger rowsCounter = new AtomicInteger(0);
final Map<TableId, String> selectOverrides = getSnapshotSelectOverridesByTable();
for(TableId tableId : schema.tableIds()) {
long exportStart = clock().currentTimeInMillis();
logger.info("\t exporting data from table '{}'", tableId);
try {
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
final String selectStatement = selectOverrides.getOrDefault(tableId, "SELECT * FROM " + tableId.toDoubleQuotedString());
logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
final Optional<String> selectStatement = snapshotter.buildSnapshotQuery(tableId);
if (!selectStatement.isPresent()) {
logger.warn("For table '{}' the select statement was not provided, skipping table", tableId);
}
else {
logger.info("For table '{}' using select statement: '{}'", tableId, selectStatement);
connection.queryWithBlockingConsumer(selectStatement,
this::readTableStatement,
rs -> readTable(tableId, rs, consumer, rowsCounter));
logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", rowsCounter.get(),
tableId, Strings.duration(clock().currentTimeInMillis() - exportStart));
rowsCounter.set(0);
connection.queryWithBlockingConsumer(selectStatement.get(),
this::readTableStatement,
rs -> readTable(tableId, rs, consumer, rowsCounter));
logger.info("\t finished exporting '{}' records for '{}'; total duration '{}'", rowsCounter.get(),
tableId, Strings.duration(clock().currentTimeInMillis() - exportStart));
rowsCounter.set(0);
}
} catch (SQLException e) {
throw new ConnectException(e);
}
@ -413,26 +418,4 @@ private void sendCurrentRecord(BlockingConsumer<ChangeEvent> consumer) throws In
//send the last generated record
consumer.accept(new ChangeEvent(record, sourceInfo.lsn()));
}
/**
* Returns any SELECT overrides, if present.
*/
private Map<TableId, String> getSnapshotSelectOverridesByTable() {
String tableList = taskContext.getConfig().snapshotSelectOverrides();
if (tableList == null) {
return Collections.emptyMap();
}
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableList.split(",")) {
snapshotSelectOverridesByTable.put(
TableId.parse(table),
taskContext.getConfig().snapshotSelectOverrideForTable(table)
);
}
return snapshotSelectOverridesByTable;
}
}

View File

@ -155,11 +155,11 @@ protected synchronized void commit(long lsn) {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
ReplicationStream replicationStream = this.replicationStream.get();
if (replicationStream != null) {
if (logger.isDebugEnabled()) {
logger.debug("Flushing LSN to server: {}", LogSequenceNumber.valueOf(lsn));
}
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
replicationStream.flushLsn(lsn);
}
@ -249,7 +249,7 @@ private void process(ReplicationMessage message, Long lsn, BlockingConsumer<Chan
// update the source info with the coordinates for this message
Instant commitTime = message.getCommitTime();
long txId = message.getTransactionId();
sourceInfo.update(lsn, commitTime, txId, tableId);
sourceInfo.update(lsn, commitTime, txId, tableId, taskContext.getSlotXmin());
if (logger.isDebugEnabled()) {
logger.debug("received new message at position {}\n{}", ReplicationConnection.format(lsn), message);
}

View File

@ -18,6 +18,7 @@
import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.relational.TableId;
import io.debezium.time.Conversions;
@ -76,13 +77,14 @@
* @author Horia Chiorean
*/
@NotThreadSafe
final class SourceInfo extends AbstractSourceInfo {
public final class SourceInfo extends AbstractSourceInfo {
public static final String SERVER_NAME_KEY = "name";
public static final String SERVER_PARTITION_KEY = "server";
public static final String DB_NAME_KEY = "db";
public static final String TIMESTAMP_KEY = "ts_usec";
public static final String TXID_KEY = "txId";
public static final String XMIN_KEY = "xmin";
public static final String LSN_KEY = "lsn";
public static final String SCHEMA_NAME_KEY = "schema";
public static final String TABLE_NAME_KEY = "table";
@ -103,6 +105,7 @@ final class SourceInfo extends AbstractSourceInfo {
.field(TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(SNAPSHOT_KEY, SchemaBuilder.bool().optional().defaultValue(false).build())
.field(LAST_SNAPSHOT_RECORD_KEY, Schema.OPTIONAL_BOOLEAN_SCHEMA)
.field(XMIN_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.build();
private final String serverName;
@ -111,6 +114,7 @@ final class SourceInfo extends AbstractSourceInfo {
private Long lsn;
private Long txId;
private Long xmin;
private Long useconds;
private boolean snapshot = false;
private Boolean lastSnapshotRecord;
@ -127,6 +131,7 @@ protected SourceInfo(String serverName, String dbName) {
protected void load(Map<String, Object> lastStoredOffset) {
this.lsn = ((Number) lastStoredOffset.get(LSN_KEY)).longValue();
this.txId = ((Number) lastStoredOffset.get(TXID_KEY)).longValue();
this.xmin = (Long) lastStoredOffset.get(XMIN_KEY);
this.useconds = (Long) lastStoredOffset.get(TIMESTAMP_KEY);
this.snapshot = lastStoredOffset.containsKey(SNAPSHOT_KEY);
if (this.snapshot) {
@ -163,6 +168,9 @@ public Map<String, String> partition() {
if (lsn != null) {
result.put(LSN_KEY, lsn);
}
if (xmin != null) {
result.put(XMIN_KEY, xmin);
}
if (snapshot) {
result.put(SNAPSHOT_KEY, true);
result.put(LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord);
@ -170,21 +178,27 @@ public Map<String, String> partition() {
return result;
}
public OffsetState asOffsetState() {
return new OffsetState(lsn, txId, xmin, Conversions.toInstantFromMicros(useconds), isSnapshotInEffect());
}
/**
* Updates the source with information about a particular received or read event.
*
* @param lsn the position in the server WAL for a particular event; may be null indicating that this information is not
* available
* @param useconds the commit time (in microseconds since epoch) of the transaction that generated the event;
* @param commitTime the commit time of the transaction that generated the event;
* may be null indicating that this information is not available
* @param txId the ID of the transaction that generated the transaction; may be null if this information nis not available
* @param txId the ID of the transaction that generated the transaction; may be null if this information is not available
* @param tableId the table that should be included in the source info; may be null
* @param xmin the xmin of the slot, may be null
* @return this instance
*/
protected SourceInfo update(Long lsn, Instant commitTime, Long txId, TableId tableId) {
protected SourceInfo update(Long lsn, Instant commitTime, Long txId, TableId tableId, Long xmin) {
this.lsn = lsn;
this.useconds = Conversions.toEpochMicros(commitTime);
this.txId = txId;
this.xmin = xmin;
if (tableId != null && tableId.schema() != null) {
this.schemaName = tableId.schema();
}
@ -230,7 +244,7 @@ protected String connector() {
* Get a {@link Struct} representation of the source {@link #partition()} and {@link #offset()} information. The Struct
* complies with the {@link #SCHEMA} for the Postgres connector.
* <p>
* This method should always be called after {@link #update(Long, Long, Long, TableId)}.
* This method should always be called after {@link #update(Long, Instant, Long, TableId, Long)}.
*
* @return the source partition and offset {@link Struct}; never null
* @see #schema()
@ -255,7 +269,7 @@ protected Struct source() {
*
* @return {@code true} if a snapshot is in effect, or {@code false} otherwise
*/
protected boolean isSnapshotInEffect() {
public boolean isSnapshotInEffect() {
return snapshot && (this.lastSnapshotRecord == null || !this.lastSnapshotRecord);
}
@ -274,11 +288,15 @@ protected void completeSnapshot() {
this.snapshot = false;
}
protected Long lsn() {
public Long lsn() {
return this.lsn;
}
protected boolean hasLastKnownPosition() {
public Long xmin() {
return this.xmin;
}
public boolean hasLastKnownPosition() {
return this.lsn != null;
}
@ -293,6 +311,9 @@ public String toString() {
if (txId != null) {
sb.append(", txId=").append(txId);
}
if (xmin != null) {
sb.append(", xmin=").append(xmin);
}
if (useconds != null) {
sb.append(", useconds=").append(useconds);
}

View File

@ -19,6 +19,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import io.debezium.connector.postgresql.spi.SlotState;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.core.BaseConnection;
import org.postgresql.core.TypeInfo;
@ -120,35 +121,84 @@ public ServerInfo.ReplicaIdentity readReplicaIdentityInfo(TableId tableId) throw
}, rs -> {
if (rs.next()) {
replIdentity.append(rs.getString(1));
} else {
}
else {
LOGGER.warn("Cannot determine REPLICA IDENTITY information for table '{}'", tableId);
}
});
return ServerInfo.ReplicaIdentity.parseFromDB(replIdentity.toString());
}
/**
* Returns the current state of the replication slot
* @param slotName the name of the slot
* @param pluginName the name of the plugin used for the desired slot
* @return the {@link SlotState} or null, if no slot state is found
* @throws SQLException
*/
public SlotState getReplicationSlotInfo(String slotName, String pluginName) throws SQLException {
ServerInfo.ReplicationSlot slot = fetchReplicationSlotInfo(slotName, pluginName);
if (slot.equals(ServerInfo.ReplicationSlot.INVALID)) {
return null;
}
else {
return slot.asSlotState();
}
}
/**
* Fetches the state of a replication slot given a slot name and plugin name
* @param slotName the name of the slot
* @param pluginName the name of the plugin used for the desired slot
* @return the {@link ServerInfo.ReplicationSlot} object or a {@link ServerInfo.ReplicationSlot#INVALID} if
* the slot is not valid
* @throws SQLException is thrown by the underlying JDBC driver
*/
protected ServerInfo.ReplicationSlot fetchReplicationSlotInfo(String slotName, String pluginName) throws SQLException {
final String database = database();
final ServerInfo.ReplicationSlot slot = queryForSlot(slotName, database, pluginName,
rs -> {
if (rs.next()) {
boolean active = rs.getBoolean("active");
Long confirmedFlushedLsn = parseConfirmedFlushLsn(slotName, pluginName, database, rs);
if (confirmedFlushedLsn == null) {
return null;
}
Long restartLsn = parseRestartLsn(slotName, pluginName, database, rs);
if (restartLsn == null) {
return null;
}
Long xmin = rs.getLong("catalog_xmin");
return new ServerInfo.ReplicationSlot(active, confirmedFlushedLsn, restartLsn, xmin);
}
else {
LOGGER.debug("No replication slot '{}' is present for plugin '{}' and database '{}'", slotName,
pluginName, database);
return ServerInfo.ReplicationSlot.INVALID;
}
}
);
return slot;
}
/**
* Fetches a replication slot, repeating the query until either the slot is created or
* the max number of attempts has been reached.
*
* To fetch the slot without the retries, use the {@link PostgresConnection#fetchReplicationSlotInfo} call
* @param slotName the slot name
* @param pluginName the name of the plugin
* @return the {@link ServerInfo.ReplicationSlot} object or a {@link ServerInfo.ReplicationSlot#INVALID} if
* the slot is not valid
* @throws SQLException is thrown by the underlying JDBC driver
* @throws InterruptedException is thrown if we don't return an answer within the set number of retries
*/
protected ServerInfo.ReplicationSlot readReplicationSlotInfo(String slotName, String pluginName) throws SQLException, InterruptedException {
final String database = database();
final Metronome metronome = Metronome.parker(PAUSE_BETWEEN_REPLICATION_SLOT_RETRIEVAL_ATTEMPTS, Clock.SYSTEM);
for (int attempt = 1; attempt <= MAX_ATTEMPTS_FOR_OBTAINING_REPLICATION_SLOT; attempt++) {
final ServerInfo.ReplicationSlot slot = queryForSlot(slotName, database, pluginName,
rs -> {
if (rs.next()) {
boolean active = rs.getBoolean("active");
Long confirmedFlushedLSN = parseConfirmedFlushLsn(slotName, pluginName, database, rs);
if (confirmedFlushedLSN == null) {
return null;
}
return new ServerInfo.ReplicationSlot(active, confirmedFlushedLSN);
}
else {
LOGGER.debug("No replication slot '{}' is present for plugin '{}' and database '{}'", slotName,
pluginName, database);
return ServerInfo.ReplicationSlot.INVALID;
}
}
);
final ServerInfo.ReplicationSlot slot = fetchReplicationSlotInfo(slotName, pluginName);
if (slot != null) {
LOGGER.info("Obtained valid replication slot {}", slot);
return slot;
@ -193,6 +243,18 @@ private Long parseConfirmedFlushLsn(String slotName, String pluginName, String d
return confirmedFlushedLsn;
}
private Long parseRestartLsn(String slotName, String pluginName, String database, ResultSet rs) {
Long restartLsn = null;
try {
restartLsn = tryParseLsn(slotName, pluginName, database, rs, "restart_lsn");
}
catch (SQLException e) {
throw new ConnectException("restart_lsn could be found");
}
return restartLsn;
}
private Long tryParseLsn(String slotName, String pluginName, String database, ResultSet rs, String column) throws ConnectException, SQLException {
Long lsn = null;

View File

@ -153,14 +153,14 @@ else if (slotInfo.active()) {
}
});
if (shouldCreateSlot || !slotInfo.hasValidFlushedLSN()) {
if (shouldCreateSlot || !slotInfo.hasValidFlushedLsn()) {
// this is a new slot or we weren't able to read a valid flush LSN pos, so we always start from the xlog pos that was reported
this.defaultStartingPos = xlogStart.get();
} else {
Long latestFlushedLSN = slotInfo.latestFlushedLSN();
this.defaultStartingPos = latestFlushedLSN < xlogStart.get() ? latestFlushedLSN : xlogStart.get();
Long latestFlushedLsn = slotInfo.latestFlushedLsn();
this.defaultStartingPos = latestFlushedLsn < xlogStart.get() ? latestFlushedLsn : xlogStart.get();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("found previous flushed LSN '{}'", ReplicationConnection.format(latestFlushedLSN));
LOGGER.debug("found previous flushed LSN '{}'", ReplicationConnection.format(latestFlushedLsn));
}
}
} catch (SQLException e) {
@ -242,7 +242,7 @@ else if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already
private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
// make sure this is volatile since multiple threads may be interested in this value
private volatile LogSequenceNumber lastReceivedLSN;
private volatile LogSequenceNumber lastReceivedLsn;
@Override
public void read(ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
@ -265,7 +265,7 @@ public void readPending(ReplicationMessageProcessor processor) throws SQLExcepti
}
private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
lastReceivedLSN = stream.getLastReceiveLSN();
lastReceivedLsn = stream.getLastReceiveLSN();
messageDecoder.processMessage(buffer, processor, typeRegistry);
}
@ -277,12 +277,12 @@ public void close() throws SQLException {
@Override
public void flushLastReceivedLsn() throws SQLException {
if (lastReceivedLSN == null) {
if (lastReceivedLsn == null) {
// nothing to flush yet, since we haven't read anything...
return;
}
doFlushLsn(lastReceivedLSN);
doFlushLsn(lastReceivedLsn);
}
@Override
@ -299,7 +299,7 @@ private void doFlushLsn(LogSequenceNumber lsn) throws SQLException {
@Override
public Long lastReceivedLsn() {
return lastReceivedLSN != null ? lastReceivedLSN.asLong() : null;
return lastReceivedLsn != null ? lastReceivedLsn.asLong() : null;
}
private void processWarnings(final boolean forced) throws SQLException {

View File

@ -6,6 +6,8 @@
package io.debezium.connector.postgresql.connection;
import io.debezium.connector.postgresql.spi.SlotState;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@ -139,31 +141,63 @@ protected static ReplicaIdentity parseFromDB(String s) {
* Information about a server replication slot
*/
protected static class ReplicationSlot {
protected static final ReplicationSlot INVALID = new ReplicationSlot(false, null);
protected static final ReplicationSlot INVALID = new ReplicationSlot(false, null, null, null);
private boolean active;
private Long latestFlushedLSN;
protected ReplicationSlot(boolean active, Long latestFlushedLSN) {
private Long latestFlushedLsn;
private Long restartLsn;
private Long catalogXmin;
protected ReplicationSlot(boolean active, Long latestFlushedLsn, Long restartLsn, Long catalogXmin) {
this.active = active;
this.latestFlushedLSN = latestFlushedLSN;
this.latestFlushedLsn = latestFlushedLsn;
this.restartLsn = restartLsn;
this.catalogXmin = catalogXmin;
}
protected boolean active() {
return active;
}
protected Long latestFlushedLSN() {
return latestFlushedLSN;
/**
* Represents the `confirmed_flushed_lsn` field of the replication slot.
*
* This value represents the latest LSN that the logical replication
* consumer has reported back to postgres.
* @return the latestFlushedLsn
*/
protected Long latestFlushedLsn() {
return latestFlushedLsn;
}
/**
* Represents the `restart_lsn` field of the replication slot.
*
* The restart_lsn will be the LSN the slot restarts from
* in the event of a disconnect. This can be distinct from
* the `confirmed_flushed_lsn`, as the two pointers are moved
* independently.
* @return the restartLsn
*/
protected Long restartLsn() {
return restartLsn;
}
protected Long catalogXmin() {
return catalogXmin;
}
protected boolean hasValidFlushedLSN() {
return latestFlushedLSN != null;
protected boolean hasValidFlushedLsn() {
return latestFlushedLsn != null;
}
protected SlotState asSlotState() {
return new SlotState(latestFlushedLsn, restartLsn, catalogXmin, active);
}
@Override
public String toString() {
return "ReplicationSlot [active=" + active + ", latestFlushedLSN=" + latestFlushedLSN + "]";
return "ReplicationSlot [active=" + active + ", latestFlushedLsn=" + latestFlushedLsn + ", catalogXmin=" + catalogXmin + "]";
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlwaysSnapshotter extends QueryingSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
LOGGER.info("Taking a new snapshot as per configuration");
return true;
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
public class InitialOnlySnapshotter extends QueryingSnapshotter {
@Override
public boolean shouldStream() {
return false;
}
@Override
public boolean shouldSnapshot() {
return true;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class InitialSnapshotter extends QueryingSnapshotter {
private OffsetState sourceInfo;
private final static Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotter.class);
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
super.init(config, sourceInfo, slotState);
this.sourceInfo = sourceInfo;
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
if (sourceInfo == null) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (sourceInfo.snapshotInEffect()) {
LOGGER.info("Found previous incomplete snapshot");
return true;
} else {
LOGGER.info(
"Previous snapshot has completed successfully, streaming logical changes from last known position");
return false;
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
public class NeverSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(NeverSnapshotter.class);
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
if (sourceInfo != null && sourceInfo.snapshotInEffect()) {
String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
LOGGER.error(msg);
throw new ConnectException(msg);
}
else {
LOGGER.info("Snapshots are not allowed as per configuration, starting streaming logical changes only");
}
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
return false;
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId) {
throw new UnsupportedOperationException("'never' snapshot mode cannot build queries");
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
public abstract class QueryingSnapshotter implements Snapshotter {
private PostgresConnectorConfig config;
private Map<TableId, String> snapshotOverrides;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
this.config = config;
this.snapshotOverrides = getSnapshotSelectOverridesByTable();
}
public Optional<String> buildSnapshotQuery(TableId tableId) {
if (snapshotOverrides.containsKey(tableId)) {
return Optional.of(snapshotOverrides.get(tableId));
}
else {
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
StringBuilder q = new StringBuilder();
q.append("SELECT * FROM ");
q.append(tableId.toDoubleQuotedString());
return Optional.of(q.toString());
}
}
/**
* Returns any SELECT overrides, if present.
*/
private Map<TableId, String> getSnapshotSelectOverridesByTable() {
String tableList = config.snapshotSelectOverrides();
if (tableList == null) {
return Collections.emptyMap();
}
Map<TableId, String> snapshotSelectOverridesByTable = new HashMap<>();
for (String table : tableList.split(",")) {
snapshotSelectOverridesByTable.put(
TableId.parse(table),
config.snapshotSelectOverrideForTable(table)
);
}
return snapshotSelectOverridesByTable;
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.spi;
import io.debezium.annotation.Incubating;
import java.time.Instant;
/**
* A simple data container that represents the last seen offset
* which was written by debezium.
*
* This data may differ based on decoding plugin and settings, such as
* lastSeenXmin being null if xmin tracking isn't enabled
*/
@Incubating
public class OffsetState {
private final Long lsn;
private final Long txId;
private final Long xmin;
private final Instant commitTs;
private final boolean snapshotting;
public OffsetState(Long lsn, Long txId, Long xmin, Instant lastCommitTs, boolean isSnapshot) {
this.lsn = lsn;
this.txId = txId;
this.xmin = xmin;
this.commitTs = lastCommitTs;
this.snapshotting = isSnapshot;
}
/**
* @return the last LSN seen by debezium
*/
public Long lastSeenLsn() {
return lsn;
}
/**
* @return the last txid seen by debezium
*/
public Long lastSeenTxId() {
return txId;
}
/**
* @return the last xmin seen by debezium
*/
public Long lastSeenXmin() {
return xmin;
}
/**
* @return the last commit timestamp seen by debezium
*/
public Instant lastCommitTs() {
return commitTs;
}
/**
* @return indicates if a snapshot is happening
*/
public boolean snapshotInEffect() {
return snapshotting;
}
}

View File

@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.spi;
import io.debezium.annotation.Incubating;
/**
* A simple data container that holds the state of the current slot
*/
@Incubating
public class SlotState {
private final Long latestFlushedLsn;
private final Long restartLsn;
private final Long catalogXmin;
private final boolean active;
public SlotState(Long lastFlushLsn, Long restartLsn, Long catXmin, boolean active) {
this.active = active;
this.latestFlushedLsn = lastFlushLsn;
this.restartLsn = restartLsn;
this.catalogXmin = catXmin;
}
/**
* @return the slot's `confirmed_flushed_lsn` value
*/
public Long slotLastFlushedLsn() {
return latestFlushedLsn;
}
/**
* @return the slot's `restart_lsn` value
*/
public Long slotRestartLsn() {
return restartLsn;
}
/**
* @return the slot's `catalog_xmin` value
*/
public Long slotCatalogXmin() {
return catalogXmin;
}
/**
* @return if the slot is active
*/
public boolean slotIsActive() {
return active;
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.spi;
import io.debezium.annotation.Incubating;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.relational.TableId;
import java.util.Optional;
/**
* This interface is used to determine details about the snapshot process:
*
* Namely:
* - Should a snapshot occur at all
* - Should streaming occur
* - What queries should be used to snapshot
*
* While many default snapshot modes are provided with debezium (see documentation for details),
* a custom implementation of this interface can be supplied by the implementor to
* provide more advanced functionality, such as partial snapshots.
*
* Implementors must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()},
* or true for both.
*/
@Incubating
public interface Snapshotter {
void init(PostgresConnectorConfig config, OffsetState sourceInfo,
SlotState slotState);
/**
* @return true if the snapshotter should take a snapshot
*/
boolean shouldSnapshot();
/**
* @return true if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* Generate a valid postgres query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> buildSnapshotQuery(TableId tableId);
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.relational.TableId;
import java.util.Optional;
/**
* This is a small class used in PostgresConnectorIT to test a custom snapshot
*
* It is tightly coupled to the test there, but needs to be placed here in order
* to allow for class loading to work
*/
public class CustomTestSnapshot implements Snapshotter {
private boolean hasState;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
hasState = (sourceInfo != null);
}
@Override
public boolean shouldSnapshot() {
return true;
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId) {
// on an empty state, don't read from s2 schema, but afterwards, do
if (!hasState && tableId.schema().equals("s2")) {
return Optional.empty();
}
else {
return Optional.of("select * from " + tableId.toDoubleQuotedString());
}
}
}

View File

@ -6,10 +6,7 @@
package io.debezium.connector.postgresql;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.ALWAYS;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.INITIAL;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode.NEVER;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import static io.debezium.connector.postgresql.TestHelper.PK_FIELD;
import static io.debezium.connector.postgresql.TestHelper.topicName;
import static junit.framework.TestCase.assertEquals;
@ -151,7 +148,7 @@ public void shouldValidateConfiguration() throws Exception {
validateField(validatedConfig, PostgresConnectorConfig.TABLE_WHITELIST, null);
validateField(validatedConfig, PostgresConnectorConfig.TABLE_BLACKLIST, null);
validateField(validatedConfig, PostgresConnectorConfig.COLUMN_BLACKLIST, null);
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL);
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL);
validateField(validatedConfig, PostgresConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, PostgresConnectorConfig.DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS);
validateField(validatedConfig, PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE);
validateField(validatedConfig, PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.PRECISE);
@ -188,7 +185,7 @@ public void shouldSupportSSLParameters() throws Exception {
public void shouldProduceEventsWithInitialSnapshot() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
@ -253,7 +250,7 @@ public void shouldConsumeMessagesFromSnapshot() throws Exception {
TestHelper.execute(INSERT_STMT);
}
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.MAX_QUEUE_SIZE, recordCount / 2)
.with(PostgresConnectorConfig.MAX_BATCH_SIZE, 10)
.with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1");
@ -358,7 +355,7 @@ public void shouldReceiveChangesForChangePKColumnDefinition() throws Exception {
public void shouldIgnoreEventsForDeletedTable() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
@ -393,7 +390,7 @@ public void shouldIgnoreViews() throws Exception {
"CREATE VIEW s1.myview AS SELECT * from s1.a;"
);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
@ -425,7 +422,7 @@ public void shouldIgnoreViews() throws Exception {
public void shouldExecuteOnConnectStatements() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.ON_CONNECT_STATEMENTS, "INSERT INTO s1.a (aa) VALUES (2); INSERT INTO s2.a (aa, bb) VALUES (2, 'hello;; world');")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
@ -445,7 +442,7 @@ public void shouldExecuteOnConnectStatements() throws Exception {
public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, NEVER.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build();
start(PostgresConnector.class, config);
@ -463,7 +460,7 @@ public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws Interrupted
public void shouldNotProduceEventsWithInitialOnlySnapshot() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL_ONLY.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build();
start(PostgresConnector.class, config);
@ -483,7 +480,7 @@ public void shouldNotProduceEventsWithInitialOnlySnapshot() throws InterruptedEx
public void shouldProduceEventsWhenAlwaysTakingSnapshots() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, ALWAYS.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
@ -512,7 +509,7 @@ public void shouldResumeSnapshotIfFailingMidstream() throws Exception {
String setupStmt = SETUP_TABLES_STMT + INSERT_STMT;
TestHelper.execute(setupStmt);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
EmbeddedEngine.CompletionCallback completionCallback = (success, message, error) -> {
if (error != null) {
@ -558,7 +555,7 @@ public void shouldTakeBlacklistFiltersIntoAccount() throws Exception {
"INSERT INTO s2.a (aa) VALUES (5);";
TestHelper.execute(setupStmt);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SCHEMA_BLACKLIST, "s2")
.with(PostgresConnectorConfig.TABLE_BLACKLIST, ".+b")
@ -597,7 +594,7 @@ public void shouldReplaceInvalidTopicNameCharacters() throws Exception {
TestHelper.execute(setupStmt);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1")
.with(PostgresConnectorConfig.TABLE_WHITELIST, "s1\\.dbz_878_some\\|test@data");
@@ -623,7 +620,7 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException
final int recordCount = 10;
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, NEVER.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.with(PostgresConnectorConfig.TABLE_WHITELIST, "s1.a")
.build();
@@ -652,6 +649,60 @@ public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException
Assertions.assertThat(flushLsn.size()).isGreaterThanOrEqualTo((recordCount * 3) / 4);
}
@Test
@FixFor("DBZ-1082")
public void shouldAllowForCustomSnapshot() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
SourceRecords actualRecords = consumeRecordsByTopic(1);
List<SourceRecord> s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
List<SourceRecord> s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs).isNull();
SourceRecord record = s1recs.get(0);
VerifyRecord.isValidRead(record, PK_FIELD, 1);
TestHelper.execute(INSERT_STMT);
actualRecords = consumeRecordsByTopic(2);
s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(1);
assertThat(s2recs.size()).isEqualTo(1);
record = s1recs.get(0);
VerifyRecord.isValidInsert(record, PK_FIELD, 2);
record = s2recs.get(0);
VerifyRecord.isValidInsert(record, PK_FIELD, 2);
stopConnector();
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
actualRecords = consumeRecordsByTopic(4);
s1recs = actualRecords.recordsForTopic(topicName("s1.a"));
s2recs = actualRecords.recordsForTopic(topicName("s2.a"));
assertThat(s1recs.size()).isEqualTo(2);
assertThat(s2recs.size()).isEqualTo(2);
VerifyRecord.isValidRead(s1recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s1recs.get(1), PK_FIELD, 2);
VerifyRecord.isValidRead(s2recs.get(0), PK_FIELD, 1);
VerifyRecord.isValidRead(s2recs.get(1), PK_FIELD, 2);
}
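For orientation, a custom snapshotter along the lines of the CustomTestSnapshot used above might look roughly like the sketch below. Only init(config, sourceInfo, slotState) is visible in the test code, so the remaining hook names (shouldSnapshot, shouldStream, buildSnapshotQuery) and the OffsetState/SlotState parameter types are assumptions about the new Snapshotter SPI rather than its confirmed surface. The sketch mirrors what the test observes: only s1 is snapshotted on a fresh start, and everything is snapshotted once offsets exist.

import java.util.Optional;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;

// Hypothetical custom snapshotter; everything except init() is an assumed SPI shape.
// OffsetState and SlotState are assumed to live in io.debezium.connector.postgresql.spi.
public class S1FirstSnapshotter implements Snapshotter {

    private boolean hasOffsets;

    @Override
    public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
        // sourceInfo and slotState (slot metadata such as catalog_xmin) allow an informed decision
        this.hasOffsets = sourceInfo != null;
    }

    @Override
    public boolean shouldSnapshot() {
        return true; // snapshot on every start, like the always mode
    }

    @Override
    public boolean shouldStream() {
        return true; // keep consuming logical changes after the snapshot
    }

    @Override
    public Optional<String> buildSnapshotQuery(TableId tableId) {
        // an empty Optional skips the table, a query string snapshots it
        if (hasOffsets || tableId.schema().equals("s1")) {
            return Optional.of("SELECT * FROM " + tableId.schema() + "." + tableId.table());
        }
        return Optional.empty();
    }
}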
private String getConfirmedFlushLsn(PostgresConnection connection) throws SQLException {
return connection.prepareQueryAndMap(
"select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", statement -> {
@@ -687,7 +738,7 @@ public void testStreamingPerformance() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, NEVER.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();
@@ -721,7 +772,7 @@ public void testSnapshotPerformance() throws Exception {
TestHelper.dropAllSchemas();
TestHelper.executeDDL("postgres_create_tables.ddl");
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL_ONLY.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY.getValue())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
final long recordsCount = 1000000;
final int batchSize = 1000;

View File

@@ -21,6 +21,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
@@ -80,7 +83,9 @@ public void after() throws Exception {
@Test
public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.build());
snapshotProducer = buildNoStreamProducer(context, config);
TestConsumer consumer = testConsumer(ALL_STMTS.size(), "public", "Quoted__");
@@ -116,7 +121,7 @@ public void shouldGenerateSnapshotsForCustomDatatypes() throws Exception {
TestHelper.getSchema(config),
PostgresTopicSelector.create(config)
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildNoStreamProducer(context, config);
final TestConsumer consumer = testConsumer(1, "public");
@@ -154,7 +159,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
insertStmt;
TestHelper.execute(statements);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), true);
snapshotProducer = buildWithStreamProducer(context, config);
TestConsumer consumer = testConsumer(2, "s1", "s2");
snapshotProducer.start(consumer, e -> {});
@@ -186,7 +191,7 @@ public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
// start a new producer back up, take a new snapshot (we expect all the records to be read back)
int expectedRecordsCount = 6;
consumer = testConsumer(expectedRecordsCount, "s1", "s2");
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), true);
snapshotProducer = buildWithStreamProducer(context, config);
snapshotProducer.start(consumer, e -> {});
consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
@@ -237,7 +242,7 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
selector
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), true);
snapshotProducer = buildWithStreamProducer(context, config);
TestConsumer consumer = testConsumer(2);
snapshotProducer.start(consumer, e -> {});
@@ -247,7 +252,6 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
final SourceRecord first = consumer.remove();
VerifyRecord.isValidRead(first, PK_FIELD, 1);
assertRecordOffsetAndSnapshotSource(first, true, true);
System.out.println(first);
final SourceRecord second = consumer.remove();
assertThat(second.topic()).startsWith("__debezium-heartbeat");
assertRecordOffsetAndSnapshotSource(second, false, false);
@@ -256,6 +260,18 @@ public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
snapshotProducer.stop();
}
private RecordsSnapshotProducer buildNoStreamProducer(PostgresTaskContext ctx, PostgresConnectorConfig config) {
Snapshotter sn = new InitialOnlySnapshotter();
sn.init(config, null, null);
return new RecordsSnapshotProducer(ctx, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), sn);
}
private RecordsSnapshotProducer buildWithStreamProducer(PostgresTaskContext ctx, PostgresConnectorConfig config) {
Snapshotter sn = new AlwaysSnapshotter();
sn.init(config, null, null);
return new RecordsSnapshotProducer(ctx, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), sn);
}
private void assertReadRecord(SourceRecord record, Map<String, List<SchemaAndValueField>> expectedValuesByTopicName) {
VerifyRecord.isValidRead(record, PK_FIELD, 1);
String topicName = record.topic().replace(TestHelper.TEST_SERVER + ".", "");
@@ -279,7 +295,7 @@ public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() thr
selector
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildNoStreamProducer(context, config);
TestConsumer consumer = testConsumer(ALL_STMTS.size(), "public", "Quoted__");
@@ -321,7 +337,7 @@ public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() thro
selector
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildNoStreamProducer(context, config);
TestConsumer consumer = testConsumer(1, "public", "Quoted_\"");
@@ -374,7 +390,7 @@ public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
selector
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildNoStreamProducer(context, config);
TestConsumer consumer = testConsumer(31);
@@ -443,7 +459,7 @@ public void shouldGenerateSnapshotsForHstores() throws Exception {
selector
);
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildNoStreamProducer(context, config);
TestConsumer consumer = testConsumer(1, "public", "Quoted_\"");

View File

@@ -22,6 +22,8 @@
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON;
import static io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs.DecoderPluginName.DECODERBUFS;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
@@ -71,6 +73,10 @@ public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
@Before
public void before() throws Exception {
// ensure the slot is deleted for each test
try (PostgresConnection conn = TestHelper.create()) {
conn.dropReplicationSlot(ReplicationConnection.Builder.DEFAULT_SLOT_NAME);
}
TestHelper.dropAllSchemas();
TestHelper.executeDDL("init_postgis.ddl");
String statements =
@@ -1287,6 +1293,56 @@ public void testEmptyChangesProducesHeartbeat() throws Exception {
assertThat(consumer.isEmpty()).isTrue();
}
@Test
@FixFor("DBZ-1082")
public void shouldHaveNoXminWhenNotEnabled() throws Exception {
// Verify that no xmin is recorded when the xmin fetch interval is disabled.
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.XMIN_FETCH_INTERVAL, "0")
.build());
setupRecordsProducer(config);
consumer = testConsumer(1);
recordsProducer.start(consumer, blackHole);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;");
String statement = "INSERT INTO test_table (text) VALUES ('no_xmin');";
executeAndWait(statement);
// Verify the record that made it does not have an xmin
SourceRecord rec = assertRecordInserted("public.test_table", PK_FIELD, 2);
assertSourceInfo(rec, "postgres", "public", "test_table");
Struct source = ((Struct) rec.value()).getStruct("source");
assertThat(source.getInt64("xmin")).isNull();
assertThat(consumer.isEmpty()).isTrue();
}
@Test
@FixFor("DBZ-1082")
public void shouldHaveXminWhenEnabled() throws Exception {
// Verify that an xmin is recorded when the xmin fetch interval is enabled.
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.XMIN_FETCH_INTERVAL, "10")
.build());
setupRecordsProducer(config);
consumer = testConsumer(1);
recordsProducer.start(consumer, blackHole);
TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;");
String statement = "INSERT INTO test_table (text) VALUES ('with_xmin');";
executeAndWait(statement);
// Verify the record that made it does have an xmin
SourceRecord rec = assertRecordInserted("public.test_table", PK_FIELD, 2);
assertSourceInfo(rec, "postgres", "public", "test_table");
Struct source = ((Struct) rec.value()).getStruct("source");
assertThat(source.getInt64("xmin")).isGreaterThan(0L);
assertThat(consumer.isEmpty()).isTrue();
}
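The two tests pin down the contract (an interval of 0 disables the xmin lookup, a positive interval yields a positive xmin on the source block) but not the mechanism. Below is a hedged sketch of how a producer could refresh the value at most once per configured interval; the txid_snapshot_xmin query is one plausible way to read the xmin horizon in PostgreSQL, not necessarily what the connector actually runs.

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;

// Sketch only: caches the xmin and re-queries it once the interval has elapsed.
class XminFetcher {
    private final Duration interval;
    private Instant lastFetch = Instant.EPOCH;
    private Long xmin;

    XminFetcher(Duration interval) {
        this.interval = interval;
    }

    Long currentXmin(Connection conn) throws SQLException {
        if (interval.isZero()) {
            return null; // interval 0 disables the fetch, matching the first test
        }
        Instant now = Instant.now();
        if (Duration.between(lastFetch, now).compareTo(interval) >= 0) {
            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT txid_snapshot_xmin(txid_current_snapshot())")) {
                rs.next();
                xmin = rs.getLong(1);
                lastFetch = now;
            }
        }
        return xmin;
    }
}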
private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode mode, boolean tablesBeforeStart) throws Exception {
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, mode)

View File

@@ -13,6 +13,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
@@ -48,11 +50,12 @@ public class SnapshotWithOverridesProducerIT extends AbstractRecordsProducerTest
private RecordsSnapshotProducer snapshotProducer;
private PostgresTaskContext context;
private PostgresConnectorConfig config;
public void before(Configuration overrides) throws SQLException {
TestHelper.dropAllSchemas();
PostgresConnectorConfig config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(overrides).build());
config = new PostgresConnectorConfig(TestHelper.defaultConfig().with(overrides).build());
TopicSelector<TableId> selector = PostgresTopicSelector.create(config);
context = new PostgresTaskContext(
config,
@@ -74,7 +77,7 @@ public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws Except
.with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, "over.t1")
.with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + ".over.t1", "SELECT * FROM over.t1 WHERE pk > 100")
.build());
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildStreamProducer(context, config);
final int expectedRecordsCount = 3 + 6;
@@ -96,7 +99,7 @@ public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() thro
.with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + ".over.t1", "SELECT * FROM over.t1 WHERE pk > 101")
.with(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + ".over.t2", "SELECT * FROM over.t2 WHERE pk > 100")
.build());
snapshotProducer = new RecordsSnapshotProducer(context, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), false);
snapshotProducer = buildStreamProducer(context, config);
final int expectedRecordsCount = 2 + 3;
@@ -121,5 +124,9 @@ private Map<String, List<SourceRecord>> recordsByTopic(final int expectedRecords
return recordsByTopic;
}
private RecordsSnapshotProducer buildStreamProducer(PostgresTaskContext ctx, PostgresConnectorConfig config) {
Snapshotter sn = new InitialOnlySnapshotter();
sn.init(config, null, null);
return new RecordsSnapshotProducer(ctx, new SourceInfo(TestHelper.TEST_SERVER, TestHelper.TEST_DATABASE), sn);
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Marks the annotated element as incubating. The contract of incubating elements (e.g. packages, types, methods,
* constants etc.) is under active development and may be incompatibly altered - or removed - in subsequent releases.
* <p>
* Usage of incubating API/SPI members is encouraged (so the development team can get feedback on these new features)
* but you should be prepared to update code that uses them as needed.
*
*/
@Documented
@Retention(RetentionPolicy.CLASS)
public @interface Incubating {
}
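RetentionPolicy.CLASS keeps the marker in the class files without any runtime footprint. A usage sketch follows; placing it on Snapshotter is illustrative, since this hunk does not show where the commit actually applies the annotation.

import io.debezium.annotation.Incubating;

@Incubating
public interface Snapshotter {
    // contract may change incompatibly between releases
}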

View File

@@ -242,4 +242,8 @@ public static long toEpochMicros(Instant instant) {
public static Instant toInstant(long epochNanos) {
return Instant.ofEpochSecond(0, epochNanos);
}
public static Instant toInstantFromMicros(long epochMicros) {
return toInstant(TimeUnit.MICROSECONDS.toNanos(epochMicros));
}
}
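A quick worked example of the new helper, assuming this hunk belongs to the io.debezium.time.Conversions class (the file name is not shown here): one million microseconds is exactly one second, and sub-millisecond precision survives the conversion.

// e.g. in a test or jshell session
Instant a = Conversions.toInstantFromMicros(1_000_000L);
// a.equals(Instant.ofEpochSecond(1)) is true

Instant b = Conversions.toInstantFromMicros(1_550_000_000_123_456L);
// b.getEpochSecond() == 1_550_000_000L and b.getNano() == 123_456_000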

View File

@@ -28,6 +28,11 @@ public long currentTimeInMillis() {
public long currentTimeInNanos() {
return System.nanoTime();
}
@Override
public Instant currentTimeAsInstant() {
return Instant.now();
}
};
/**
@@ -57,6 +62,15 @@ default long currentTimeInMicros() {
return TimeUnit.MICROSECONDS.convert(currentTimeInMillis(), TimeUnit.MILLISECONDS);
}
/**
* Get the current time as an instant
*
* @return the current time as an instant.
*/
default Instant currentTimeAsInstant() {
return Instant.ofEpochMilli(currentTimeInMillis());
}
/**
* Get the current time in milliseconds.
* @return the current time in milliseconds.
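Because currentTimeAsInstant() defaults to Instant.ofEpochMilli(currentTimeInMillis()), a custom clock only has to supply the milliseconds and gets a consistent Instant for free (at millisecond precision), while the system clock above overrides it with Instant.now(). A sketch, assuming currentTimeInMillis() is the interface's only abstract method, as the default methods here suggest:

// Fixed clock for tests; currentTimeAsInstant() follows from the default method.
Clock fixed = new Clock() {
    @Override
    public long currentTimeInMillis() {
        return 1_550_000_000_000L;
    }
};
// fixed.currentTimeAsInstant() yields Instant.parse("2019-02-12T19:33:20Z")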

View File

@@ -159,7 +159,7 @@ public final class EmbeddedEngine implements Runnable {
public static final Field OFFSET_COMMIT_POLICY = Field.create("offset.commit.policy")
.withDescription("The fully-qualified class name of the commit policy type. This class must implement the interface "
+ OffsetCommitPolicy.class.getName()
+ ". The default is a periodic commity policy based upon time intervals.")
+ ". The default is a periodic commit policy based upon time intervals.")
.withDefault(OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
.withValidation(Field::isClassName);
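A configuration sketch for the field described above, simply making the documented default explicit; Configuration.create() is the builder from io.debezium.config, and the engine setup around it is omitted.

Configuration engineConfig = Configuration.create()
        .with("offset.commit.policy", OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
        .build();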