DBZ-7300 Snapshotter, SnapshotLock and SnapshotQuery are now services registered in the ServiceRegistry

This commit is contained in:
mfvitale 2024-01-22 11:22:59 +01:00 committed by Jiri Pechanec
parent 8e17724392
commit 3b92786a41
41 changed files with 1149 additions and 373 deletions

View File

@ -0,0 +1,24 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.common;
import java.util.Map;
/**
* A contract that defines a configurable interface
*
* @author Mario Fiore Vitale
*
*/
public interface Configurable {
/**
* Connector properties are passed to let you configure your implementation.
*
* @param properties map of configurable properties
*/
void configure(Map<String, ?> properties);
}

View File

@ -5,55 +5,74 @@
*/
package io.debezium.spi.snapshot;
import io.debezium.DebeziumException;
import io.debezium.common.annotation.Incubating;
import io.debezium.spi.common.Configurable;
/**
* This interface is used to determine details about the snapshot process:
*
* Namely:
* - Should a snapshot occur at all
* - Should streaming occur
* - What queries should be used to snapshot
*
* {@link Snapshotter} is used to determine details about the snapshot process:
* <p>
* Namely: <br>
* - Should a snapshot occur at all <br>
* - Should streaming occur <br>
* - Should snapshot schema (if supported) <br>
* - Should snapshot data/schema on error
* <p>
* While many default snapshot modes are provided with debezium (see documentation for details)
* a custom implementation of this interface can be provided by the implementor which
* can provide more advanced functionality, such as partial snapshots
*
* Implementor's must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()}
* or true for both.
*
* @author Mario Fiore Vitale
*/
@Incubating
public interface Snapshotter {
public interface Snapshotter extends Configurable {
/**
* @return true if the snapshotter should take a snapshot
* @return the name of the snapshotter.
*
*
*/
String name();
/**
* Validate the snapshotter compatibility with the current connector configuration.
* Throws a {@link DebeziumException} in case it is not compatible.
*
* @param offsetContextExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param isSnapshotInProgress is {@code true} when the connector is started but there was already a snapshot in progress
*/
void validate(boolean offsetContextExists, boolean isSnapshotInProgress);
/**
* @return {@code true} if the snapshotter should take a snapshot
*/
boolean shouldSnapshot();
/**
* @return true if the snapshotter should take a snapshot
* @return {@code true} if the snapshotter should take a snapshot
*/
boolean shouldSnapshotSchema();
/**
* @return true if the snapshotter should stream after taking a snapshot
* @return {@code true} if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* @return true whether the schema can be recovered if database schema history is corrupted.
* @return {@code true} whether the schema can be recovered if database schema history is corrupted.
*/
boolean shouldSnapshotOnSchemaError();
/**
* @return true whether the snapshot should be re-executed when there is a gap in data stream.
* @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream.
*/
boolean shouldSnapshotOnDataError();
/**
*
* @return true if streaming should resume from the start of the snapshot
* transaction, or false for when a connector resumes and takes a snapshot,
* @return {@code true} if streaming should resume from the start of the snapshot
* transaction, or {@code false} for when a connector resumes and takes a snapshot,
* streaming should resume from where streaming previously left off.
*/
default boolean shouldStreamEventsStartingFromSnapshot() {

View File

@ -1,29 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
import java.util.List;
import java.util.Map;
/**
* This interface is used to provide custom snapshotters:
* Implementations must:
*
* provide a map of snapshotter in the {@link #create(Configuration config)} method.
*
* @author Mario Fiore Vitale
*/
public interface SnapshotterProvider {
/**
* Create a map of snapshotter where the key is its name used in 'snapshot.mode' configuration.
*
* @param Configuration the connector configuration
*
* @return a map of custom snapshotter
*/
Map<String, Snapshotter> create(Configuration config); // Can we move the Configuration interface from core module?
}

View File

@ -14,7 +14,6 @@
import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
@ -26,6 +25,7 @@
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.schema.DatabaseSchema;
import io.debezium.snapshot.SnapshotterService;
/**
* Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base
@ -35,7 +35,7 @@ public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoord
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class);
private final Snapshotter snapshotter;
private final SnapshotterService snapshotterService;
private final SlotState slotInfo;
public PostgresChangeEventSourceCoordinator(Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets,
@ -45,12 +45,12 @@ public PostgresChangeEventSourceCoordinator(Offsets<PostgresPartition, PostgresO
PostgresChangeEventSourceFactory changeEventSourceFactory,
ChangeEventSourceMetricsFactory<PostgresPartition> changeEventSourceMetricsFactory,
EventDispatcher<PostgresPartition, ?> eventDispatcher, DatabaseSchema<?> schema,
Snapshotter snapshotter, SlotState slotInfo,
SnapshotterService snapshotterService, SlotState slotInfo,
SignalProcessor<PostgresPartition, PostgresOffsetContext> signalProcessor,
NotificationService<PostgresPartition, PostgresOffsetContext> notificationService) {
super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory,
changeEventSourceMetricsFactory, eventDispatcher, schema, signalProcessor, notificationService);
this.snapshotter = snapshotter;
this.snapshotterService = snapshotterService;
this.slotInfo = slotInfo;
}
@ -60,7 +60,7 @@ protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContex
PostgresPartition partition,
PostgresOffsetContext previousOffset)
throws InterruptedException {
if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot() && slotInfo != null) {
if (previousOffset != null && !snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot() && slotInfo != null) {
try {
setSnapshotStartLsn((PostgresSnapshotChangeEventSource) snapshotSource,
previousOffset);

View File

@ -11,7 +11,6 @@
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.notification.NotificationService;
@ -22,6 +21,7 @@
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
@ -35,12 +35,12 @@ public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactor
private final Clock clock;
private final PostgresSchema schema;
private final PostgresTaskContext taskContext;
private final Snapshotter snapshotter;
private final SnapshotterService snapshotterService;
private final ReplicationConnection replicationConnection;
private final SlotCreationResult slotCreatedInfo;
private final SlotState startingSlotInfo;
public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, Snapshotter snapshotter,
public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, SnapshotterService snapshotterService,
MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory,
ErrorHandler errorHandler, PostgresEventDispatcher<TableId> dispatcher, Clock clock, PostgresSchema schema,
PostgresTaskContext taskContext, ReplicationConnection replicationConnection, SlotCreationResult slotCreatedInfo,
@ -52,7 +52,7 @@ public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, S
this.clock = clock;
this.schema = schema;
this.taskContext = taskContext;
this.snapshotter = snapshotter;
this.snapshotterService = snapshotterService;
this.replicationConnection = replicationConnection;
this.slotCreatedInfo = slotCreatedInfo;
this.startingSlotInfo = startingSlotInfo;
@ -63,7 +63,7 @@ public SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> getSn
NotificationService<PostgresPartition, PostgresOffsetContext> notificationService) {
return new PostgresSnapshotChangeEventSource(
configuration,
snapshotter,
snapshotterService,
connectionFactory,
schema,
dispatcher,
@ -78,7 +78,7 @@ public SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> getSn
public StreamingChangeEventSource<PostgresPartition, PostgresOffsetContext> getStreamingChangeEventSource() {
return new PostgresStreamingChangeEventSource(
configuration,
snapshotter,
snapshotterService,
connectionFactory.mainConnection(),
dispatcher,
errorHandler,

View File

@ -93,7 +93,6 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
// Prepare connection without initial statement execution
connection.connection(false);
testConnection(connection);
checkWalLevel(connection, postgresConfig);
checkLoginReplicationRoles(connection);
}
catch (SQLException e) {
@ -143,21 +142,6 @@ private static void checkLoginReplicationRoles(PostgresConnection connection) th
}
}
private static void checkWalLevel(PostgresConnection connection, PostgresConnectorConfig config) throws SQLException {
final String walLevel = connection.queryAndMap(
"SHOW wal_level",
connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level"));
if (!"logical".equals(walLevel)) {
if (config.getSnapshotter() != null && config.getSnapshotter().shouldStream()) {
// Logical WAL_LEVEL is only necessary for CDC snapshotting
throw new SQLException("Postgres server wal_level property must be 'logical' but is: '" + walLevel + "'");
}
else {
LOGGER.warn("WAL_LEVEL check failed but this is ignored as CDC was not requested");
}
}
}
private static void testConnection(PostgresConnection connection) throws SQLException {
connection.execute("SELECT version()");
LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(),

View File

@ -33,11 +33,6 @@
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder;
import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
@ -184,45 +179,33 @@ public enum SnapshotMode implements EnumeratedValue {
/**
* Always perform a snapshot when starting.
*/
ALWAYS("always", (c) -> new AlwaysSnapshotter()),
ALWAYS("always"),
/**
* Perform a snapshot only upon initial startup of a connector.
*/
INITIAL("initial", (c) -> new InitialSnapshotter()),
INITIAL("initial"),
/**
* Never perform a snapshot and only receive logical changes.
*/
NEVER("never", (c) -> new NeverSnapshotter()),
NEVER("never"),
/**
* Perform a snapshot and then stop before attempting to receive any logical changes.
*/
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),
INITIAL_ONLY("initial_only"),
/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
CUSTOM("custom", (c) -> {
return c.getInstance(SNAPSHOT_MODE_CLASS, Snapshotter.class);
});
@FunctionalInterface
public interface SnapshotterBuilder {
Snapshotter buildSnapshotter(Configuration config);
}
CUSTOM("custom");
private final String value;
private final SnapshotterBuilder builderFunc;
SnapshotMode(String value, SnapshotterBuilder buildSnapshotter) {
SnapshotMode(String value) {
this.value = value;
this.builderFunc = buildSnapshotter;
}
public Snapshotter getSnapshotter(Configuration config) {
return builderFunc.buildSnapshotter(config);
}
@Override
@ -482,6 +465,121 @@ public static SchemaRefreshMode parse(String value) {
}
}
public enum SnapshotLockingMode implements EnumeratedValue {
/**
* This mode will lock in ACCESS SHARE MODE to avoid concurrent schema changes during the snapshot, and
* this does not prevent writes to the table, but prevents changes to the table's schema.
*/
SHARED("shared"),
/**
* This mode will avoid using ANY table locks during the snapshot process.
* This mode should be used carefully only when no schema changes are to occur.
*/
NONE("none"),
CUSTOM("custom");
private final String value;
SnapshotLockingMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be {@code null}
* @return the matching option, or null if no match is found
*/
public static SnapshotLockingMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SnapshotLockingMode option : SnapshotLockingMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be {@code null}
* @param defaultValue the default value; may be {@code null}
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotLockingMode parse(String value, String defaultValue) {
SnapshotLockingMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
public enum SnapshotQueryMode implements EnumeratedValue {
/**
* This mode will do a select based on {@code column.include.list} and {@code column.exclude.list} configurations.
*/
SELECT_ALL("select_all"),
CUSTOM("custom");
private final String value;
SnapshotQueryMode(String value) {
this.value = value;
}
@Override
public String getValue() {
return value;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be {@code null}
* @return the matching option, or null if no match is found
*/
public static SnapshotQueryMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (SnapshotQueryMode option : SnapshotQueryMode.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}
/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be {@code null}
* @param defaultValue the default value; may be {@code null}
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static SnapshotQueryMode parse(String value, String defaultValue) {
SnapshotQueryMode mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
protected static final String DATABASE_CONFIG_PREFIX = "database.";
protected static final int DEFAULT_PORT = 5_432;
protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240;
@ -754,22 +852,60 @@ public static AutoCreateMode parse(String value, String defaultValue) {
+ "'exported': This option is deprecated; use 'initial' instead.; "
+ "'custom': The connector loads a custom class to specify how the connector performs snapshots. For more information, see Custom snapshotter SPI in the PostgreSQL connector documentation.");
public static final Field SNAPSHOT_MODE_CLASS = Field.create("snapshot.custom.class")
.withDisplayName("Snapshot Mode Custom Class")
public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode")
.withDisplayName("Snapshot locking mode")
.withEnum(SnapshotLockingMode.class, SnapshotLockingMode.NONE)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 12))
.withDescription("Controls how the connector holds locks on tables while performing the schema snapshot. The 'shared' "
+ "which means the connector will hold a table lock that prevents exclusive table access for just the initial portion of the snapshot "
+ "while the database schemas and other metadata are being read. The remaining work in a snapshot involves selecting all rows from "
+ "each table, and this is done using a flashback query that requires no locks. However, in some cases it may be desirable to avoid "
+ "locks entirely which can be done by specifying 'none'. This mode is only safe to use if no schema changes are happening while the "
+ "snapshot is taken.");
public static final Field SNAPSHOT_LOCKING_MODE_CUSTOM_NAME = Field.create("snapshot.locking.mode.custom.name")
.withDisplayName("Snapshot Locking Mode Custom Name")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 9))
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 13))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if (config.getString(SNAPSHOT_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.custom_class cannot be empty when snapshot.mode 'custom' is defined");
if (config.getString(SNAPSHOT_LOCKING_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.locking.mode.custom.name cannot be empty when snapshot.locking.mode 'custom' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as custom, this setting must be set to specify a fully qualified class name to load (via the default class loader). "
+ "This class must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot and how to build queries.");
"When 'snapshot.locking.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
+ "The implementations must implement the 'SnapshotterLocking' interface and is called to determine how to lock tables during schema snapshot.");
public static final Field SNAPSHOT_QUERY_MODE = Field.create("snapshot.query.mode")
.withDisplayName("Snapshot query mode")
.withEnum(SnapshotQueryMode.class, SnapshotQueryMode.SELECT_ALL)
.withWidth(Width.SHORT)
.withImportance(Importance.LOW)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 14))
.withDescription("Controls query used during the snapshot");
public static final Field SNAPSHOT_QUERY_MODE_CUSTOM_NAME = Field.create("snapshot.query.mode.custom.name")
.withDisplayName("Snapshot Query Mode Custom Name")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 15))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if (config.getString(SNAPSHOT_QUERY_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.query.mode.custom.name cannot be empty when snapshot.query.mode 'custom' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.query.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
+ "The implementations must implement the 'SnapshotterQuery' interface and is called to determine how to build queries during snapshot.");
/**
* A comma-separated list of regular expressions that match the prefix of logical decoding messages to be excluded
@ -909,11 +1045,16 @@ public static AutoCreateMode parse(String value, String defaultValue) {
private final LogicalDecodingMessageFilter logicalDecodingMessageFilter;
private final HStoreHandlingMode hStoreHandlingMode;
private final IntervalHandlingMode intervalHandlingMode;
private final SnapshotMode snapshotMode;
private final SchemaRefreshMode schemaRefreshMode;
private final boolean flushLsnOnSource;
private final ReplicaIdentityMapper replicaIdentityMapper;
private final SnapshotMode snapshotMode;
private final SnapshotLockingMode snapshotLockingMode;
private final SnapshotQueryMode snapshotQueryMode;
private final String snapshotQueryModeCustomName;
private final String snapshotLockingModeCustomName;
public PostgresConnectorConfig(Configuration config) {
super(
config,
@ -928,11 +1069,15 @@ public PostgresConnectorConfig(Configuration config) {
String hstoreHandlingModeStr = config.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE);
this.hStoreHandlingMode = HStoreHandlingMode.parse(hstoreHandlingModeStr);
this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE));
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE));
this.schemaRefreshMode = SchemaRefreshMode.parse(config.getString(SCHEMA_REFRESH_MODE));
this.flushLsnOnSource = config.getBoolean(SHOULD_FLUSH_LSN_IN_SOURCE_DB);
final var replicaIdentityMapping = config.getString(REPLICA_IDENTITY_AUTOSET_VALUES);
this.replicaIdentityMapper = (replicaIdentityMapping != null) ? new ReplicaIdentityMapper(replicaIdentityMapping) : null;
this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString());
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.snapshotQueryMode = SnapshotQueryMode.parse(config.getString(SNAPSHOT_QUERY_MODE), SNAPSHOT_QUERY_MODE.defaultValueAsString());
this.snapshotQueryModeCustomName = config.getString(SNAPSHOT_QUERY_MODE_CUSTOM_NAME, "");
this.snapshotLockingModeCustomName = config.getString(SNAPSHOT_LOCKING_MODE_CUSTOM_NAME, "");
}
protected String hostname() {
@ -1011,10 +1156,6 @@ public Map<String, ConfigValue> validate() {
return getConfig().validate(ALL_FIELDS);
}
protected Snapshotter getSnapshotter() {
return this.snapshotMode.getSnapshotter(getConfig());
}
protected boolean skipRefreshSchemaOnMissingToastableData() {
return SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST == this.schemaRefreshMode;
}
@ -1040,6 +1181,26 @@ public Optional<ReplicaIdentityMapper> replicaIdentityMapper() {
return Optional.ofNullable(this.replicaIdentityMapper);
}
public SnapshotMode snapshotMode() {
return this.snapshotMode;
}
public SnapshotLockingMode snapshotLockingMode() {
return this.snapshotLockingMode;
}
public SnapshotQueryMode snapshotQueryMode() {
return this.snapshotQueryMode;
}
public String snapshotQueryModeCustomName() {
return this.snapshotQueryModeCustomName;
}
public String snapshotLockingModeCustomName() {
return this.snapshotLockingModeCustomName;
}
protected int moneyFractionDigits() {
return getConfig().getInteger(MONEY_FRACTION_DIGITS);
}
@ -1085,7 +1246,6 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SOURCE_INFO_STRUCT_MAKER)
.connector(
SNAPSHOT_MODE,
SNAPSHOT_MODE_CLASS,
HSTORE_HANDLING_MODE,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,

View File

@ -30,9 +30,11 @@
import io.debezium.connector.postgresql.connection.PostgresConnection.PostgresValueConverterBuilder;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.snapshot.PostgresSnapshotterServiceProvider;
import io.debezium.connector.postgresql.snapshot.SnapshotLockProvider;
import io.debezium.connector.postgresql.snapshot.SnapshotQueryProvider;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.document.DocumentReader;
import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
@ -46,6 +48,9 @@
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
@ -72,15 +77,11 @@ public class PostgresConnectorTask extends BaseSourceTask<PostgresPartition, Pos
@Override
public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> start(Configuration config) {
final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config);
final TopicNamingStrategy<TableId> topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
final Snapshotter snapshotter = connectorConfig.getSnapshotter();
final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster();
if (snapshotter == null) {
throw new ConnectException("Unable to load snapshotter, if using custom snapshot mode, double check your settings");
}
final Charset databaseCharset;
try (PostgresConnection tempConnection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) {
databaseCharset = tempConnection.getDatabaseCharset();
@ -121,10 +122,23 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema);
connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection);
connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverter);
connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsets);
// Service providers
registerServiceProviders(connectorConfig.getServiceRegistry());
final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
final Snapshotter snapshotter = snapshotterService.getSnapshotter();
try {
checkWalLevel(beanRegistryJdbcConnection, snapshotterService);
}
catch (SQLException e) {
LOGGER.error("Failed testing connection for {} with user '{}'", beanRegistryJdbcConnection.connectionString(),
beanRegistryJdbcConnection.username(), e);
}
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
// Print out the server information
@ -142,11 +156,11 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
if (previousOffset == null) {
LOGGER.info("No previous offset found");
// if we have no initial offset, indicate that to Snapshotter by passing null
snapshotter.init(connectorConfig, null, slotInfo);
snapshotter.validate(false, false);
}
else {
LOGGER.info("Found previous offset {}", previousOffset);
snapshotter.init(connectorConfig, previousOffset.asOffsetState(), slotInfo);
snapshotter.validate(true, previousOffset.isSnapshotRunning());
}
SlotCreationResult slotCreatedInfo = null;
@ -237,7 +251,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
connectorConfig,
new PostgresChangeEventSourceFactory(
connectorConfig,
snapshotter,
snapshotterService,
connectionFactory,
errorHandler,
dispatcher,
@ -250,7 +264,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
schema,
snapshotter,
snapshotterService,
slotInfo,
signalProcessor,
notificationService);
@ -294,6 +308,15 @@ public ReplicationConnection createReplicationConnection(PostgresTaskContext tas
return replicationConnection;
}
@Override
protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
super.registerServiceProviders(serviceRegistry);
serviceRegistry.registerServiceProvider(new SnapshotLockProvider());
serviceRegistry.registerServiceProvider(new SnapshotQueryProvider());
serviceRegistry.registerServiceProvider(new PostgresSnapshotterServiceProvider());
}
@Override
public List<SourceRecord> doPoll() throws InterruptedException {
final List<DataChangeEvent> records = queue.poll();
@ -351,4 +374,20 @@ protected Iterable<Field> getAllConfigurationFields() {
public PostgresTaskContext getTaskContext() {
return taskContext;
}
private static void checkWalLevel(PostgresConnection connection, SnapshotterService snapshotterService) throws SQLException {
final String walLevel = connection.queryAndMap(
"SHOW wal_level",
connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level"));
if (!"logical".equals(walLevel)) {
// TODO here I don't have the snapshotter, it is not yet injected
if (snapshotterService.getSnapshotter() != null && snapshotterService.getSnapshotter().shouldStream()) {
// Logical WAL_LEVEL is only necessary for CDC snapshotting
throw new SQLException("Postgres server wal_level property must be 'logical' but is: '" + walLevel + "'");
}
else {
LOGGER.warn("WAL_LEVEL check failed but this is ignored as CDC was not requested");
}
}
}
}

View File

@ -19,10 +19,9 @@
import io.debezium.connector.postgresql.PostgresOffsetContext.Loader;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.mode.AlwaysSnapshotter;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
@ -33,6 +32,8 @@
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;
public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> {
@ -42,12 +43,12 @@ public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeE
private final PostgresConnectorConfig connectorConfig;
private final PostgresConnection jdbcConnection;
private final PostgresSchema schema;
private final Snapshotter snapshotter;
private final SnapshotterService snapshotterService;
private final Snapshotter blockingSnapshotter;
private final SlotCreationResult slotCreatedInfo;
private final SlotState startingSlotInfo;
public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter,
public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, SnapshotterService snapshotterService,
MainConnectionProvidingConnectionFactory<PostgresConnection> connectionFactory, PostgresSchema schema,
EventDispatcher<PostgresPartition, TableId> dispatcher, Clock clock,
SnapshotProgressListener<PostgresPartition> snapshotProgressListener, SlotCreationResult slotCreatedInfo,
@ -56,7 +57,7 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig
this.connectorConfig = connectorConfig;
this.jdbcConnection = connectionFactory.mainConnection();
this.schema = schema;
this.snapshotter = snapshotter;
this.snapshotterService = snapshotterService;
this.slotCreatedInfo = slotCreatedInfo;
this.startingSlotInfo = startingSlotInfo;
this.blockingSnapshotter = new AlwaysSnapshotter();
@ -71,7 +72,7 @@ public SnapshottingTask getSnapshottingTask(PostgresPartition partition, Postgre
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
boolean snapshotData = snapshotter.shouldSnapshot();
boolean snapshotData = snapshotterService.getSnapshotter().shouldSnapshot();
if (snapshotData) {
LOGGER.info("According to the connector configuration data will be snapshotted");
}
@ -97,7 +98,7 @@ protected void connectionCreated(RelationalSnapshotContext<PostgresPartition, Po
// streaming phase so that the snapshot is performed from a consistent view of the data. Since the isolation
// level on the transaction used in catch up streaming has already set the isolation level and executed
// statements, the transaction does not need to get set the level again here.
if (snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo == null) {
if (snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot() && startingSlotInfo == null) {
setSnapshotTransactionIsolationLevel(snapshotContext.onDemand);
}
schema.refresh(jdbcConnection, false);
@ -113,8 +114,10 @@ protected Set<TableId> getAllTableIds(RelationalSnapshotContext<PostgresPartitio
protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext)
throws SQLException {
final Duration lockTimeout = connectorConfig.snapshotLockTimeout();
final Optional<String> lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables);
final Set<String> capturedTablesNames = snapshotContext.capturedTables.stream().map(TableId::toDoubleQuotedString).collect(Collectors.toSet());
final Optional<String> lockStatement = snapshotterService.getSnapshotLock().tableLockingStatement(lockTimeout, capturedTablesNames);
if (lockStatement.isPresent()) {
LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", lockTimeout.getSeconds());
@ -134,7 +137,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext<PostgresPartiti
throws Exception {
PostgresOffsetContext offset = ctx.offset;
if (offset == null) {
if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) {
if (previousOffset != null && !snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot()) {
// The connect framework, not the connector, manages triggering committing offset state so the
// replication stream may not have flushed the latest offset state during catch up streaming.
// The previousOffset variable is shared between the catch up streaming and snapshot phases and
@ -173,7 +176,7 @@ private Lsn getTransactionStartLsn() throws SQLException {
// they'll be lost.
return slotCreatedInfo.startLsn();
}
else if (!snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotInfo != null) {
else if (!snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot() && startingSlotInfo != null) {
// Allow streaming to resume from where streaming stopped last rather than where the current snapshot starts.
SlotState currentSlotState = jdbcConnection.getReplicationSlotState(connectorConfig.slotName(),
connectorConfig.plugin().getPostgresPluginName());
@ -218,19 +221,18 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
@Override
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
Table table)
throws SQLException {
Table table) {
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
}
@Override
protected void completed(SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
snapshotter.snapshotCompleted();
snapshotterService.getSnapshotter().snapshotCompleted();
}
@Override
protected void aborted(SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
snapshotter.snapshotAborted();
snapshotterService.getSnapshotter().snapshotAborted();
}
/**
@ -242,20 +244,34 @@ protected void aborted(SnapshotContext<PostgresPartition, PostgresOffsetContext>
@Override
protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
TableId tableId, List<String> columns) {
if (snapshotContext.onDemand) {
return blockingSnapshotter.buildSnapshotQuery(tableId, columns);
}
return snapshotter.buildSnapshotQuery(tableId, columns);
return snapshotterService.getSnapshotQuery().snapshotQuery(tableId.toDoubleQuotedString(), columns);
}
protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws SQLException {
LOGGER.info("Setting isolation level");
String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand);
String transactionStatement = snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand);
LOGGER.info("Opening transaction with statement {}", transactionStatement);
jdbcConnection.executeWithoutCommitting(transactionStatement);
}
private String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
if (newSlotInfo != null && !isOnDemand) {
/*
* For an on demand blocking snapshot we don't need to reuse
* the same snapshot from the existing exported transaction as for the initial snapshot.
*/
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
}
// TODO should this customizable?
// we're using the same isolation level that pg_backup uses
return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
}
/**
* Mutable context which is populated in the course of snapshotting.
*/

View File

@ -25,11 +25,11 @@
import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.WalPositionLocator;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import io.debezium.util.ElapsedTimeStrategy;
@ -64,7 +64,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
private final PostgresTaskContext taskContext;
private final ReplicationConnection replicationConnection;
private final AtomicReference<ReplicationStream> replicationStream = new AtomicReference<>();
private final Snapshotter snapshotter;
private final SnapshotterService snapshotterService;
private final DelayStrategy pauseNoMessage;
private final ElapsedTimeStrategy connectionProbeTimer;
@ -82,7 +82,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
private Lsn lastCompletelyProcessedLsn;
private PostgresOffsetContext effectiveOffset;
public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter,
public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, SnapshotterService snapshotterService,
PostgresConnection connection, PostgresEventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
PostgresSchema schema, PostgresTaskContext taskContext, ReplicationConnection replicationConnection) {
this.connectorConfig = connectorConfig;
@ -93,7 +93,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
this.schema = schema;
pauseNoMessage = DelayStrategy.constant(taskContext.getConfig().getPollInterval());
this.taskContext = taskContext;
this.snapshotter = snapshotter;
this.snapshotterService = snapshotterService;
this.replicationConnection = replicationConnection;
this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval());
@ -119,8 +119,8 @@ private void initSchema() {
@Override
public void execute(ChangeEventSourceContext context, PostgresPartition partition, PostgresOffsetContext offsetContext)
throws InterruptedException {
if (!snapshotter.shouldStream()) {
LOGGER.info("Streaming is not enabled in correct configuration");
if (!snapshotterService.getSnapshotter().shouldStream()) {
LOGGER.info("Streaming is not enabled in configuration");
return;
}

View File

@ -1,25 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AlwaysSnapshotter extends QueryingSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
LOGGER.info("Taking a new snapshot as per configuration");
return true;
}
}

View File

@ -1,47 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
public class InitialSnapshotter extends QueryingSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotter.class);
private OffsetState sourceInfo;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
super.init(config, sourceInfo, slotState);
this.sourceInfo = sourceInfo;
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
if (sourceInfo == null) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (sourceInfo.snapshotInEffect()) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
else {
LOGGER.info(
"Previous snapshot has completed successfully, streaming logical changes from last known position");
return false;
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.snapshot.SnapshotterServiceProvider;
public class PostgresSnapshotterServiceProvider extends SnapshotterServiceProvider {
@Override
public String snapshotMode(BeanRegistry beanRegistry) {
PostgresConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, PostgresConnectorConfig.class);
return postgresConnectorConfig.snapshotMode().getValue();
}
}

View File

@ -1,52 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;
public abstract class QueryingSnapshotter implements Snapshotter {
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId, List<String> snapshotSelectColumns) {
String query = snapshotSelectColumns.stream()
.collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId.toDoubleQuotedString()));
return Optional.of(query);
}
@Override
public Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) {
return Optional.empty();
}
@Override
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
if (newSlotInfo != null && !isOnDemand) {
/*
* For an on demand blocking snapshot we don't need to reuse
* the same snapshot from the existing exported transaction as for the initial snapshot.
*/
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
}
return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo, isOnDemand);
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotLockingMode.CUSTOM;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotLockingMode;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotLock;
/**
* An implementation of the {@link ServiceProvider} contract for the {@link SnapshotLock}.
*
* @author Mario Fiore Vitale
*/
public class SnapshotLockProvider implements ServiceProvider<SnapshotLock> {
@Override
public SnapshotLock createService(Configuration configuration, ServiceRegistry serviceRegistry) {
BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class);
PostgresConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, PostgresConnectorConfig.class);
final SnapshotLockingMode configuredSnapshotLockingMode = postgresConnectorConfig.snapshotLockingMode();
final String snapshotLockingModeCustomName = postgresConnectorConfig.snapshotLockingModeCustomName();
String snapshotLockingMode;
if (CUSTOM.equals(configuredSnapshotLockingMode) && !snapshotLockingModeCustomName.isEmpty()) {
snapshotLockingMode = snapshotLockingModeCustomName;
}
else {
snapshotLockingMode = configuredSnapshotLockingMode.getValue();
}
Optional<SnapshotLock> snapshotLock = StreamSupport.stream(ServiceLoader.load(SnapshotLock.class).spliterator(), false)
.filter(s -> s.name().equals(snapshotLockingMode))
.findAny();
return snapshotLock.map(s -> {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
((BeanRegistryAware) s).injectBeanRegistry(beanRegistry);
}
return s;
})
.orElseThrow(
() -> new DebeziumException(String.format("Unable to find %s snapshot locking mode. Please check your configuration.", snapshotLockingMode)));
}
@Override
public Class<SnapshotLock> getServiceClass() {
return SnapshotLock.class;
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotQueryMode.CUSTOM;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotQueryMode;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotQuery;
/**
* An implementation of the {@link ServiceProvider} contract for the {@link SnapshotQuery}.
*
* @author Mario Fiore Vitale
*/
public class SnapshotQueryProvider implements ServiceProvider<SnapshotQuery> {
@Override
public SnapshotQuery createService(Configuration configuration, ServiceRegistry serviceRegistry) {
BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class);
PostgresConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, PostgresConnectorConfig.class);
final SnapshotQueryMode configuredSnapshotQueryMode = postgresConnectorConfig.snapshotQueryMode();
final String snapshotQueryModeCustomName = postgresConnectorConfig.snapshotQueryModeCustomName();
String snapshotQueryMode;
if (CUSTOM.equals(configuredSnapshotQueryMode) && !snapshotQueryModeCustomName.isEmpty()) {
snapshotQueryMode = snapshotQueryModeCustomName;
}
else {
snapshotQueryMode = configuredSnapshotQueryMode.getValue();
}
Optional<SnapshotQuery> snapshotQuery = StreamSupport.stream(ServiceLoader.load(SnapshotQuery.class).spliterator(), false)
.filter(s -> s.name().equals(snapshotQueryMode))
.findAny();
return snapshotQuery.map(s -> {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
((BeanRegistryAware) s).injectBeanRegistry(beanRegistry);
}
return s;
})
.orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshot query mode. Please check your configuration.", snapshotQueryMode)));
}
@Override
public Class<SnapshotQuery> getServiceClass() {
return SnapshotQuery.class;
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
/**
* This class is a small wrapper around the snapshotter that takes care of initialization
* and also lets us access the slotState (which we don't track currently)
*/
public class SnapshotterWrapper {
private final Snapshotter snapshotter;
private final SlotState slotState;
public SnapshotterWrapper(Snapshotter snapshotter, PostgresConnectorConfig config, OffsetState offsetState, SlotState slotState) {
this.snapshotter = snapshotter;
this.slotState = slotState;
this.snapshotter.init(config, offsetState, slotState);
}
public Snapshotter getSnapshotter() {
return this.snapshotter;
}
public boolean doesSlotExist() {
return this.slotState != null;
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot.lock;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
public class NoSnapshotLock implements SnapshotLock {
@Override
public String name() {
return PostgresConnectorConfig.SnapshotLockingMode.NONE.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> tableLockingStatement(Duration lockTimeout, Set<String> tableIds) {
return Optional.empty();
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot.lock;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.snapshot.spi.SnapshotLock;
public class SharedSnapshotLock implements SnapshotLock {
@Override
public String name() {
return PostgresConnectorConfig.SnapshotLockingMode.SHARED.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> tableLockingStatement(Duration lockTimeout, Set<String> tableIds) {
String lineSeparator = System.lineSeparator();
StringBuilder statements = new StringBuilder();
statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator);
// we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot
// this does not prevent writes to the table, but prevents changes to the table's schema....
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
tableIds.forEach(tableId -> statements.append("LOCK TABLE ")
.append(tableId)
.append(" IN ACCESS SHARE MODE;")
.append(lineSeparator));
return Optional.of(statements.toString());
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot.mode;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.spi.snapshot.Snapshotter;
public class AlwaysSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
LOGGER.info("Taking a new snapshot as per configuration");
return true;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
}

View File

@ -3,24 +3,20 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
package io.debezium.connector.postgresql.snapshot.mode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
public class InitialOnlySnapshotter extends QueryingSnapshotter {
public class InitialOnlySnapshotter extends InitialSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(InitialOnlySnapshotter.class);
private OffsetState sourceInfo;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
super.init(config, sourceInfo, slotState);
this.sourceInfo = sourceInfo;
public String name() {
return PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue();
}
@Override
@ -30,11 +26,12 @@ public boolean shouldStream() {
@Override
public boolean shouldSnapshot() {
if (sourceInfo == null) {
if (!offsetContextExists) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (sourceInfo.snapshotInEffect()) {
else if (isSnapshotInProgress) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}

View File

@ -0,0 +1,76 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot.mode;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.spi.snapshot.Snapshotter;
public class InitialSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotter.class);
protected boolean offsetContextExists;
protected boolean isSnapshotInProgress;
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.INITIAL.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
this.offsetContextExists = offsetContextExists;
this.isSnapshotInProgress = isSnapshotInProgress;
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
if (!offsetContextExists) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (isSnapshotInProgress) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
else {
LOGGER.info(
"Previous snapshot has completed successfully, streaming logical changes from last known position");
return false;
}
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
}

View File

@ -3,28 +3,35 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot;
package io.debezium.connector.postgresql.snapshot.mode;
import java.util.List;
import java.util.Optional;
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;
import io.debezium.spi.snapshot.Snapshotter;
public class NeverSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(NeverSnapshotter.class);
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
if (sourceInfo != null && sourceInfo.snapshotInEffect()) {
public String name() {
return PostgresConnectorConfig.SnapshotMode.NEVER.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
if (offsetContextExists && isSnapshotInProgress) {
String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
LOGGER.error(msg);
@ -46,7 +53,17 @@ public boolean shouldSnapshot() {
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId, List<String> snapshotSelectColumns) {
throw new UnsupportedOperationException("'never' snapshot mode cannot build queries");
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.snapshot.query;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.snapshot.spi.SnapshotQuery;
public class SelectAllSnapshotQuery implements SnapshotQuery {
@Override
public String name() {
return PostgresConnectorConfig.SnapshotQueryMode.SELECT_ALL.getValue();
}
@Override
public void configure(Map<String, ?> properties) {
}
@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
return Optional.of(snapshotSelectColumns.stream()
.collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId)));
}
}

View File

@ -0,0 +1,2 @@
io.debezium.connector.postgresql.snapshot.lock.NoSnapshotLock
io.debezium.connector.postgresql.snapshot.lock.SharedSnapshotLock

View File

@ -0,0 +1 @@
io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery

View File

@ -0,0 +1,4 @@
io.debezium.connector.postgresql.snapshot.mode.AlwaysSnapshotter
io.debezium.connector.postgresql.snapshot.mode.InitialOnlySnapshotter
io.debezium.connector.postgresql.snapshot.mode.InitialSnapshotter
io.debezium.connector.postgresql.snapshot.mode.NeverSnapshotter

View File

@ -6,10 +6,15 @@
package io.debezium.connector.postgresql;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.mode.AlwaysSnapshotter;
public class CustomLifecycleHookTestSnapshot extends AlwaysSnapshotter {
@Override
public String name() {
return CustomLifecycleHookTestSnapshot.class.getName();
}
private static final String INSERT_SNAPSHOT_COMPLETE_STATE = "INSERT INTO s1.lifecycle_state (hook, state) " +
"VALUES ('snapshotComplete', 'complete');";

View File

@ -9,15 +9,57 @@
import java.util.List;
import java.util.Optional;
import io.debezium.relational.TableId;
import io.debezium.spi.snapshot.Snapshotter;
public class CustomPartialTableTestSnapshot extends CustomStartFromStreamingTestSnapshot implements Snapshotter {
public class CustomPartialTableTestSnapshot extends CustomStartFromStreamingTestSnapshot {
@Override
public Optional<String> buildSnapshotQuery(TableId tableId, List<String> snapshotSelectColumns) {
if (tableId.schema().equals("s1") && tableId.table().equals("a")) {
return super.buildSnapshotQuery(tableId, snapshotSelectColumns);
public String name() {
return CustomPartialTableTestSnapshot.class.getName();
}
@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
if (tableId.contains("s1") && tableId.contains("a")) {
return super.snapshotQuery(tableId, snapshotSelectColumns);
}
return Optional.empty();
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
return true;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
@Override
public boolean shouldStreamEventsStartingFromSnapshot() {
return false;
}
}

View File

@ -6,9 +6,46 @@
package io.debezium.connector.postgresql;
import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter;
import io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery;
import io.debezium.spi.snapshot.Snapshotter;
public class CustomStartFromStreamingTestSnapshot extends SelectAllSnapshotQuery implements Snapshotter {
@Override
public String name() {
return CustomStartFromStreamingTestSnapshot.class.getName();
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
return true;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
public class CustomStartFromStreamingTestSnapshot extends AlwaysSnapshotter {
@Override
public boolean shouldStreamEventsStartingFromSnapshot() {
return false;

View File

@ -9,11 +9,12 @@
import java.util.Optional;
import java.util.stream.Collectors;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotCreationResult;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.relational.TableId;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.spi.snapshot.Snapshotter;
/**
* This is a small class used in PostgresConnectorIT to test a custom snapshot
@ -21,13 +22,25 @@
* It is tightly coupled to the test there, but needs to be placed here in order
* to allow for class loading to work
*/
public class CustomTestSnapshot implements Snapshotter {
public class CustomTestSnapshot extends SelectAllSnapshotQuery implements Snapshotter, BeanRegistryAware {
private boolean hasState;
@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
hasState = (sourceInfo != null);
public String name() {
return CustomTestSnapshot.class.getName();
}
@Override
public void injectBeanRegistry(BeanRegistry beanRegistry) {
Offsets<PostgresPartition, PostgresOffsetContext> postgresoffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
hasState = postgresoffsets.getTheOnlyOffset() != null;
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
hasState = offsetContextExists;
}
@Override
@ -41,26 +54,31 @@ public boolean shouldStream() {
}
@Override
public Optional<String> buildSnapshotQuery(TableId tableId, List<String> snapshotSelectColumns) {
if (!hasState && tableId.schema().equals("s2")) {
public boolean shouldSnapshotSchema() {
return false;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns) {
if (!hasState && tableId.contains("s2")) {
return Optional.empty();
}
else {
String query = snapshotSelectColumns.stream()
.collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId.toDoubleQuotedString()));
.collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId));
return Optional.of(query);
}
}
@Override
public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
if (newSlotInfo != null) {
String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName());
return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet;
}
else {
return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
}
}
}

View File

@ -71,6 +71,7 @@
import io.debezium.config.Field;
import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotQueryMode;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.PostgresReplicationConnection;
@ -80,7 +81,7 @@
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.snapshot.mode.InitialOnlySnapshotter;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
@ -1653,7 +1654,9 @@ public void shouldAllowForCustomSnapshot() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM)
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
@ -1684,7 +1687,9 @@ record = s2recs.get(0);
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM)
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE)
.build();
start(PostgresConnector.class, config);
@ -1972,7 +1977,7 @@ public void shouldResumeStreamingFromSlotPositionForCustomSnapshot() throws Exce
// Perform an regular snapshot
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomStartFromStreamingTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
@ -1995,7 +2000,7 @@ public void shouldResumeStreamingFromSlotPositionForCustomSnapshot() throws Exce
// Perform catch up streaming and resnapshot everything
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomStartFromStreamingTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
@ -2055,7 +2060,9 @@ public void customSnapshotterSkipsTablesOnRestart() throws Exception {
// Perform a custom partial snapshot
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomPartialTableTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM)
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
@ -2120,7 +2127,9 @@ public void customSnapshotterSkipsTablesOnRestartWithConcurrentTx() throws Excep
// Perform a custom partial snapshot
config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomPartialTableTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM)
.with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build();
start(PostgresConnector.class, config);
@ -2171,7 +2180,7 @@ public void testCustomSnapshotterSnapshotCompleteLifecycleHook() throws Exceptio
"CREATE TABLE s1.lifecycle_state (hook text, state text, PRIMARY KEY(hook));");
Configuration config = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomLifecycleHookTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomLifecycleHookTestSnapshot.class.getName())
.build();
start(PostgresConnector.class, config);
assertConnectorIsRunning();
@ -3379,7 +3388,7 @@ public void shouldInvokeSnapshotterAbortedMethod() throws Exception {
.with(PostgresConnectorConfig.POLL_INTERVAL_MS, 60 * 1000)
.with(PostgresConnectorConfig.SNAPSHOT_FETCH_SIZE, 1)
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomLifecycleHookTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomLifecycleHookTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
DebeziumEngine.CompletionCallback completionCallback = (success, message, error) -> {
@ -3436,7 +3445,7 @@ public void shouldThrowRightExceptionWhenNoCustomSnapshotClassProvided() {
.until(() -> finished.get());
assertThat(status.get()).isFalse();
assertNull(error.get());
assertThat(message.get()).contains("snapshot.custom_class cannot be empty when snapshot.mode 'custom' is defined");
assertThat(message.get()).contains("snapshot.mode.custom.name cannot be empty when snapshot.mode 'custom' is defined");
}
@FixFor("DBZ-5917")

View File

@ -1264,7 +1264,7 @@ private void buildNoStreamProducer(Configuration.Builder config) {
alterConfig(config);
start(PostgresConnector.class, config
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY)
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build());
assertConnectorIsRunning();
@ -1274,7 +1274,7 @@ private void buildWithStreamProducer(Configuration.Builder config) {
alterConfig(config);
start(PostgresConnector.class, config
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS)
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName())
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
.build());
assertConnectorIsRunning();

View File

@ -0,0 +1,3 @@
io.debezium.connector.postgresql.CustomTestSnapshot
io.debezium.connector.postgresql.CustomPartialTableTestSnapshot
io.debezium.connector.postgresql.CustomStartFromStreamingTestSnapshot

View File

@ -0,0 +1,4 @@
io.debezium.connector.postgresql.CustomTestSnapshot
io.debezium.connector.postgresql.CustomLifecycleHookTestSnapshot
io.debezium.connector.postgresql.CustomPartialTableTestSnapshot
io.debezium.connector.postgresql.CustomStartFromStreamingTestSnapshot

View File

@ -7,6 +7,7 @@
import io.debezium.config.Configuration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.ValueConverterProvider;
/**
@ -42,4 +43,9 @@ public interface StandardBeanNames {
// all connector-specific connection details across all connectors (including MongoDB),
// and then replace this name key as CONNECTION_FACTORY regardless of the connector.
String JDBC_CONNECTION = JdbcConnection.class.getName();
/**
* The connector-specific offsets.
*/
String OFFSETS = Offsets.class.getName();
}

View File

@ -854,6 +854,23 @@ public static EventConvertingFailureHandlingMode parse(String value) {
+ "'warn' (the default) the value of column of event that conversion failed will be null and be logged with warn level; "
+ "'skip' the value of column of event that conversion failed will be null and be logged with debug level.");
public static final Field SNAPSHOT_MODE_CUSTOM_NAME = Field.create("snapshot.mode.custom.name")
.withDisplayName("Snapshot Mode Custom Name")
.withType(Type.STRING)
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 11))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation((config, field, output) -> {
if ("custom".equalsIgnoreCase(config.getString("snapshot.mode")) && config.getString(field, "").isEmpty()) {
output.accept(field, "", "snapshot.mode.custom.name cannot be empty when snapshot.mode 'custom' is defined");
return 1;
}
return 0;
})
.withDescription(
"When 'snapshot.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. "
+ "The implementations must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot.");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
EVENT_PROCESSING_FAILURE_HANDLING_MODE,
@ -867,6 +884,7 @@ public static EventConvertingFailureHandlingMode parse(String value) {
SNAPSHOT_MODE_TABLES,
SNAPSHOT_FETCH_SIZE,
SNAPSHOT_MAX_THREADS,
SNAPSHOT_MODE_CUSTOM_NAME,
RETRIABLE_RESTART_WAIT,
QUERY_FETCH_SIZE,
MAX_RETRIES_ON_ERROR,
@ -901,6 +919,8 @@ public static EventConvertingFailureHandlingMode parse(String value) {
private final int incrementalSnapshotChunkSize;
private final boolean incrementalSnapshotAllowSchemaChanges;
private final int snapshotMaxThreads;
private final String snapshotModeCustomName;
private final Integer queryFetchSize;
private final SourceInfoStructMaker<? extends AbstractSourceInfo> sourceInfoStructMaker;
private final boolean shouldProvideTransactionMetadata;
@ -944,6 +964,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT));
this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize);
this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS);
this.snapshotModeCustomName = config.getString(SNAPSHOT_MODE_CUSTOM_NAME);
this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE);
this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE);
this.incrementalSnapshotAllowSchemaChanges = config.getBoolean(INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES);
@ -1065,6 +1086,10 @@ public int getSnapshotMaxThreads() {
return snapshotMaxThreads;
}
public String getSnapshotModeCustomName() {
return snapshotModeCustomName;
}
public int getQueryFetchSize() {
return queryFetchSize;
}

View File

@ -0,0 +1,48 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.snapshot;
import io.debezium.annotation.Immutable;
import io.debezium.annotation.ThreadSafe;
import io.debezium.service.Service;
import io.debezium.snapshot.spi.SnapshotLock;
import io.debezium.snapshot.spi.SnapshotQuery;
import io.debezium.spi.snapshot.Snapshotter;
/**
* Registry of all available snapshotters.
*
* @author Mario Fiore Vitale
*/
@ThreadSafe
public class SnapshotterService implements Service {
@Immutable
private final Snapshotter snapshotter;
@Immutable
private final SnapshotQuery snapshotQuery;
@Immutable
private final SnapshotLock snapshotLock;
public SnapshotterService(Snapshotter snapshotter, SnapshotQuery snapshotQuery, SnapshotLock snapshotLock) {
this.snapshotter = snapshotter;
this.snapshotQuery = snapshotQuery;
this.snapshotLock = snapshotLock;
}
public SnapshotQuery getSnapshotQuery() {
return snapshotQuery;
}
public SnapshotLock getSnapshotLock() {
return snapshotLock;
}
public Snapshotter getSnapshotter() {
return this.snapshotter;
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.snapshot;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.stream.StreamSupport;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.bean.spi.BeanRegistry;
import io.debezium.bean.spi.BeanRegistryAware;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.spi.SnapshotLock;
import io.debezium.snapshot.spi.SnapshotQuery;
import io.debezium.spi.snapshot.Snapshotter;
/**
* An implementation of the {@link ServiceProvider} contract for the {@link SnapshotterService}.
*
* @author Mario Fiore Vitale
*/
public abstract class SnapshotterServiceProvider implements ServiceProvider<SnapshotterService> {
@Override
public SnapshotterService createService(Configuration configuration, ServiceRegistry serviceRegistry) {
final BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class);
final CommonConnectorConfig commonConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final String configuredSnapshotMode = snapshotMode(beanRegistry);
final String snapshotModeCustomName = commonConnectorConfig.getSnapshotModeCustomName();
String snapshotMode;
if ("custom".equals(configuredSnapshotMode) && !snapshotModeCustomName.isEmpty()) {
snapshotMode = snapshotModeCustomName;
}
else {
snapshotMode = configuredSnapshotMode;
}
Optional<Snapshotter> snapshotter = StreamSupport.stream(ServiceLoader.load(Snapshotter.class).spliterator(), false)
.filter(s -> s.name().equals(snapshotMode))
.findAny();
final SnapshotQuery snapshotQueryService = serviceRegistry.tryGetService(SnapshotQuery.class);
final SnapshotLock snapshotLockService = serviceRegistry.tryGetService(SnapshotLock.class);
return snapshotter.map(s -> {
s.configure(configuration.asMap());
if (s instanceof BeanRegistryAware) {
((BeanRegistryAware) s).injectBeanRegistry(beanRegistry);
}
return new SnapshotterService(s, snapshotQueryService, snapshotLockService);
})
.orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshotter. Please check your configuration.", snapshotMode)));
}
@Override
public Class<SnapshotterService> getServiceClass() {
return SnapshotterService.class;
}
// TODO this could be delete after DBZ-7308 if all modes will be effectively available to all connectors and
// SnapshotMode enum moved into CommonConnectorConfig
public abstract String snapshotMode(BeanRegistry beanRegistry);
}

View File

@ -3,17 +3,28 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
package io.debezium.snapshot.spi;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import io.debezium.service.Service;
import io.debezium.spi.common.Configurable;
/**
* This interface is used to determine the table lock mode used during schema snapshot
* {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot
*
* @author Mario Fiore Vitale
*/
public interface SnapshotLock {
public interface SnapshotLock extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter

View File

@ -3,16 +3,27 @@
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
package io.debezium.snapshot.spi;
import java.util.List;
import java.util.Optional;
import io.debezium.service.Service;
import io.debezium.spi.common.Configurable;
/**
* This interface is used to determine the query used during data snapshot
* {@link SnapshotQuery} is used to determine the query used during data snapshot
*
* @author Mario Fiore Vitale
*/
public interface SnapshotQuery {
public interface SnapshotQuery extends Configurable, Service {
/**
* @return the name of the snapshot lock.
*
*
*/
String name();
/**
* Generate a valid query string for the specified table, or an empty {@link Optional}