From 8e177243921c0925716235dd6f96adc5f62b2889 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 15 Jan 2024 10:13:00 +0100 Subject: [PATCH] DBZ-7300 Refactor Snapshotter interface to be an SPI through SnapshotterProvider. Also, this will split snapshot locking and query behavior into a separate interface --- .../debezium/spi/snapshot/SnapshotLock.java | 24 ++++ .../debezium/spi/snapshot/SnapshotQuery.java | 28 +++++ .../io/debezium/spi/snapshot/Snapshotter.java | 76 ++++++++++++ .../spi/snapshot/SnapshotterProvider.java | 29 +++++ .../connector/postgresql/spi/Snapshotter.java | 111 ------------------ 5 files changed, 157 insertions(+), 111 deletions(-) create mode 100644 debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java create mode 100644 debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java create mode 100644 debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java create mode 100644 debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java delete mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java new file mode 100644 index 000000000..4f5371046 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotLock.java @@ -0,0 +1,24 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.spi.snapshot; + +import java.time.Duration; +import java.util.Optional; +import java.util.Set; + +/** + * This interface is used to determine the table lock mode used during schema snapshot + * + */ +public interface SnapshotLock { + + /** + * Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter + * implementation. + */ + Optional tableLockingStatement(Duration lockTimeout, Set tableIds); + +} diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java new file mode 100644 index 000000000..593681a86 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotQuery.java @@ -0,0 +1,28 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.spi.snapshot; + +import java.util.List; +import java.util.Optional; + +/** + * This interface is used to determine the query used during data snapshot + * + */ +public interface SnapshotQuery { + + /** + * Generate a valid query string for the specified table, or an empty {@link Optional} + * to skip snapshotting this table (but that table will still be streamed from) + * + * @param tableId the table to generate a query for + * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column + * include/exclude filters + * @return a valid query string, or none to skip snapshotting this table + */ + Optional snapshotQuery(String tableId, List snapshotSelectColumns); + +} diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java b/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java new file mode 100644 index 
000000000..0328df5a4 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/spi/snapshot/Snapshotter.java @@ -0,0 +1,76 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.spi.snapshot; + +import io.debezium.common.annotation.Incubating; + +/** + * This interface is used to determine details about the snapshot process: + * + * Namely: + * - Should a snapshot occur at all + * - Should streaming occur + * - What queries should be used to snapshot + * + * While many default snapshot modes are provided with debezium (see documentation for details) + * a custom implementation of this interface can be provided by the implementor which + * can provide more advanced functionality, such as partial snapshots + * + * Implementors must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()} + * or true for both. + */ +@Incubating +public interface Snapshotter { + + /** + * @return true if the snapshotter should take a snapshot + */ + boolean shouldSnapshot(); + + /** + * @return true if the snapshotter should take a schema snapshot + */ + boolean shouldSnapshotSchema(); + + /** + * @return true if the snapshotter should stream after taking a snapshot + */ + boolean shouldStream(); + + /** + * @return true if the schema can be recovered when the database schema history is corrupted. + */ + boolean shouldSnapshotOnSchemaError(); + + /** + * @return true if the snapshot should be re-executed when there is a gap in the data stream. + */ + boolean shouldSnapshotOnDataError(); + + /** + * + * @return true if streaming should resume from the start of the snapshot + * transaction, or false for when a connector resumes and takes a snapshot, + * streaming should resume from where streaming previously left off. 
+ */ + default boolean shouldStreamEventsStartingFromSnapshot() { + return true; + } + + /** + * Lifecycle hook called once the snapshot phase is successful. + */ + default void snapshotCompleted() { + // no operation + } + + /** + * Lifecycle hook called once the snapshot phase is aborted. + */ + default void snapshotAborted() { + // no operation + } +} diff --git a/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java new file mode 100644 index 000000000..064ebb369 --- /dev/null +++ b/debezium-api/src/main/java/io/debezium/spi/snapshot/SnapshotterProvider.java @@ -0,0 +1,29 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.spi.snapshot; + +import java.util.List; +import java.util.Map; + +/** + * This interface is used to provide custom snapshotters: + * Implementations must: + * + * provide a map of snapshotters in the {@link #create(Configuration config)} method. + * + * @author Mario Fiore Vitale + */ +public interface SnapshotterProvider { + + /** + * Create a map of snapshotters where each key is the name used in the 'snapshot.mode' configuration. + * + * @param config the connector configuration + * + * @return a map of custom snapshotters + */ + Map create(Configuration config); // Can we move the Configuration interface from core module? +} diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java deleted file mode 100644 index 75aa638eb..000000000 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/spi/Snapshotter.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright Debezium Authors. 
- * - * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - */ -package io.debezium.connector.postgresql.spi; - -import java.time.Duration; -import java.util.List; -import java.util.Optional; -import java.util.Set; - -import io.debezium.common.annotation.Incubating; -import io.debezium.connector.postgresql.PostgresConnectorConfig; -import io.debezium.relational.TableId; - -/** - * This interface is used to determine details about the snapshot process: - * - * Namely: - * - Should a snapshot occur at all - * - Should streaming occur - * - What queries should be used to snapshot - * - * While many default snapshot modes are provided with debezium (see documentation for details) - * a custom implementation of this interface can be provided by the implementor which - * can provide more advanced functionality, such as partial snapshots - * - * Implementor's must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()} - * or true for both. - */ -@Incubating -public interface Snapshotter { - - void init(PostgresConnectorConfig config, OffsetState sourceInfo, - SlotState slotState); - - /** - * @return true if the snapshotter should take a snapshot - */ - boolean shouldSnapshot(); - - /** - * @return true if the snapshotter should stream after taking a snapshot - */ - boolean shouldStream(); - - /** - * - * @return true if streaming should resume from the start of the snapshot - * transaction, or false for when a connector resumes and takes a snapshot, - * streaming should resume from where streaming previously left off. 
- */ - default boolean shouldStreamEventsStartingFromSnapshot() { - return true; - } - - /** - * Generate a valid postgres query string for the specified table, or an empty {@link Optional} - * to skip snapshotting this table (but that table will still be streamed from) - * - * @param tableId the table to generate a query for - * @param snapshotSelectColumns the columns to be used in the snapshot select based on the column - * include/exclude filters - * @return a valid query string, or none to skip snapshotting this table - */ - Optional buildSnapshotQuery(TableId tableId, List snapshotSelectColumns); - - /** - * Return a new string that set up the transaction for snapshotting - * - * @param newSlotInfo if a new slot was created for snapshotting, this contains information from - * the `create_replication_slot` command - */ - default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) { - // we're using the same isolation level that pg_backup uses - return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;"; - } - - /** - * Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter - * implementation. - */ - default Optional snapshotTableLockingStatement(Duration lockTimeout, Set tableIds) { - String lineSeparator = System.lineSeparator(); - StringBuilder statements = new StringBuilder(); - statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator); - // we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot - // this does not prevent writes to the table, but prevents changes to the table's schema.... 
- // DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted - tableIds.forEach(tableId -> statements.append("LOCK TABLE ") - .append(tableId.toDoubleQuotedString()) - .append(" IN ACCESS SHARE MODE;") - .append(lineSeparator)); - return Optional.of(statements.toString()); - } - - /** - * Lifecycle hook called once the snapshot phase is successful. - */ - default void snapshotCompleted() { - // no operation - } - - /** - * Lifecycle hook called once the snapshot phase is aborted. - */ - default void snapshotAborted() { - // no operation - } -}