From 3b92786a41a57b640a2151b66e55d2f810d7082d Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 22 Jan 2024 11:22:59 +0100 Subject: [PATCH] DBZ-7300 Snapshotter, SnapshotLock and SnapshotQuery are now services registered in the ServiceRegistry --- .../io/debezium/spi/common/Configurable.java | 24 ++ .../io/debezium/spi/snapshot/Snapshotter.java | 53 ++-- .../spi/snapshot/SnapshotterProvider.java | 29 --- .../PostgresChangeEventSourceCoordinator.java | 10 +- .../PostgresChangeEventSourceFactory.java | 12 +- .../postgresql/PostgresConnector.java | 16 -- .../postgresql/PostgresConnectorConfig.java | 234 +++++++++++++++--- .../postgresql/PostgresConnectorTask.java | 59 ++++- .../PostgresSnapshotChangeEventSource.java | 54 ++-- .../PostgresStreamingChangeEventSource.java | 12 +- .../snapshot/AlwaysSnapshotter.java | 25 -- .../snapshot/InitialSnapshotter.java | 47 ---- .../PostgresSnapshotterServiceProvider.java | 21 ++ .../snapshot/QueryingSnapshotter.java | 52 ---- .../snapshot/SnapshotLockProvider.java | 70 ++++++ .../snapshot/SnapshotQueryProvider.java | 69 ++++++ .../snapshot/SnapshotterWrapper.java | 35 --- .../snapshot/lock/NoSnapshotLock.java | 33 +++ .../snapshot/lock/SharedSnapshotLock.java | 44 ++++ .../snapshot/mode/AlwaysSnapshotter.java | 60 +++++ .../{ => mode}/InitialOnlySnapshotter.java | 17 +- .../snapshot/mode/InitialSnapshotter.java | 76 ++++++ .../snapshot/{ => mode}/NeverSnapshotter.java | 39 ++- .../query/SelectAllSnapshotQuery.java | 34 +++ .../io.debezium.snapshot.spi.SnapshotLock | 2 + .../io.debezium.snapshot.spi.SnapshotQuery | 1 + .../io.debezium.spi.snapshot.Snapshotter | 4 + .../CustomLifecycleHookTestSnapshot.java | 7 +- .../CustomPartialTableTestSnapshot.java | 52 +++- .../CustomStartFromStreamingTestSnapshot.java | 41 ++- .../postgresql/CustomTestSnapshot.java | 62 +++-- .../postgresql/PostgresConnectorIT.java | 29 ++- .../postgresql/RecordsSnapshotProducerIT.java | 4 +- .../io.debezium.snapshot.spi.SnapshotQuery | 3 + .../io.debezium.spi.snapshot.Snapshotter | 4 + .../io/debezium/bean/StandardBeanNames.java | 6 + .../config/CommonConnectorConfig.java | 25 ++ .../debezium/snapshot/SnapshotterService.java | 48 ++++ .../snapshot/SnapshotterServiceProvider.java | 75 ++++++ .../debezium/snapshot/spi}/SnapshotLock.java | 17 +- .../debezium/snapshot/spi}/SnapshotQuery.java | 17 +- 41 files changed, 1149 insertions(+), 373 deletions(-) create mode 100644 debezium-api/src/main/java/io/debezium/spi/common/Configurable.java delete mode 100644 debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java delete mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/AlwaysSnapshotter.java delete mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialSnapshotter.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/PostgresSnapshotterServiceProvider.java delete mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotLockProvider.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotQueryProvider.java delete mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotterWrapper.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/NoSnapshotLock.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/SharedSnapshotLock.java create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/AlwaysSnapshotter.java rename debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/{ => mode}/InitialOnlySnapshotter.java (62%) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialSnapshotter.java rename debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/{ => mode}/NeverSnapshotter.java (61%) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/query/SelectAllSnapshotQuery.java create mode 100644 debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotLock create mode 100644 debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery create mode 100644 debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter create mode 100644 debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery create mode 100644 debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter create mode 100644 debezium-core/src/main/java/io/debezium/snapshot/SnapshotterService.java create mode 100644 debezium-core/src/main/java/io/debezium/snapshot/SnapshotterServiceProvider.java rename {debezium-api/src/main/java/io/debezium/spi/snapshot => debezium-core/src/main/java/io/debezium/snapshot/spi}/SnapshotLock.java (55%) rename {debezium-api/src/main/java/io/debezium/spi/snapshot => debezium-core/src/main/java/io/debezium/snapshot/spi}/SnapshotQuery.java (67%) diff --git a/debezium-api/src/main/java/io/debezium/spi/common/Configurable.java b/debezium-api/src/main/java/io/debezium/spi/common/Configurable.java new file mode 100644 index 000000000..650043642 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/spi/common/Configurable.java @@ -0,0 +1,24 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.spi.common; + +import java.util.Map; + +/** + * A contract that defines a configurable interface + * + * @author Mario Fiore Vitale + * + */ +public interface Configurable { + + /** + * Connector properties are passed to let you configure your implementation. + * + * @param properties map of configurable properties + */ + void configure(Map properties); +} diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java b/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java index 0328df5a4..368f7ee78 100644 --- a/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java +++ b/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java @@ -5,55 +5,74 @@ */ package io.debezium.spi.snapshot; +import io.debezium.DebeziumException; import io.debezium.common.annotation.Incubating; +import io.debezium.spi.common.Configurable; /** - * This interface is used to determine details about the snapshot process: - * - * Namely: - * - Should a snapshot occur at all - * - Should streaming occur - * - What queries should be used to snapshot - * + * {@link Snapshotter} is used to determine details about the snapshot process: + *

+ * Namely:
+ * - Should a snapshot occur at all
+ * - Should streaming occur
+ * - Should snapshot schema (if supported)
+ * - Should snapshot data/schema on error + *

* While many default snapshot modes are provided with debezium (see documentation for details) * a custom implementation of this interface can be provided by the implementor which * can provide more advanced functionality, such as partial snapshots * - * Implementor's must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()} - * or true for both. + * + * @author Mario Fiore Vitale */ @Incubating -public interface Snapshotter { +public interface Snapshotter extends Configurable { /** - * @return true if the snapshotter should take a snapshot + * @return the name of the snapshotter. + * + * + */ + String name(); + + /** + * Validate the snapshotter compatibility with the current connector configuration. + * Throws a {@link DebeziumException} in case it is not compatible. + * + * @param offsetContextExists is {@code true} when the connector has an offset context (i.e. restarted) + * @param isSnapshotInProgress is {@code true} when the connector is started but there was already a snapshot in progress + */ + void validate(boolean offsetContextExists, boolean isSnapshotInProgress); + + /** + * @return {@code true} if the snapshotter should take a snapshot */ boolean shouldSnapshot(); /** - * @return true if the snapshotter should take a snapshot + * @return {@code true} if the snapshotter should take a snapshot */ boolean shouldSnapshotSchema(); /** - * @return true if the snapshotter should stream after taking a snapshot + * @return {@code true} if the snapshotter should stream after taking a snapshot */ boolean shouldStream(); /** - * @return true whether the schema can be recovered if database schema history is corrupted. + * @return {@code true} whether the schema can be recovered if database schema history is corrupted. */ boolean shouldSnapshotOnSchemaError(); /** - * @return true whether the snapshot should be re-executed when there is a gap in data stream. + * @return {@code true} whether the snapshot should be re-executed when there is a gap in data stream. */ boolean shouldSnapshotOnDataError(); /** * - * @return true if streaming should resume from the start of the snapshot - * transaction, or false for when a connector resumes and takes a snapshot, + * @return {@code true} if streaming should resume from the start of the snapshot + * transaction, or {@code false} for when a connector resumes and takes a snapshot, * streaming should resume from where streaming previously left off. */ default boolean shouldStreamEventsStartingFromSnapshot() { diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java deleted file mode 100644 index 064ebb369..000000000 --- a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.spi.snapshot; - -import java.util.List; -import java.util.Map; - -/** - * This interface is used to provide custom snapshotters: - * Implementations must: - * - * provide a map of snapshotter in the {@link #create(Configuration config)} method. - * - * @author Mario Fiore Vitale - */ -public interface SnapshotterProvider { - - /** - * Create a map of snapshotter where the key is its name used in 'snapshot.mode' configuration. - * - * @param Configuration the connector configuration - * - * @return a map of custom snapshotter - */ - Map create(Configuration config); // Can we move the Configuration interface from core module? -} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java index e50c1f449..8a4598b5d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceCoordinator.java @@ -14,7 +14,6 @@ import io.debezium.DebeziumException; import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.pipeline.ChangeEventSourceCoordinator; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.EventDispatcher; @@ -26,6 +25,7 @@ import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; import io.debezium.pipeline.spi.Offsets; import io.debezium.schema.DatabaseSchema; +import io.debezium.snapshot.SnapshotterService; /** * Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base @@ -35,7 +35,7 @@ public class PostgresChangeEventSourceCoordinator extends ChangeEventSourceCoord private static final Logger LOGGER = LoggerFactory.getLogger(PostgresChangeEventSourceCoordinator.class); - private final Snapshotter snapshotter; + private final SnapshotterService snapshotterService; private final SlotState slotInfo; public PostgresChangeEventSourceCoordinator(Offsets previousOffsets, @@ -45,12 +45,12 @@ public PostgresChangeEventSourceCoordinator(Offsets changeEventSourceMetricsFactory, EventDispatcher eventDispatcher, DatabaseSchema schema, - Snapshotter snapshotter, SlotState slotInfo, + SnapshotterService snapshotterService, SlotState slotInfo, SignalProcessor signalProcessor, NotificationService notificationService) { super(previousOffsets, errorHandler, connectorType, connectorConfig, changeEventSourceFactory, changeEventSourceMetricsFactory, eventDispatcher, schema, signalProcessor, notificationService); - this.snapshotter = snapshotter; + this.snapshotterService = snapshotterService; this.slotInfo = slotInfo; } @@ -60,7 +60,7 @@ protected CatchUpStreamingResult executeCatchUpStreaming(ChangeEventSourceContex PostgresPartition partition, PostgresOffsetContext previousOffset) throws InterruptedException { - if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot() && slotInfo != null) { + if (previousOffset != null && !snapshotterService.getSnapshotter().shouldStreamEventsStartingFromSnapshot() && slotInfo != null) { try { setSnapshotStartLsn((PostgresSnapshotChangeEventSource) snapshotSource, previousOffset); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java index 024892ead..00c950444 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresChangeEventSourceFactory.java @@ -11,7 +11,6 @@ import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.spi.SlotCreationResult; import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.notification.NotificationService; @@ -22,6 +21,7 @@ import io.debezium.pipeline.source.spi.SnapshotProgressListener; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.snapshot.SnapshotterService; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Clock; import io.debezium.util.Strings; @@ -35,12 +35,12 @@ public class PostgresChangeEventSourceFactory implements ChangeEventSourceFactor private final Clock clock; private final PostgresSchema schema; private final PostgresTaskContext taskContext; - private final Snapshotter snapshotter; + private final SnapshotterService snapshotterService; private final ReplicationConnection replicationConnection; private final SlotCreationResult slotCreatedInfo; private final SlotState startingSlotInfo; - public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, Snapshotter snapshotter, + public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, SnapshotterService snapshotterService, MainConnectionProvidingConnectionFactory connectionFactory, ErrorHandler errorHandler, PostgresEventDispatcher dispatcher, Clock clock, PostgresSchema schema, PostgresTaskContext taskContext, ReplicationConnection replicationConnection, SlotCreationResult slotCreatedInfo, @@ -52,7 +52,7 @@ public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, S this.clock = clock; this.schema = schema; this.taskContext = taskContext; - this.snapshotter = snapshotter; + this.snapshotterService = snapshotterService; this.replicationConnection = replicationConnection; this.slotCreatedInfo = slotCreatedInfo; this.startingSlotInfo = startingSlotInfo; @@ -63,7 +63,7 @@ public SnapshotChangeEventSource getSn NotificationService notificationService) { return new PostgresSnapshotChangeEventSource( configuration, - snapshotter, + snapshotterService, connectionFactory, schema, dispatcher, @@ -78,7 +78,7 @@ public SnapshotChangeEventSource getSn public StreamingChangeEventSource getStreamingChangeEventSource() { return new PostgresStreamingChangeEventSource( configuration, - snapshotter, + snapshotterService, connectionFactory.mainConnection(), dispatcher, errorHandler, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnector.java index 5e942a3f5..526085574 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnector.java @@ -93,7 +93,6 @@ protected void validateConnection(Map configValues, Configu // Prepare connection without initial statement execution connection.connection(false); testConnection(connection); - checkWalLevel(connection, postgresConfig); checkLoginReplicationRoles(connection); } catch (SQLException e) { @@ -143,21 +142,6 @@ private static void checkLoginReplicationRoles(PostgresConnection connection) th } } - private static void checkWalLevel(PostgresConnection connection, PostgresConnectorConfig config) throws SQLException { - final String walLevel = connection.queryAndMap( - "SHOW wal_level", - connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level")); - if (!"logical".equals(walLevel)) { - if (config.getSnapshotter() != null && config.getSnapshotter().shouldStream()) { - // Logical WAL_LEVEL is only necessary for CDC snapshotting - throw new SQLException("Postgres server wal_level property must be 'logical' but is: '" + walLevel + "'"); - } - else { - LOGGER.warn("WAL_LEVEL check failed but this is ignored as CDC was not requested"); - } - } - } - private static void testConnection(PostgresConnection connection) throws SQLException { connection.execute("SELECT version()"); LOGGER.info("Successfully tested connection for {} with user '{}'", connection.connectionString(), diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index b09456ead..5c2cd6968 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -33,11 +33,6 @@ import io.debezium.connector.postgresql.connection.ReplicationConnection; import io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder; import io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder; -import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; -import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter; -import io.debezium.connector.postgresql.snapshot.InitialSnapshotter; -import io.debezium.connector.postgresql.snapshot.NeverSnapshotter; -import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.jdbc.JdbcConfiguration; import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.RelationalDatabaseConnectorConfig; @@ -184,45 +179,33 @@ public enum SnapshotMode implements EnumeratedValue { /** * Always perform a snapshot when starting. */ - ALWAYS("always", (c) -> new AlwaysSnapshotter()), + ALWAYS("always"), /** * Perform a snapshot only upon initial startup of a connector. */ - INITIAL("initial", (c) -> new InitialSnapshotter()), + INITIAL("initial"), /** * Never perform a snapshot and only receive logical changes. */ - NEVER("never", (c) -> new NeverSnapshotter()), + NEVER("never"), /** * Perform a snapshot and then stop before attempting to receive any logical changes. */ - INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()), + INITIAL_ONLY("initial_only"), /** * Inject a custom snapshotter, which allows for more control over snapshots. */ - CUSTOM("custom", (c) -> { - return c.getInstance(SNAPSHOT_MODE_CLASS, Snapshotter.class); - }); - - @FunctionalInterface - public interface SnapshotterBuilder { - Snapshotter buildSnapshotter(Configuration config); - } + CUSTOM("custom"); private final String value; - private final SnapshotterBuilder builderFunc; - SnapshotMode(String value, SnapshotterBuilder buildSnapshotter) { + SnapshotMode(String value) { this.value = value; - this.builderFunc = buildSnapshotter; - } - public Snapshotter getSnapshotter(Configuration config) { - return builderFunc.buildSnapshotter(config); } @Override @@ -482,6 +465,121 @@ public static SchemaRefreshMode parse(String value) { } } + public enum SnapshotLockingMode implements EnumeratedValue { + /** + * This mode will lock in ACCESS SHARE MODE to avoid concurrent schema changes during the snapshot, and + * this does not prevent writes to the table, but prevents changes to the table's schema. + */ + SHARED("shared"), + + /** + * This mode will avoid using ANY table locks during the snapshot process. + * This mode should be used carefully only when no schema changes are to occur. + */ + NONE("none"), + + CUSTOM("custom"); + + private final String value; + + SnapshotLockingMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be {@code null} + * @return the matching option, or null if no match is found + */ + public static SnapshotLockingMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (SnapshotLockingMode option : SnapshotLockingMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be {@code null} + * @param defaultValue the default value; may be {@code null} + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static SnapshotLockingMode parse(String value, String defaultValue) { + SnapshotLockingMode mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + } + + public enum SnapshotQueryMode implements EnumeratedValue { + /** + * This mode will do a select based on {@code column.include.list} and {@code column.exclude.list} configurations. + */ + SELECT_ALL("select_all"), + + CUSTOM("custom"); + + private final String value; + + SnapshotQueryMode(String value) { + this.value = value; + } + + @Override + public String getValue() { + return value; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be {@code null} + * @return the matching option, or null if no match is found + */ + public static SnapshotQueryMode parse(String value) { + if (value == null) { + return null; + } + value = value.trim(); + for (SnapshotQueryMode option : SnapshotQueryMode.values()) { + if (option.getValue().equalsIgnoreCase(value)) { + return option; + } + } + return null; + } + + /** + * Determine if the supplied value is one of the predefined options. + * + * @param value the configuration property value; may not be {@code null} + * @param defaultValue the default value; may be {@code null} + * @return the matching option, or null if no match is found and the non-null default is invalid + */ + public static SnapshotQueryMode parse(String value, String defaultValue) { + SnapshotQueryMode mode = parse(value); + if (mode == null && defaultValue != null) { + mode = parse(defaultValue); + } + return mode; + } + } + protected static final String DATABASE_CONFIG_PREFIX = "database."; protected static final int DEFAULT_PORT = 5_432; protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; @@ -754,22 +852,60 @@ public static AutoCreateMode parse(String value, String defaultValue) { + "'exported': This option is deprecated; use 'initial' instead.; " + "'custom': The connector loads a custom class to specify how the connector performs snapshots. For more information, see Custom snapshotter SPI in the PostgreSQL connector documentation."); - public static final Field SNAPSHOT_MODE_CLASS = Field.create("snapshot.custom.class") - .withDisplayName("Snapshot Mode Custom Class") + public static final Field SNAPSHOT_LOCKING_MODE = Field.create("snapshot.locking.mode") + .withDisplayName("Snapshot locking mode") + .withEnum(SnapshotLockingMode.class, SnapshotLockingMode.NONE) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 12)) + .withDescription("Controls how the connector holds locks on tables while performing the schema snapshot. The 'shared' " + + "which means the connector will hold a table lock that prevents exclusive table access for just the initial portion of the snapshot " + + "while the database schemas and other metadata are being read. The remaining work in a snapshot involves selecting all rows from " + + "each table, and this is done using a flashback query that requires no locks. However, in some cases it may be desirable to avoid " + + "locks entirely which can be done by specifying 'none'. This mode is only safe to use if no schema changes are happening while the " + + "snapshot is taken."); + + public static final Field SNAPSHOT_LOCKING_MODE_CUSTOM_NAME = Field.create("snapshot.locking.mode.custom.name") + .withDisplayName("Snapshot Locking Mode Custom Name") .withType(Type.STRING) - .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 9)) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 13)) .withWidth(Width.MEDIUM) .withImportance(Importance.MEDIUM) .withValidation((config, field, output) -> { - if (config.getString(SNAPSHOT_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) { - output.accept(field, "", "snapshot.custom_class cannot be empty when snapshot.mode 'custom' is defined"); + if (config.getString(SNAPSHOT_LOCKING_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) { + output.accept(field, "", "snapshot.locking.mode.custom.name cannot be empty when snapshot.locking.mode 'custom' is defined"); return 1; } return 0; }) .withDescription( - "When 'snapshot.mode' is set as custom, this setting must be set to specify a fully qualified class name to load (via the default class loader). " - + "This class must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot and how to build queries."); + "When 'snapshot.locking.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. " + + "The implementations must implement the 'SnapshotterLocking' interface and is called to determine how to lock tables during schema snapshot."); + + public static final Field SNAPSHOT_QUERY_MODE = Field.create("snapshot.query.mode") + .withDisplayName("Snapshot query mode") + .withEnum(SnapshotQueryMode.class, SnapshotQueryMode.SELECT_ALL) + .withWidth(Width.SHORT) + .withImportance(Importance.LOW) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 14)) + .withDescription("Controls query used during the snapshot"); + + public static final Field SNAPSHOT_QUERY_MODE_CUSTOM_NAME = Field.create("snapshot.query.mode.custom.name") + .withDisplayName("Snapshot Query Mode Custom Name") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 15)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.MEDIUM) + .withValidation((config, field, output) -> { + if (config.getString(SNAPSHOT_QUERY_MODE).equalsIgnoreCase("custom") && config.getString(field, "").isEmpty()) { + output.accept(field, "", "snapshot.query.mode.custom.name cannot be empty when snapshot.query.mode 'custom' is defined"); + return 1; + } + return 0; + }) + .withDescription( + "When 'snapshot.query.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. " + + "The implementations must implement the 'SnapshotterQuery' interface and is called to determine how to build queries during snapshot."); /** * A comma-separated list of regular expressions that match the prefix of logical decoding messages to be excluded @@ -909,11 +1045,16 @@ public static AutoCreateMode parse(String value, String defaultValue) { private final LogicalDecodingMessageFilter logicalDecodingMessageFilter; private final HStoreHandlingMode hStoreHandlingMode; private final IntervalHandlingMode intervalHandlingMode; - private final SnapshotMode snapshotMode; private final SchemaRefreshMode schemaRefreshMode; private final boolean flushLsnOnSource; private final ReplicaIdentityMapper replicaIdentityMapper; + private final SnapshotMode snapshotMode; + private final SnapshotLockingMode snapshotLockingMode; + private final SnapshotQueryMode snapshotQueryMode; + private final String snapshotQueryModeCustomName; + private final String snapshotLockingModeCustomName; + public PostgresConnectorConfig(Configuration config) { super( config, @@ -928,11 +1069,15 @@ public PostgresConnectorConfig(Configuration config) { String hstoreHandlingModeStr = config.getString(PostgresConnectorConfig.HSTORE_HANDLING_MODE); this.hStoreHandlingMode = HStoreHandlingMode.parse(hstoreHandlingModeStr); this.intervalHandlingMode = IntervalHandlingMode.parse(config.getString(PostgresConnectorConfig.INTERVAL_HANDLING_MODE)); - this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE)); this.schemaRefreshMode = SchemaRefreshMode.parse(config.getString(SCHEMA_REFRESH_MODE)); this.flushLsnOnSource = config.getBoolean(SHOULD_FLUSH_LSN_IN_SOURCE_DB); final var replicaIdentityMapping = config.getString(REPLICA_IDENTITY_AUTOSET_VALUES); this.replicaIdentityMapper = (replicaIdentityMapping != null) ? new ReplicaIdentityMapper(replicaIdentityMapping) : null; + this.snapshotMode = SnapshotMode.parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); + this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); + this.snapshotQueryMode = SnapshotQueryMode.parse(config.getString(SNAPSHOT_QUERY_MODE), SNAPSHOT_QUERY_MODE.defaultValueAsString()); + this.snapshotQueryModeCustomName = config.getString(SNAPSHOT_QUERY_MODE_CUSTOM_NAME, ""); + this.snapshotLockingModeCustomName = config.getString(SNAPSHOT_LOCKING_MODE_CUSTOM_NAME, ""); } protected String hostname() { @@ -1011,10 +1156,6 @@ public Map validate() { return getConfig().validate(ALL_FIELDS); } - protected Snapshotter getSnapshotter() { - return this.snapshotMode.getSnapshotter(getConfig()); - } - protected boolean skipRefreshSchemaOnMissingToastableData() { return SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST == this.schemaRefreshMode; } @@ -1040,6 +1181,26 @@ public Optional replicaIdentityMapper() { return Optional.ofNullable(this.replicaIdentityMapper); } + public SnapshotMode snapshotMode() { + return this.snapshotMode; + } + + public SnapshotLockingMode snapshotLockingMode() { + return this.snapshotLockingMode; + } + + public SnapshotQueryMode snapshotQueryMode() { + return this.snapshotQueryMode; + } + + public String snapshotQueryModeCustomName() { + return this.snapshotQueryModeCustomName; + } + + public String snapshotLockingModeCustomName() { + return this.snapshotLockingModeCustomName; + } + protected int moneyFractionDigits() { return getConfig().getInteger(MONEY_FRACTION_DIGITS); } @@ -1085,7 +1246,6 @@ protected SourceInfoStructMaker getSourceInfoStruc SOURCE_INFO_STRUCT_MAKER) .connector( SNAPSHOT_MODE, - SNAPSHOT_MODE_CLASS, HSTORE_HANDLING_MODE, BINARY_HANDLING_MODE, SCHEMA_NAME_ADJUSTMENT_MODE, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java index cab980e65..3f9ea6460 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorTask.java @@ -30,9 +30,11 @@ import io.debezium.connector.postgresql.connection.PostgresConnection.PostgresValueConverterBuilder; import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter; import io.debezium.connector.postgresql.connection.ReplicationConnection; +import io.debezium.connector.postgresql.snapshot.PostgresSnapshotterServiceProvider; +import io.debezium.connector.postgresql.snapshot.SnapshotLockProvider; +import io.debezium.connector.postgresql.snapshot.SnapshotQueryProvider; import io.debezium.connector.postgresql.spi.SlotCreationResult; import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.document.DocumentReader; import io.debezium.jdbc.DefaultMainConnectionProvidingConnectionFactory; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; @@ -46,6 +48,9 @@ import io.debezium.relational.TableId; import io.debezium.schema.SchemaFactory; import io.debezium.schema.SchemaNameAdjuster; +import io.debezium.service.spi.ServiceRegistry; +import io.debezium.snapshot.SnapshotterService; +import io.debezium.spi.snapshot.Snapshotter; import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; @@ -72,15 +77,11 @@ public class PostgresConnectorTask extends BaseSourceTask start(Configuration config) { + final PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(config); final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY); - final Snapshotter snapshotter = connectorConfig.getSnapshotter(); final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); - if (snapshotter == null) { - throw new ConnectException("Unable to load snapshotter, if using custom snapshot mode, double check your settings"); - } - final Charset databaseCharset; try (PostgresConnection tempConnection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) { databaseCharset = tempConnection.getDatabaseCharset(); @@ -121,10 +122,23 @@ public ChangeEventSourceCoordinator st connectorConfig.getBeanRegistry().add(StandardBeanNames.DATABASE_SCHEMA, schema); connectorConfig.getBeanRegistry().add(StandardBeanNames.JDBC_CONNECTION, beanRegistryJdbcConnection); connectorConfig.getBeanRegistry().add(StandardBeanNames.VALUE_CONVERTER, valueConverter); + connectorConfig.getBeanRegistry().add(StandardBeanNames.OFFSETS, previousOffsets); // Service providers registerServiceProviders(connectorConfig.getServiceRegistry()); + final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class); + final Snapshotter snapshotter = snapshotterService.getSnapshotter(); + + try { + checkWalLevel(beanRegistryJdbcConnection, snapshotterService); + } + catch (SQLException e) { + + LOGGER.error("Failed testing connection for {} with user '{}'", beanRegistryJdbcConnection.connectionString(), + beanRegistryJdbcConnection.username(), e); + } + LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME); try { // Print out the server information @@ -142,11 +156,11 @@ public ChangeEventSourceCoordinator st if (previousOffset == null) { LOGGER.info("No previous offset found"); // if we have no initial offset, indicate that to Snapshotter by passing null - snapshotter.init(connectorConfig, null, slotInfo); + snapshotter.validate(false, false); } else { LOGGER.info("Found previous offset {}", previousOffset); - snapshotter.init(connectorConfig, previousOffset.asOffsetState(), slotInfo); + snapshotter.validate(true, previousOffset.isSnapshotRunning()); } SlotCreationResult slotCreatedInfo = null; @@ -237,7 +251,7 @@ public ChangeEventSourceCoordinator st connectorConfig, new PostgresChangeEventSourceFactory( connectorConfig, - snapshotter, + snapshotterService, connectionFactory, errorHandler, dispatcher, @@ -250,7 +264,7 @@ public ChangeEventSourceCoordinator st new DefaultChangeEventSourceMetricsFactory<>(), dispatcher, schema, - snapshotter, + snapshotterService, slotInfo, signalProcessor, notificationService); @@ -294,6 +308,15 @@ public ReplicationConnection createReplicationConnection(PostgresTaskContext tas return replicationConnection; } + @Override + protected void registerServiceProviders(ServiceRegistry serviceRegistry) { + + super.registerServiceProviders(serviceRegistry); + serviceRegistry.registerServiceProvider(new SnapshotLockProvider()); + serviceRegistry.registerServiceProvider(new SnapshotQueryProvider()); + serviceRegistry.registerServiceProvider(new PostgresSnapshotterServiceProvider()); + } + @Override public List doPoll() throws InterruptedException { final List records = queue.poll(); @@ -351,4 +374,20 @@ protected Iterable getAllConfigurationFields() { public PostgresTaskContext getTaskContext() { return taskContext; } + + private static void checkWalLevel(PostgresConnection connection, SnapshotterService snapshotterService) throws SQLException { + final String walLevel = connection.queryAndMap( + "SHOW wal_level", + connection.singleResultMapper(rs -> rs.getString("wal_level"), "Could not fetch wal_level")); + if (!"logical".equals(walLevel)) { + // TODO here I don't have the snapshotter, it is not yet injected + if (snapshotterService.getSnapshotter() != null && snapshotterService.getSnapshotter().shouldStream()) { + // Logical WAL_LEVEL is only necessary for CDC snapshotting + throw new SQLException("Postgres server wal_level property must be 'logical' but is: '" + walLevel + "'"); + } + else { + LOGGER.warn("WAL_LEVEL check failed but this is ignored as CDC was not requested"); + } + } + } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index f6d15af03..a6f94bad9 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -19,10 +19,9 @@ import io.debezium.connector.postgresql.PostgresOffsetContext.Loader; import io.debezium.connector.postgresql.connection.Lsn; import io.debezium.connector.postgresql.connection.PostgresConnection; -import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; +import io.debezium.connector.postgresql.snapshot.mode.AlwaysSnapshotter; import io.debezium.connector.postgresql.spi.SlotCreationResult; import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.notification.NotificationService; @@ -33,6 +32,8 @@ import io.debezium.relational.TableId; import io.debezium.relational.Tables; import io.debezium.schema.SchemaChangeEvent; +import io.debezium.snapshot.SnapshotterService; +import io.debezium.spi.snapshot.Snapshotter; import io.debezium.util.Clock; public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { @@ -42,12 +43,12 @@ public class PostgresSnapshotChangeEventSource extends RelationalSnapshotChangeE private final PostgresConnectorConfig connectorConfig; private final PostgresConnection jdbcConnection; private final PostgresSchema schema; - private final Snapshotter snapshotter; + private final SnapshotterService snapshotterService; private final Snapshotter blockingSnapshotter; private final SlotCreationResult slotCreatedInfo; private final SlotState startingSlotInfo; - public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, + public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig, SnapshotterService snapshotterService, MainConnectionProvidingConnectionFactory connectionFactory, PostgresSchema schema, EventDispatcher dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener, SlotCreationResult slotCreatedInfo, @@ -56,7 +57,7 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig this.connectorConfig = connectorConfig; this.jdbcConnection = connectionFactory.mainConnection(); this.schema = schema; - this.snapshotter = snapshotter; + this.snapshotterService = snapshotterService; this.slotCreatedInfo = slotCreatedInfo; this.startingSlotInfo = startingSlotInfo; this.blockingSnapshotter = new AlwaysSnapshotter(); @@ -71,7 +72,7 @@ public SnapshottingTask getSnapshottingTask(PostgresPartition partition, Postgre Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); - boolean snapshotData = snapshotter.shouldSnapshot(); + boolean snapshotData = snapshotterService.getSnapshotter().shouldSnapshot(); if (snapshotData) { LOGGER.info("According to the connector configuration data will be snapshotted"); } @@ -97,7 +98,7 @@ protected void connectionCreated(RelationalSnapshotContext getAllTableIds(RelationalSnapshotContext snapshotContext) throws SQLException { + final Duration lockTimeout = connectorConfig.snapshotLockTimeout(); - final Optional lockStatement = snapshotter.snapshotTableLockingStatement(lockTimeout, snapshotContext.capturedTables); + final Set capturedTablesNames = snapshotContext.capturedTables.stream().map(TableId::toDoubleQuotedString).collect(Collectors.toSet()); + final Optional lockStatement = snapshotterService.getSnapshotLock().tableLockingStatement(lockTimeout, capturedTablesNames); if (lockStatement.isPresent()) { LOGGER.info("Waiting a maximum of '{}' seconds for each table lock", lockTimeout.getSeconds()); @@ -134,7 +137,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext snapshotContext, - Table table) - throws SQLException { + Table table) { return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table); } @Override protected void completed(SnapshotContext snapshotContext) { - snapshotter.snapshotCompleted(); + snapshotterService.getSnapshotter().snapshotCompleted(); } @Override protected void aborted(SnapshotContext snapshotContext) { - snapshotter.snapshotAborted(); + snapshotterService.getSnapshotter().snapshotAborted(); } /** @@ -242,20 +244,34 @@ protected void aborted(SnapshotContext @Override protected Optional getSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId, List columns) { - if (snapshotContext.onDemand) { - return blockingSnapshotter.buildSnapshotQuery(tableId, columns); - } - return snapshotter.buildSnapshotQuery(tableId, columns); + return snapshotterService.getSnapshotQuery().snapshotQuery(tableId.toDoubleQuotedString(), columns); } protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws SQLException { LOGGER.info("Setting isolation level"); - String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand); + String transactionStatement = snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand); LOGGER.info("Opening transaction with statement {}", transactionStatement); jdbcConnection.executeWithoutCommitting(transactionStatement); } + private String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { + + if (newSlotInfo != null && !isOnDemand) { + /* + * For an on demand blocking snapshot we don't need to reuse + * the same snapshot from the existing exported transaction as for the initial snapshot. + */ + String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); + return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; + } + + // TODO should this customizable? + + // we're using the same isolation level that pg_backup uses + return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; + } + /** * Mutable context which is populated in the course of snapshotting. */ diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 901b25f8f..c199217ea 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -25,11 +25,11 @@ import io.debezium.connector.postgresql.connection.ReplicationMessage.Operation; import io.debezium.connector.postgresql.connection.ReplicationStream; import io.debezium.connector.postgresql.connection.WalPositionLocator; -import io.debezium.connector.postgresql.spi.Snapshotter; import io.debezium.heartbeat.Heartbeat; import io.debezium.pipeline.ErrorHandler; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.snapshot.SnapshotterService; import io.debezium.util.Clock; import io.debezium.util.DelayStrategy; import io.debezium.util.ElapsedTimeStrategy; @@ -64,7 +64,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS private final PostgresTaskContext taskContext; private final ReplicationConnection replicationConnection; private final AtomicReference replicationStream = new AtomicReference<>(); - private final Snapshotter snapshotter; + private final SnapshotterService snapshotterService; private final DelayStrategy pauseNoMessage; private final ElapsedTimeStrategy connectionProbeTimer; @@ -82,7 +82,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS private Lsn lastCompletelyProcessedLsn; private PostgresOffsetContext effectiveOffset; - public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, + public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, SnapshotterService snapshotterService, PostgresConnection connection, PostgresEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, PostgresSchema schema, PostgresTaskContext taskContext, ReplicationConnection replicationConnection) { this.connectorConfig = connectorConfig; @@ -93,7 +93,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.schema = schema; pauseNoMessage = DelayStrategy.constant(taskContext.getConfig().getPollInterval()); this.taskContext = taskContext; - this.snapshotter = snapshotter; + this.snapshotterService = snapshotterService; this.replicationConnection = replicationConnection; this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval()); @@ -119,8 +119,8 @@ private void initSchema() { @Override public void execute(ChangeEventSourceContext context, PostgresPartition partition, PostgresOffsetContext offsetContext) throws InterruptedException { - if (!snapshotter.shouldStream()) { - LOGGER.info("Streaming is not enabled in correct configuration"); + if (!snapshotterService.getSnapshotter().shouldStream()) { + LOGGER.info("Streaming is not enabled in configuration"); return; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/AlwaysSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/AlwaysSnapshotter.java deleted file mode 100644 index 792d6d09a..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/AlwaysSnapshotter.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.snapshot; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AlwaysSnapshotter extends QueryingSnapshotter { - - private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class); - - @Override - public boolean shouldStream() { - return true; - } - - @Override - public boolean shouldSnapshot() { - LOGGER.info("Taking a new snapshot as per configuration"); - return true; - } -} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialSnapshotter.java deleted file mode 100644 index b2b45874d..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialSnapshotter.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.snapshot; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.connector.postgresql.spi.SlotState; - -public class InitialSnapshotter extends QueryingSnapshotter { - - private final static Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotter.class); - private OffsetState sourceInfo; - - @Override - public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - super.init(config, sourceInfo, slotState); - this.sourceInfo = sourceInfo; - } - - @Override - public boolean shouldStream() { - return true; - } - - @Override - public boolean shouldSnapshot() { - if (sourceInfo == null) { - LOGGER.info("Taking initial snapshot for new datasource"); - return true; - } - else if (sourceInfo.snapshotInEffect()) { - LOGGER.info("Found previous incomplete snapshot"); - return true; - } - else { - LOGGER.info( - "Previous snapshot has completed successfully, streaming logical changes from last known position"); - return false; - } - } -} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/PostgresSnapshotterServiceProvider.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/PostgresSnapshotterServiceProvider.java new file mode 100644 index 000000000..1594ef112 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/PostgresSnapshotterServiceProvider.java @@ -0,0 +1,21 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot; + +import io.debezium.bean.StandardBeanNames; +import io.debezium.bean.spi.BeanRegistry; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.snapshot.SnapshotterServiceProvider; + +public class PostgresSnapshotterServiceProvider extends SnapshotterServiceProvider { + @Override + public String snapshotMode(BeanRegistry beanRegistry) { + + PostgresConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, PostgresConnectorConfig.class); + + return postgresConnectorConfig.snapshotMode().getValue(); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java deleted file mode 100644 index ff7cc09af..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/QueryingSnapshotter.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.snapshot; - -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; - -import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.connector.postgresql.spi.SlotCreationResult; -import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; -import io.debezium.relational.TableId; - -public abstract class QueryingSnapshotter implements Snapshotter { - - @Override - public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - } - - @Override - public Optional buildSnapshotQuery(TableId tableId, List snapshotSelectColumns) { - String query = snapshotSelectColumns.stream() - .collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId.toDoubleQuotedString())); - - return Optional.of(query); - } - - @Override - public Optional snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) { - return Optional.empty(); - } - - @Override - public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { - if (newSlotInfo != null && !isOnDemand) { - /* - * For an on demand blocking snapshot we don't need to reuse - * the same snapshot from the existing exported transaction as for the initial snapshot. - */ - String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); - return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; - } - return Snapshotter.super.snapshotTransactionIsolationLevelStatement(newSlotInfo, isOnDemand); - } -} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotLockProvider.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotLockProvider.java new file mode 100644 index 000000000..332209fdd --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotLockProvider.java @@ -0,0 +1,70 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot; + +import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotLockingMode.CUSTOM; + +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.stream.StreamSupport; + +import io.debezium.DebeziumException; +import io.debezium.bean.StandardBeanNames; +import io.debezium.bean.spi.BeanRegistry; +import io.debezium.bean.spi.BeanRegistryAware; +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotLockingMode; +import io.debezium.service.spi.ServiceProvider; +import io.debezium.service.spi.ServiceRegistry; +import io.debezium.snapshot.spi.SnapshotLock; + +/** + * An implementation of the {@link ServiceProvider} contract for the {@link SnapshotLock}. + * + * @author Mario Fiore Vitale + */ +public class SnapshotLockProvider implements ServiceProvider { + + @Override + public SnapshotLock createService(Configuration configuration, ServiceRegistry serviceRegistry) { + + BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class); + PostgresConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, PostgresConnectorConfig.class); + + final SnapshotLockingMode configuredSnapshotLockingMode = postgresConnectorConfig.snapshotLockingMode(); + final String snapshotLockingModeCustomName = postgresConnectorConfig.snapshotLockingModeCustomName(); + + String snapshotLockingMode; + if (CUSTOM.equals(configuredSnapshotLockingMode) && !snapshotLockingModeCustomName.isEmpty()) { + snapshotLockingMode = snapshotLockingModeCustomName; + } + else { + snapshotLockingMode = configuredSnapshotLockingMode.getValue(); + } + + Optional snapshotLock = StreamSupport.stream(ServiceLoader.load(SnapshotLock.class).spliterator(), false) + .filter(s -> s.name().equals(snapshotLockingMode)) + .findAny(); + + return snapshotLock.map(s -> { + s.configure(configuration.asMap()); + if (s instanceof BeanRegistryAware) { + ((BeanRegistryAware) s).injectBeanRegistry(beanRegistry); + } + return s; + }) + .orElseThrow( + () -> new DebeziumException(String.format("Unable to find %s snapshot locking mode. Please check your configuration.", snapshotLockingMode))); + + } + + @Override + public Class getServiceClass() { + return SnapshotLock.class; + } + +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotQueryProvider.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotQueryProvider.java new file mode 100644 index 000000000..b77932ac5 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotQueryProvider.java @@ -0,0 +1,69 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot; + +import static io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotQueryMode.CUSTOM; + +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.stream.StreamSupport; + +import io.debezium.DebeziumException; +import io.debezium.bean.StandardBeanNames; +import io.debezium.bean.spi.BeanRegistry; +import io.debezium.bean.spi.BeanRegistryAware; +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotQueryMode; +import io.debezium.service.spi.ServiceProvider; +import io.debezium.service.spi.ServiceRegistry; +import io.debezium.snapshot.spi.SnapshotQuery; + +/** + * An implementation of the {@link ServiceProvider} contract for the {@link SnapshotQuery}. + * + * @author Mario Fiore Vitale + */ +public class SnapshotQueryProvider implements ServiceProvider { + + @Override + public SnapshotQuery createService(Configuration configuration, ServiceRegistry serviceRegistry) { + + BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class); + PostgresConnectorConfig postgresConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, PostgresConnectorConfig.class); + + final SnapshotQueryMode configuredSnapshotQueryMode = postgresConnectorConfig.snapshotQueryMode(); + final String snapshotQueryModeCustomName = postgresConnectorConfig.snapshotQueryModeCustomName(); + + String snapshotQueryMode; + if (CUSTOM.equals(configuredSnapshotQueryMode) && !snapshotQueryModeCustomName.isEmpty()) { + snapshotQueryMode = snapshotQueryModeCustomName; + } + else { + snapshotQueryMode = configuredSnapshotQueryMode.getValue(); + } + + Optional snapshotQuery = StreamSupport.stream(ServiceLoader.load(SnapshotQuery.class).spliterator(), false) + .filter(s -> s.name().equals(snapshotQueryMode)) + .findAny(); + + return snapshotQuery.map(s -> { + s.configure(configuration.asMap()); + if (s instanceof BeanRegistryAware) { + ((BeanRegistryAware) s).injectBeanRegistry(beanRegistry); + } + return s; + }) + .orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshot query mode. Please check your configuration.", snapshotQueryMode))); + + } + + @Override + public Class getServiceClass() { + return SnapshotQuery.class; + } + +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotterWrapper.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotterWrapper.java deleted file mode 100644 index ddcbca49f..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/SnapshotterWrapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright Debezium Authors. - * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.snapshot; - -import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; - -/** - * This class is a small wrapper around the snapshotter that takes care of initialization - * and also lets us access the slotState (which we don't track currently) - */ -public class SnapshotterWrapper { - - private final Snapshotter snapshotter; - private final SlotState slotState; - - public SnapshotterWrapper(Snapshotter snapshotter, PostgresConnectorConfig config, OffsetState offsetState, SlotState slotState) { - this.snapshotter = snapshotter; - this.slotState = slotState; - this.snapshotter.init(config, offsetState, slotState); - } - - public Snapshotter getSnapshotter() { - return this.snapshotter; - } - - public boolean doesSlotExist() { - return this.slotState != null; - } -} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/NoSnapshotLock.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/NoSnapshotLock.java new file mode 100644 index 000000000..ed5756da2 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/NoSnapshotLock.java @@ -0,0 +1,33 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot.lock; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.snapshot.spi.SnapshotLock; + +public class NoSnapshotLock implements SnapshotLock { + + @Override + public String name() { + return PostgresConnectorConfig.SnapshotLockingMode.NONE.getValue(); + } + + @Override + public void configure(Map properties) { + + } + + @Override + public Optional tableLockingStatement(Duration lockTimeout, Set tableIds) { + + return Optional.empty(); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/SharedSnapshotLock.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/SharedSnapshotLock.java new file mode 100644 index 000000000..75d0da417 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/lock/SharedSnapshotLock.java @@ -0,0 +1,44 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot.lock; + +import java.time.Duration; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.snapshot.spi.SnapshotLock; + +public class SharedSnapshotLock implements SnapshotLock { + + @Override + public String name() { + return PostgresConnectorConfig.SnapshotLockingMode.SHARED.getValue(); + } + + @Override + public void configure(Map properties) { + + } + + @Override + public Optional tableLockingStatement(Duration lockTimeout, Set tableIds) { + + String lineSeparator = System.lineSeparator(); + StringBuilder statements = new StringBuilder(); + statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); + // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot + // this does not prevent writes to the table, but prevents changes to the table's schema.... + // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted + tableIds.forEach(tableId -> statements.append("LOCK TABLE ") + .append(tableId) + .append(" IN ACCESS SHARE MODE;") + .append(lineSeparator)); + + return Optional.of(statements.toString()); + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/AlwaysSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/AlwaysSnapshotter.java new file mode 100644 index 000000000..85d9d267f --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/AlwaysSnapshotter.java @@ -0,0 +1,60 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot.mode; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.spi.snapshot.Snapshotter; + +public class AlwaysSnapshotter implements Snapshotter { + + private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class); + + @Override + public String name() { + return PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue(); + } + + @Override + public void configure(Map properties) { + + } + + @Override + public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) { + + } + + @Override + public boolean shouldStream() { + return true; + } + + @Override + public boolean shouldSnapshot() { + LOGGER.info("Taking a new snapshot as per configuration"); + return true; + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + + @Override + public boolean shouldSnapshotSchema() { + return false; + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialOnlySnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialOnlySnapshotter.java similarity index 62% rename from debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialOnlySnapshotter.java rename to debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialOnlySnapshotter.java index b3d6e6e6e..c11e6dc5e 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/InitialOnlySnapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialOnlySnapshotter.java @@ -3,24 +3,20 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.connector.postgresql.snapshot; +package io.debezium.connector.postgresql.snapshot.mode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.connector.postgresql.spi.SlotState; -public class InitialOnlySnapshotter extends QueryingSnapshotter { +public class InitialOnlySnapshotter extends InitialSnapshotter { private final static Logger LOGGER = LoggerFactory.getLogger(InitialOnlySnapshotter.class); - private OffsetState sourceInfo; @Override - public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - super.init(config, sourceInfo, slotState); - this.sourceInfo = sourceInfo; + public String name() { + return PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue(); } @Override @@ -30,11 +26,12 @@ public boolean shouldStream() { @Override public boolean shouldSnapshot() { - if (sourceInfo == null) { + + if (!offsetContextExists) { LOGGER.info("Taking initial snapshot for new datasource"); return true; } - else if (sourceInfo.snapshotInEffect()) { + else if (isSnapshotInProgress) { LOGGER.info("Found previous incomplete snapshot"); return true; } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialSnapshotter.java new file mode 100644 index 000000000..47bfe86da --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/InitialSnapshotter.java @@ -0,0 +1,76 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot.mode; + +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.spi.snapshot.Snapshotter; + +public class InitialSnapshotter implements Snapshotter { + + private final static Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotter.class); + + protected boolean offsetContextExists; + protected boolean isSnapshotInProgress; + + @Override + public String name() { + return PostgresConnectorConfig.SnapshotMode.INITIAL.getValue(); + } + + @Override + public void configure(Map properties) { + + } + + @Override + public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) { + + this.offsetContextExists = offsetContextExists; + this.isSnapshotInProgress = isSnapshotInProgress; + } + + @Override + public boolean shouldStream() { + return true; + } + + @Override + public boolean shouldSnapshot() { + if (!offsetContextExists) { + LOGGER.info("Taking initial snapshot for new datasource"); + return true; + } + else if (isSnapshotInProgress) { + LOGGER.info("Found previous incomplete snapshot"); + return true; + } + else { + LOGGER.info( + "Previous snapshot has completed successfully, streaming logical changes from last known position"); + return false; + } + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + + @Override + public boolean shouldSnapshotSchema() { + return false; + } +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/NeverSnapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/NeverSnapshotter.java similarity index 61% rename from debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/NeverSnapshotter.java rename to debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/NeverSnapshotter.java index 2df93f0ed..cab8cb2b4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/NeverSnapshotter.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/mode/NeverSnapshotter.java @@ -3,28 +3,35 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.connector.postgresql.snapshot; +package io.debezium.connector.postgresql.snapshot.mode; -import java.util.List; -import java.util.Optional; +import java.util.Map; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; -import io.debezium.relational.TableId; +import io.debezium.spi.snapshot.Snapshotter; public class NeverSnapshotter implements Snapshotter { private final static Logger LOGGER = LoggerFactory.getLogger(NeverSnapshotter.class); @Override - public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - if (sourceInfo != null && sourceInfo.snapshotInEffect()) { + public String name() { + return PostgresConnectorConfig.SnapshotMode.NEVER.getValue(); + } + + @Override + public void configure(Map properties) { + + } + + @Override + public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) { + + if (offsetContextExists && isSnapshotInProgress) { String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured " + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed."; LOGGER.error(msg); @@ -46,7 +53,17 @@ public boolean shouldSnapshot() { } @Override - public Optional buildSnapshotQuery(TableId tableId, List snapshotSelectColumns) { - throw new UnsupportedOperationException("'never' snapshot mode cannot build queries"); + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + + @Override + public boolean shouldSnapshotSchema() { + return false; } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/query/SelectAllSnapshotQuery.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/query/SelectAllSnapshotQuery.java new file mode 100644 index 000000000..2bf7df868 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/snapshot/query/SelectAllSnapshotQuery.java @@ -0,0 +1,34 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.postgresql.snapshot.query; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import io.debezium.connector.postgresql.PostgresConnectorConfig; +import io.debezium.snapshot.spi.SnapshotQuery; + +public class SelectAllSnapshotQuery implements SnapshotQuery { + + @Override + public String name() { + return PostgresConnectorConfig.SnapshotQueryMode.SELECT_ALL.getValue(); + } + + @Override + public void configure(Map properties) { + + } + + @Override + public Optional snapshotQuery(String tableId, List snapshotSelectColumns) { + + return Optional.of(snapshotSelectColumns.stream() + .collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId))); + } +} diff --git a/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotLock b/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotLock new file mode 100644 index 000000000..efe3a3b9a --- /dev/null +++ b/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotLock @@ -0,0 +1,2 @@ +io.debezium.connector.postgresql.snapshot.lock.NoSnapshotLock +io.debezium.connector.postgresql.snapshot.lock.SharedSnapshotLock \ No newline at end of file diff --git a/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery b/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery new file mode 100644 index 000000000..c28d552d4 --- /dev/null +++ b/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery @@ -0,0 +1 @@ +io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery \ No newline at end of file diff --git a/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter b/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter new file mode 100644 index 000000000..0913eb033 --- /dev/null +++ b/debezium-connector-postgres/src/main/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter @@ -0,0 +1,4 @@ +io.debezium.connector.postgresql.snapshot.mode.AlwaysSnapshotter +io.debezium.connector.postgresql.snapshot.mode.InitialOnlySnapshotter +io.debezium.connector.postgresql.snapshot.mode.InitialSnapshotter +io.debezium.connector.postgresql.snapshot.mode.NeverSnapshotter \ No newline at end of file diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomLifecycleHookTestSnapshot.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomLifecycleHookTestSnapshot.java index 3e2565033..579a2255c 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomLifecycleHookTestSnapshot.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomLifecycleHookTestSnapshot.java @@ -6,10 +6,15 @@ package io.debezium.connector.postgresql; -import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; +import io.debezium.connector.postgresql.snapshot.mode.AlwaysSnapshotter; public class CustomLifecycleHookTestSnapshot extends AlwaysSnapshotter { + @Override + public String name() { + return CustomLifecycleHookTestSnapshot.class.getName(); + } + private static final String INSERT_SNAPSHOT_COMPLETE_STATE = "INSERT INTO s1.lifecycle_state (hook, state) " + "VALUES ('snapshotComplete', 'complete');"; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomPartialTableTestSnapshot.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomPartialTableTestSnapshot.java index 02723389b..be2bc9782 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomPartialTableTestSnapshot.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomPartialTableTestSnapshot.java @@ -9,15 +9,57 @@ import java.util.List; import java.util.Optional; -import io.debezium.relational.TableId; +import io.debezium.spi.snapshot.Snapshotter; + +public class CustomPartialTableTestSnapshot extends CustomStartFromStreamingTestSnapshot implements Snapshotter { -public class CustomPartialTableTestSnapshot extends CustomStartFromStreamingTestSnapshot { @Override - public Optional buildSnapshotQuery(TableId tableId, List snapshotSelectColumns) { - if (tableId.schema().equals("s1") && tableId.table().equals("a")) { - return super.buildSnapshotQuery(tableId, snapshotSelectColumns); + public String name() { + return CustomPartialTableTestSnapshot.class.getName(); + } + + @Override + public Optional snapshotQuery(String tableId, List snapshotSelectColumns) { + + if (tableId.contains("s1") && tableId.contains("a")) { + return super.snapshotQuery(tableId, snapshotSelectColumns); } return Optional.empty(); } + + @Override + public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) { + + } + + @Override + public boolean shouldStream() { + return true; + } + + @Override + public boolean shouldSnapshot() { + return true; + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + + @Override + public boolean shouldSnapshotSchema() { + return false; + } + + @Override + public boolean shouldStreamEventsStartingFromSnapshot() { + return false; + } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomStartFromStreamingTestSnapshot.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomStartFromStreamingTestSnapshot.java index 9e13d2451..652d4ff2d 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomStartFromStreamingTestSnapshot.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomStartFromStreamingTestSnapshot.java @@ -6,9 +6,46 @@ package io.debezium.connector.postgresql; -import io.debezium.connector.postgresql.snapshot.AlwaysSnapshotter; +import io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery; +import io.debezium.spi.snapshot.Snapshotter; + +public class CustomStartFromStreamingTestSnapshot extends SelectAllSnapshotQuery implements Snapshotter { + + @Override + public String name() { + return CustomStartFromStreamingTestSnapshot.class.getName(); + } + + @Override + public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) { + + } + + @Override + public boolean shouldStream() { + return true; + } + + @Override + public boolean shouldSnapshot() { + return true; + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + + @Override + public boolean shouldSnapshotSchema() { + return false; + } -public class CustomStartFromStreamingTestSnapshot extends AlwaysSnapshotter { @Override public boolean shouldStreamEventsStartingFromSnapshot() { return false; diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java index f5ea83d4a..bc29441b5 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CustomTestSnapshot.java @@ -9,11 +9,12 @@ import java.util.Optional; import java.util.stream.Collectors; -import io.debezium.connector.postgresql.spi.OffsetState; -import io.debezium.connector.postgresql.spi.SlotCreationResult; -import io.debezium.connector.postgresql.spi.SlotState; -import io.debezium.connector.postgresql.spi.Snapshotter; -import io.debezium.relational.TableId; +import io.debezium.bean.StandardBeanNames; +import io.debezium.bean.spi.BeanRegistry; +import io.debezium.bean.spi.BeanRegistryAware; +import io.debezium.connector.postgresql.snapshot.query.SelectAllSnapshotQuery; +import io.debezium.pipeline.spi.Offsets; +import io.debezium.spi.snapshot.Snapshotter; /** * This is a small class used in PostgresConnectorIT to test a custom snapshot @@ -21,13 +22,25 @@ * It is tightly coupled to the test there, but needs to be placed here in order * to allow for class loading to work */ -public class CustomTestSnapshot implements Snapshotter { +public class CustomTestSnapshot extends SelectAllSnapshotQuery implements Snapshotter, BeanRegistryAware { private boolean hasState; @Override - public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) { - hasState = (sourceInfo != null); + public String name() { + return CustomTestSnapshot.class.getName(); + } + + @Override + public void injectBeanRegistry(BeanRegistry beanRegistry) { + + Offsets postgresoffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class); + hasState = postgresoffsets.getTheOnlyOffset() != null; + } + + @Override + public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) { + hasState = offsetContextExists; } @Override @@ -41,26 +54,31 @@ public boolean shouldStream() { } @Override - public Optional buildSnapshotQuery(TableId tableId, List snapshotSelectColumns) { - if (!hasState && tableId.schema().equals("s2")) { + public boolean shouldSnapshotSchema() { + return false; + } + + @Override + public boolean shouldSnapshotOnSchemaError() { + return false; + } + + @Override + public boolean shouldSnapshotOnDataError() { + return false; + } + + @Override + public Optional snapshotQuery(String tableId, List snapshotSelectColumns) { + + if (!hasState && tableId.contains("s2")) { return Optional.empty(); } else { String query = snapshotSelectColumns.stream() - .collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId.toDoubleQuotedString())); + .collect(Collectors.joining(", ", "SELECT ", " FROM " + tableId)); return Optional.of(query); } } - - @Override - public String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { - if (newSlotInfo != null) { - String snapSet = String.format("SET TRANSACTION SNAPSHOT '%s';", newSlotInfo.snapshotName()); - return "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; \n" + snapSet; - } - else { - return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; - } - } } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index e354573e2..9ed897444 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -71,6 +71,7 @@ import io.debezium.config.Field; import io.debezium.connector.postgresql.PostgresConnectorConfig.LogicalDecoder; import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; +import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotQueryMode; import io.debezium.connector.postgresql.connection.AbstractMessageDecoder; import io.debezium.connector.postgresql.connection.PostgresConnection; import io.debezium.connector.postgresql.connection.PostgresReplicationConnection; @@ -80,7 +81,7 @@ import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule; import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs; import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot; -import io.debezium.connector.postgresql.snapshot.InitialOnlySnapshotter; +import io.debezium.connector.postgresql.snapshot.mode.InitialOnlySnapshotter; import io.debezium.connector.postgresql.spi.SlotState; import io.debezium.converters.CloudEventsConverterTest; import io.debezium.data.Envelope; @@ -1653,7 +1654,9 @@ public void shouldAllowForCustomSnapshot() throws InterruptedException { TestHelper.execute(SETUP_TABLES_STMT); Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build(); start(PostgresConnector.class, config); @@ -1684,7 +1687,9 @@ record = s2recs.get(0); config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) .build(); start(PostgresConnector.class, config); @@ -1972,7 +1977,7 @@ public void shouldResumeStreamingFromSlotPositionForCustomSnapshot() throws Exce // Perform an regular snapshot Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomStartFromStreamingTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build(); start(PostgresConnector.class, config); @@ -1995,7 +2000,7 @@ public void shouldResumeStreamingFromSlotPositionForCustomSnapshot() throws Exce // Perform catch up streaming and resnapshot everything config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomStartFromStreamingTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build(); start(PostgresConnector.class, config); @@ -2055,7 +2060,9 @@ public void customSnapshotterSkipsTablesOnRestart() throws Exception { // Perform a custom partial snapshot config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomPartialTableTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build(); start(PostgresConnector.class, config); @@ -2120,7 +2127,9 @@ public void customSnapshotterSkipsTablesOnRestartWithConcurrentTx() throws Excep // Perform a custom partial snapshot config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomPartialTableTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, SnapshotQueryMode.CUSTOM) + .with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomPartialTableTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build(); start(PostgresConnector.class, config); @@ -2171,7 +2180,7 @@ public void testCustomSnapshotterSnapshotCompleteLifecycleHook() throws Exceptio "CREATE TABLE s1.lifecycle_state (hook text, state text, PRIMARY KEY(hook));"); Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomLifecycleHookTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomLifecycleHookTestSnapshot.class.getName()) .build(); start(PostgresConnector.class, config); assertConnectorIsRunning(); @@ -3379,7 +3388,7 @@ public void shouldInvokeSnapshotterAbortedMethod() throws Exception { .with(PostgresConnectorConfig.POLL_INTERVAL_MS, 60 * 1000) .with(PostgresConnectorConfig.SNAPSHOT_FETCH_SIZE, 1) .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.CUSTOM.getValue()) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomLifecycleHookTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomLifecycleHookTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE); DebeziumEngine.CompletionCallback completionCallback = (success, message, error) -> { @@ -3436,7 +3445,7 @@ public void shouldThrowRightExceptionWhenNoCustomSnapshotClassProvided() { .until(() -> finished.get()); assertThat(status.get()).isFalse(); assertNull(error.get()); - assertThat(message.get()).contains("snapshot.custom_class cannot be empty when snapshot.mode 'custom' is defined"); + assertThat(message.get()).contains("snapshot.mode.custom.name cannot be empty when snapshot.mode 'custom' is defined"); } @FixFor("DBZ-5917") diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java index 98b54d2bb..b39bc03ff 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/RecordsSnapshotProducerIT.java @@ -1264,7 +1264,7 @@ private void buildNoStreamProducer(Configuration.Builder config) { alterConfig(config); start(PostgresConnector.class, config .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL_ONLY) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build()); assertConnectorIsRunning(); @@ -1274,7 +1274,7 @@ private void buildWithStreamProducer(Configuration.Builder config) { alterConfig(config); start(PostgresConnector.class, config .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.ALWAYS) - .with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()) + .with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) .build()); assertConnectorIsRunning(); diff --git a/debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery b/debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery new file mode 100644 index 000000000..6f2762a4e --- /dev/null +++ b/debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.snapshot.spi.SnapshotQuery @@ -0,0 +1,3 @@ +io.debezium.connector.postgresql.CustomTestSnapshot +io.debezium.connector.postgresql.CustomPartialTableTestSnapshot +io.debezium.connector.postgresql.CustomStartFromStreamingTestSnapshot \ No newline at end of file diff --git a/debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter b/debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter new file mode 100644 index 000000000..10f0c1220 --- /dev/null +++ b/debezium-connector-postgres/src/test/resources/META-INF/services/io.debezium.spi.snapshot.Snapshotter @@ -0,0 +1,4 @@ +io.debezium.connector.postgresql.CustomTestSnapshot +io.debezium.connector.postgresql.CustomLifecycleHookTestSnapshot +io.debezium.connector.postgresql.CustomPartialTableTestSnapshot +io.debezium.connector.postgresql.CustomStartFromStreamingTestSnapshot \ No newline at end of file diff --git a/debezium-core/src/main/java/io/debezium/bean/StandardBeanNames.java b/debezium-core/src/main/java/io/debezium/bean/StandardBeanNames.java index 937f50ad1..9da455bce 100644 --- a/debezium-core/src/main/java/io/debezium/bean/StandardBeanNames.java +++ b/debezium-core/src/main/java/io/debezium/bean/StandardBeanNames.java @@ -7,6 +7,7 @@ import io.debezium.config.Configuration; import io.debezium.jdbc.JdbcConnection; +import io.debezium.pipeline.spi.Offsets; import io.debezium.relational.ValueConverterProvider; /** @@ -42,4 +43,9 @@ public interface StandardBeanNames { // all connector-specific connection details across all connectors (including MongoDB), // and then replace this name key as CONNECTION_FACTORY regardless of the connector. String JDBC_CONNECTION = JdbcConnection.class.getName(); + + /** + * The connector-specific offsets. + */ + String OFFSETS = Offsets.class.getName(); } diff --git a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java index 25d713c4f..c638e3b2c 100644 --- a/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java +++ b/debezium-core/src/main/java/io/debezium/config/CommonConnectorConfig.java @@ -854,6 +854,23 @@ public static EventConvertingFailureHandlingMode parse(String value) { + "'warn' (the default) the value of column of event that conversion failed will be null and be logged with warn level; " + "'skip' the value of column of event that conversion failed will be null and be logged with debug level."); + public static final Field SNAPSHOT_MODE_CUSTOM_NAME = Field.create("snapshot.mode.custom.name") + .withDisplayName("Snapshot Mode Custom Name") + .withType(Type.STRING) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_SNAPSHOT, 11)) + .withWidth(Width.MEDIUM) + .withImportance(Importance.MEDIUM) + .withValidation((config, field, output) -> { + if ("custom".equalsIgnoreCase(config.getString("snapshot.mode")) && config.getString(field, "").isEmpty()) { + output.accept(field, "", "snapshot.mode.custom.name cannot be empty when snapshot.mode 'custom' is defined"); + return 1; + } + return 0; + }) + .withDescription( + "When 'snapshot.mode' is set as custom, this setting must be set to specify a the name of the custom implementation provided in the 'name()' method. " + + "The implementations must implement the 'Snapshotter' interface and is called on each app boot to determine whether to do a snapshot."); + protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor() .connector( EVENT_PROCESSING_FAILURE_HANDLING_MODE, @@ -867,6 +884,7 @@ public static EventConvertingFailureHandlingMode parse(String value) { SNAPSHOT_MODE_TABLES, SNAPSHOT_FETCH_SIZE, SNAPSHOT_MAX_THREADS, + SNAPSHOT_MODE_CUSTOM_NAME, RETRIABLE_RESTART_WAIT, QUERY_FETCH_SIZE, MAX_RETRIES_ON_ERROR, @@ -901,6 +919,8 @@ public static EventConvertingFailureHandlingMode parse(String value) { private final int incrementalSnapshotChunkSize; private final boolean incrementalSnapshotAllowSchemaChanges; private final int snapshotMaxThreads; + + private final String snapshotModeCustomName; private final Integer queryFetchSize; private final SourceInfoStructMaker sourceInfoStructMaker; private final boolean shouldProvideTransactionMetadata; @@ -944,6 +964,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi this.retriableRestartWait = Duration.ofMillis(config.getLong(RETRIABLE_RESTART_WAIT)); this.snapshotFetchSize = config.getInteger(SNAPSHOT_FETCH_SIZE, defaultSnapshotFetchSize); this.snapshotMaxThreads = config.getInteger(SNAPSHOT_MAX_THREADS); + this.snapshotModeCustomName = config.getString(SNAPSHOT_MODE_CUSTOM_NAME); this.queryFetchSize = config.getInteger(QUERY_FETCH_SIZE); this.incrementalSnapshotChunkSize = config.getInteger(INCREMENTAL_SNAPSHOT_CHUNK_SIZE); this.incrementalSnapshotAllowSchemaChanges = config.getBoolean(INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES); @@ -1065,6 +1086,10 @@ public int getSnapshotMaxThreads() { return snapshotMaxThreads; } + public String getSnapshotModeCustomName() { + return snapshotModeCustomName; + } + public int getQueryFetchSize() { return queryFetchSize; } diff --git a/debezium-core/src/main/java/io/debezium/snapshot/SnapshotterService.java b/debezium-core/src/main/java/io/debezium/snapshot/SnapshotterService.java new file mode 100644 index 000000000..907839900 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/snapshot/SnapshotterService.java @@ -0,0 +1,48 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.snapshot; + +import io.debezium.annotation.Immutable; +import io.debezium.annotation.ThreadSafe; +import io.debezium.service.Service; +import io.debezium.snapshot.spi.SnapshotLock; +import io.debezium.snapshot.spi.SnapshotQuery; +import io.debezium.spi.snapshot.Snapshotter; + +/** + * Registry of all available snapshotters. + * + * @author Mario Fiore Vitale + */ +@ThreadSafe +public class SnapshotterService implements Service { + + @Immutable + private final Snapshotter snapshotter; + @Immutable + private final SnapshotQuery snapshotQuery; + @Immutable + private final SnapshotLock snapshotLock; + + public SnapshotterService(Snapshotter snapshotter, SnapshotQuery snapshotQuery, SnapshotLock snapshotLock) { + + this.snapshotter = snapshotter; + this.snapshotQuery = snapshotQuery; + this.snapshotLock = snapshotLock; + } + + public SnapshotQuery getSnapshotQuery() { + return snapshotQuery; + } + + public SnapshotLock getSnapshotLock() { + return snapshotLock; + } + + public Snapshotter getSnapshotter() { + return this.snapshotter; + } +} diff --git a/debezium-core/src/main/java/io/debezium/snapshot/SnapshotterServiceProvider.java b/debezium-core/src/main/java/io/debezium/snapshot/SnapshotterServiceProvider.java new file mode 100644 index 000000000..fbc0b28b3 --- /dev/null +++ b/debezium-core/src/main/java/io/debezium/snapshot/SnapshotterServiceProvider.java @@ -0,0 +1,75 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.snapshot; + +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.stream.StreamSupport; + +import io.debezium.DebeziumException; +import io.debezium.bean.StandardBeanNames; +import io.debezium.bean.spi.BeanRegistry; +import io.debezium.bean.spi.BeanRegistryAware; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.service.spi.ServiceProvider; +import io.debezium.service.spi.ServiceRegistry; +import io.debezium.snapshot.spi.SnapshotLock; +import io.debezium.snapshot.spi.SnapshotQuery; +import io.debezium.spi.snapshot.Snapshotter; + +/** + * An implementation of the {@link ServiceProvider} contract for the {@link SnapshotterService}. + * + * @author Mario Fiore Vitale + */ +public abstract class SnapshotterServiceProvider implements ServiceProvider { + + @Override + public SnapshotterService createService(Configuration configuration, ServiceRegistry serviceRegistry) { + + final BeanRegistry beanRegistry = serviceRegistry.tryGetService(BeanRegistry.class); + final CommonConnectorConfig commonConnectorConfig = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class); + + final String configuredSnapshotMode = snapshotMode(beanRegistry); + final String snapshotModeCustomName = commonConnectorConfig.getSnapshotModeCustomName(); + + String snapshotMode; + if ("custom".equals(configuredSnapshotMode) && !snapshotModeCustomName.isEmpty()) { + snapshotMode = snapshotModeCustomName; + } + else { + snapshotMode = configuredSnapshotMode; + } + + Optional snapshotter = StreamSupport.stream(ServiceLoader.load(Snapshotter.class).spliterator(), false) + .filter(s -> s.name().equals(snapshotMode)) + .findAny(); + + final SnapshotQuery snapshotQueryService = serviceRegistry.tryGetService(SnapshotQuery.class); + final SnapshotLock snapshotLockService = serviceRegistry.tryGetService(SnapshotLock.class); + + return snapshotter.map(s -> { + s.configure(configuration.asMap()); + if (s instanceof BeanRegistryAware) { + ((BeanRegistryAware) s).injectBeanRegistry(beanRegistry); + } + return new SnapshotterService(s, snapshotQueryService, snapshotLockService); + }) + .orElseThrow(() -> new DebeziumException(String.format("Unable to find %s snapshotter. Please check your configuration.", snapshotMode))); + + } + + @Override + public Class getServiceClass() { + return SnapshotterService.class; + } + + // TODO this could be delete after DBZ-7308 if all modes will be effectively available to all connectors and + // SnapshotMode enum moved into CommonConnectorConfig + public abstract String snapshotMode(BeanRegistry beanRegistry); + +} diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java b/debezium-core/src/main/java/io/debezium/snapshot/spi/SnapshotLock.java similarity index 55% rename from debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java rename to debezium-core/src/main/java/io/debezium/snapshot/spi/SnapshotLock.java index 4f5371046..881bbc09e 100644 --- a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java +++ b/debezium-core/src/main/java/io/debezium/snapshot/spi/SnapshotLock.java @@ -3,17 +3,28 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.spi.snapshot; +package io.debezium.snapshot.spi; import java.time.Duration; import java.util.Optional; import java.util.Set; +import io.debezium.service.Service; +import io.debezium.spi.common.Configurable; + /** - * This interface is used to determine the table lock mode used during schema snapshot + * {@link SnapshotLock} is used to determine the table lock mode used during schema snapshot * + * @author Mario Fiore Vitale */ -public interface SnapshotLock { +public interface SnapshotLock extends Configurable, Service { + + /** + * @return the name of the snapshot lock. + * + * + */ + String name(); /** * Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java b/debezium-core/src/main/java/io/debezium/snapshot/spi/SnapshotQuery.java similarity index 67% rename from debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java rename to debezium-core/src/main/java/io/debezium/snapshot/spi/SnapshotQuery.java index 593681a86..a268c5229 100644 --- a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java +++ b/debezium-core/src/main/java/io/debezium/snapshot/spi/SnapshotQuery.java @@ -3,16 +3,27 @@ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 */ -package io.debezium.spi.snapshot; +package io.debezium.snapshot.spi; import java.util.List; import java.util.Optional; +import io.debezium.service.Service; +import io.debezium.spi.common.Configurable; + /** - * This interface is used to determine the query used during data snapshot + * {@link SnapshotQuery} is used to determine the query used during data snapshot * + * @author Mario Fiore Vitale */ -public interface SnapshotQuery { +public interface SnapshotQuery extends Configurable, Service { + + /** + * @return the name of the snapshot lock. + * + * + */ + String name(); /** * Generate a valid query string for the specified table, or an empty {@link Optional}