DBZ-7300 Refactor Snapshotter interface to be an SPI through SnapshotterProvider.

Also, this will split snapshot locking and query behavior into a separate interface
This commit is contained in:
mfvitale 2024-01-15 10:13:00 +01:00 committed by Jiri Pechanec
parent fa338b61c4
commit 8e17724392
5 changed files with 157 additions and 111 deletions

View File

@ -0,0 +1,24 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
/**
* This interface is used to determine the table lock mode used during schema snapshot
*
*/
public interface SnapshotLock {
/**
* Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter
* implementation.
*/
Optional<String> tableLockingStatement(Duration lockTimeout, Set<String> tableIds);
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
import java.util.List;
import java.util.Optional;
/**
* This interface is used to determine the query used during data snapshot
*
*/
public interface SnapshotQuery {
/**
* Generate a valid query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
* include/exclude filters
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> snapshotQuery(String tableId, List<String> snapshotSelectColumns);
}

View File

@ -0,0 +1,76 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
import io.debezium.common.annotation.Incubating;
/**
* This interface is used to determine details about the snapshot process:
*
* Namely:
* - Should a snapshot occur at all
* - Should streaming occur
* - What queries should be used to snapshot
*
* While many default snapshot modes are provided with debezium (see documentation for details)
* a custom implementation of this interface can be provided by the implementor which
* can provide more advanced functionality, such as partial snapshots
*
* Implementor's must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()}
* or true for both.
*/
@Incubating
public interface Snapshotter {
/**
* @return true if the snapshotter should take a snapshot
*/
boolean shouldSnapshot();
/**
* @return true if the snapshotter should take a snapshot
*/
boolean shouldSnapshotSchema();
/**
* @return true if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
* @return true whether the schema can be recovered if database schema history is corrupted.
*/
boolean shouldSnapshotOnSchemaError();
/**
* @return true whether the snapshot should be re-executed when there is a gap in data stream.
*/
boolean shouldSnapshotOnDataError();
/**
*
* @return true if streaming should resume from the start of the snapshot
* transaction, or false for when a connector resumes and takes a snapshot,
* streaming should resume from where streaming previously left off.
*/
default boolean shouldStreamEventsStartingFromSnapshot() {
return true;
}
/**
* Lifecycle hook called once the snapshot phase is successful.
*/
default void snapshotCompleted() {
// no operation
}
/**
* Lifecycle hook called once the snapshot phase is aborted.
*/
default void snapshotAborted() {
// no operation
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.spi.snapshot;
import java.util.List;
import java.util.Map;
/**
* This interface is used to provide custom snapshotters:
* Implementations must:
*
* provide a map of snapshotter in the {@link #create(Configuration config)} method.
*
* @author Mario Fiore Vitale
*/
public interface SnapshotterProvider {
/**
* Create a map of snapshotter where the key is its name used in 'snapshot.mode' configuration.
*
* @param Configuration the connector configuration
*
* @return a map of custom snapshotter
*/
Map<String, Snapshotter> create(Configuration config); // Can we move the Configuration interface from core module?
}

View File

@ -1,111 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.postgresql.spi;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import io.debezium.common.annotation.Incubating;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.relational.TableId;
/**
* This interface is used to determine details about the snapshot process:
*
* Namely:
* - Should a snapshot occur at all
* - Should streaming occur
* - What queries should be used to snapshot
*
* While many default snapshot modes are provided with debezium (see documentation for details)
* a custom implementation of this interface can be provided by the implementor which
* can provide more advanced functionality, such as partial snapshots
*
* Implementor's must return true for either {@link #shouldSnapshot()} or {@link #shouldStream()}
* or true for both.
*/
@Incubating
public interface Snapshotter {
void init(PostgresConnectorConfig config, OffsetState sourceInfo,
SlotState slotState);
/**
* @return true if the snapshotter should take a snapshot
*/
boolean shouldSnapshot();
/**
* @return true if the snapshotter should stream after taking a snapshot
*/
boolean shouldStream();
/**
*
* @return true if streaming should resume from the start of the snapshot
* transaction, or false for when a connector resumes and takes a snapshot,
* streaming should resume from where streaming previously left off.
*/
default boolean shouldStreamEventsStartingFromSnapshot() {
return true;
}
/**
* Generate a valid postgres query string for the specified table, or an empty {@link Optional}
* to skip snapshotting this table (but that table will still be streamed from)
*
* @param tableId the table to generate a query for
* @param snapshotSelectColumns the columns to be used in the snapshot select based on the column
* include/exclude filters
* @return a valid query string, or none to skip snapshotting this table
*/
Optional<String> buildSnapshotQuery(TableId tableId, List<String> snapshotSelectColumns);
/**
* Return a new string that set up the transaction for snapshotting
*
* @param newSlotInfo if a new slot was created for snapshotting, this contains information from
* the `create_replication_slot` command
*/
default String snapshotTransactionIsolationLevelStatement(SlotCreationResult newSlotInfo, boolean isOnDemand) {
// we're using the same isolation level that pg_backup uses
return "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE;";
}
/**
* Returns a SQL statement for locking the given tables during snapshotting, if required by the specific snapshotter
* implementation.
*/
default Optional<String> snapshotTableLockingStatement(Duration lockTimeout, Set<TableId> tableIds) {
String lineSeparator = System.lineSeparator();
StringBuilder statements = new StringBuilder();
statements.append("SET lock_timeout = ").append(lockTimeout.toMillis()).append(";").append(lineSeparator);
// we're locking in ACCESS SHARE MODE to avoid concurrent schema changes while we're taking the snapshot
// this does not prevent writes to the table, but prevents changes to the table's schema....
// DBZ-298 Quoting name in case it has been quoted originally; it doesn't do harm if it hasn't been quoted
tableIds.forEach(tableId -> statements.append("LOCK TABLE ")
.append(tableId.toDoubleQuotedString())
.append(" IN ACCESS SHARE MODE;")
.append(lineSeparator));
return Optional.of(statements.toString());
}
/**
* Lifecycle hook called once the snapshot phase is successful.
*/
default void snapshotCompleted() {
// no operation
}
/**
* Lifecycle hook called once the snapshot phase is aborted.
*/
default void snapshotAborted() {
// no operation
}
}