DBZ-7461 Move snapshot mode validation logic to BaseSourceTask

This commit is contained in:
mfvitale 2024-02-14 17:24:57 +01:00 committed by Fiore Mario Vitale
parent 99eee98e9d
commit 371905f135
38 changed files with 248 additions and 596 deletions

View File

@ -5,7 +5,6 @@
*/
package io.debezium.spi.snapshot;
import io.debezium.DebeziumException;
import io.debezium.common.annotation.Incubating;
import io.debezium.spi.common.Configurable;
@ -36,23 +35,18 @@ public interface Snapshotter extends Configurable {
String name();
/**
* Validate the snapshotter compatibility with the current connector configuration.
* Throws a {@link DebeziumException} in case it is not compatible.
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*
* @param offsetContextExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param isSnapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*/
void validate(boolean offsetContextExists, boolean isSnapshotInProgress);
/**
* @return {@code true} if the snapshotter should take a snapshot
*/
boolean shouldSnapshot();
boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress);
/**
* @return {@code true} if the snapshotter should take a snapshot
* @param offsetExists is {@code true} when the connector has an offset context (i.e. restarted)
* @param snapshotInProgress is {@code true} when the connector is started, but a snapshot is already in progress
*/
boolean shouldSnapshotSchema();
boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress);
/**
* @return {@code true} if the snapshotter should stream after taking a snapshot

View File

@ -21,7 +21,6 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.connector.mysql.MySqlConnectorConfig.BigIntUnsignedHandlingMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.snapshot.MySqlSnapshotLockProvider;
import io.debezium.connector.mysql.snapshot.MySqlSnapshotterServiceProvider;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
@ -41,6 +40,7 @@
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.signal.SignalProcessor;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaFactory;
@ -123,6 +123,11 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
validateBinlogConfiguration(snapshotter);
// If the binlog position is not available it is necessary to re-execute snapshot
if (validateSnapshotFeasibility(snapshotter, previousOffsets.getTheOnlyOffset())) {
previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
}
LOGGER.info("Closing connection before starting schema recovery");
try {
@ -134,7 +139,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
validateAndLoadSchemaHistory(connectorConfig, previousOffsets, schema, snapshotter);
validateAndLoadSchemaHistory(connectorConfig, connection, previousOffsets, schema, snapshotter);
LOGGER.info("Reconnecting after finishing schema recovery");
@ -148,11 +153,9 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
// If the binlog position is not available it is necessary to re-execute snapshot
if (previousOffset == null) {
LOGGER.info("No previous offset found");
snapshotter.validate(false, false);
}
else {
LOGGER.info("Found previous offset {}", previousOffset);
snapshotter.validate(true, previousOffset.isSnapshotRunning());
}
taskContext = new MySqlTaskContext(connectorConfig, schema);
@ -250,6 +253,24 @@ protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
serviceRegistry.registerServiceProvider(new MySqlSnapshotterServiceProvider());
}
private boolean validateSnapshotFeasibility(Snapshotter snapshotter, OffsetContext offset) {
if (offset == null) {
if (!snapshotter.shouldSnapshot(false, false)) {
// Look to see what the first available binlog file is called, and whether it looks like binlog files have
// been purged. If so, then output a warning ...
String earliestBinlogFilename = connection.earliestBinlogFilename();
if (earliestBinlogFilename == null) {
LOGGER.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
}
else if (!earliestBinlogFilename.endsWith("00001")) {
LOGGER.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
}
}
}
return false;
}
private MySqlValueConverters getValueConverters(MySqlConnectorConfig configuration) {
// Use MySQL-specific converters and schemas for values ...
@ -338,43 +359,6 @@ private void validateBinlogConfiguration(Snapshotter snapshotter) {
}
}
private void validateAndLoadSchemaHistory(MySqlConnectorConfig config, Offsets<MySqlPartition, MySqlOffsetContext> previousoffsets, MySqlDatabaseSchema schema,
Snapshotter snapshotter) {
final MySqlOffsetContext offset = previousoffsets.getTheOnlyOffset();
final MySqlPartition partition = previousoffsets.getTheOnlyPartition();
if (offset == null) {
if (snapshotter.shouldSnapshotOnSchemaError()) {
// We are in schema only recovery mode, use the existing binlog position
// would like to also verify binlog position exists, but it defaults to 0 which is technically valid
throw new DebeziumException("Could not find existing binlog information while attempting schema only recovery snapshot");
}
LOGGER.info("Connector started for the first time, database schema history recovery will not be executed");
schema.initializeStorage();
return;
}
if (!schema.historyExists()) {
LOGGER.warn("Database schema history was not found but was expected");
if (snapshotter.shouldSnapshotOnSchemaError()) {
// But check to see if the server still has those binlog coordinates ...
if (!connection.isBinlogPositionAvailable(config, offset.gtidSet(), offset.getSource().binlogFilename())) {
throw new DebeziumException("The connector is trying to read binlog starting at " + offset.getSource() + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. " +
"Attempting to snapshot the current schema and then begin reading the binlog from the last recorded offset.",
SnapshotMode.SCHEMA_ONLY_RECOVERY); // TODO 7308 this should be the name of current snapshot mode
}
else {
throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to "
+ SnapshotMode.SCHEMA_ONLY_RECOVERY);
}
schema.initializeStorage();
return;
}
schema.recover(partition, offset);
}
private void resetOffset(MySqlConnectorConfig connectorConfig, MySqlOffsetContext previousOffset,
SignalProcessor<MySqlPartition, MySqlOffsetContext> signalProcessor) {
boolean isKafkaChannelEnabled = connectorConfig.getEnabledChannels().contains(KafkaSignalChannel.CHANNEL_NAME);

View File

@ -359,13 +359,6 @@ protected DdlParser getDdlParser() {
return ddlParser;
}
/**
* Return true if the database schema history entity exists
*/
public boolean historyExists() {
return schemaHistory.exists();
}
/**
* Assign the given table number to the table with the specified {@link TableId table ID}.
*

View File

@ -95,21 +95,37 @@ public MySqlSnapshotChangeEventSource(MySqlConnectorConfig connectorConfig, Main
@Override
public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffsetContext previousOffset) {
// TODO DBZ-7308 evaluate is this can be shared
// TODO DBZ-7308 evaluate getSnapshottingTask can be shared
final Snapshotter snapshotter = snapshotterService.getSnapshotter();
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
if (snapshotter.shouldSnapshotSchema() && snapshotter.shouldSnapshot()) {
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;
if (!offsetExists) {
LOGGER.info("No previous offset found");
}
if (offsetExists && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
snapshotInProgress = true;
}
boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
boolean shouldSnapshotData = snapshotter.shouldSnapshot(offsetExists, snapshotInProgress);
if (shouldSnapshotSchema && shouldSnapshotData) {
LOGGER.info("According to the connector configuration both schema and data will be snapshotted");
}
else {
LOGGER.info("According to the connector configuration only schema will be snapshotted");
}
return new SnapshottingTask(snapshotter.shouldSnapshotSchema(), snapshotter.shouldSnapshot(),
return new SnapshottingTask(shouldSnapshotSchema, shouldSnapshotData,
dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable, false);
}

View File

@ -10,16 +10,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.spi.snapshot.Snapshotter;
public class AlwaysSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
@Override
public String name() {
return MySqlConnectorConfig.SnapshotMode.ALWAYS.getValue();
return "always";
}
@Override
@ -28,20 +27,15 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
// for ALWAYS snapshot mode don't use exiting offset to have up-to-date SCN
LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset.");
return true;
}
@Override
public boolean shouldSnapshotSchema() {
return true;
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true; // TODO use schema to determine behavior based on historized or not
}
@Override
@ -55,7 +49,7 @@ public boolean shouldSnapshotOnSchemaError() {
}
@Override
public boolean shouldSnapshotOnDataError() { // TODO check with DBZ-7308
public boolean shouldSnapshotOnDataError() {
return false;
}
}

View File

@ -5,17 +5,9 @@
*/
package io.debezium.connector.mysql.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.mode.HistorizedSnapshotter;
public class InitialSnapshotter extends HistorizedSnapshotter {
@ -30,30 +22,6 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final CommonConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final JdbcConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, JdbcConnection.class);
final Offsets<Partition, OffsetContext> offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OffsetContext offset = offsets.getTheOnlyOffset();
try {
if (offset != null && !offset.isSnapshotRunning()) {
// Check to see if the server still has those binlog coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
throw new DebeziumException("The connector is trying to read binlog starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
@Override
protected boolean shouldSnapshotWhenNoOffset() {
return true;

View File

@ -7,26 +7,12 @@
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.strategy.AbstractConnectorConnection;
import io.debezium.connector.mysql.strategy.mariadb.MariaDbConnection;
import io.debezium.connector.mysql.strategy.mariadb.MariaDbConnectorAdapter;
import io.debezium.connector.mysql.strategy.mysql.MySqlConnection;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.snapshot.mode.BeanAwareSnapshotter;
import io.debezium.spi.snapshot.Snapshotter;
public class NeverSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
private static final Logger LOGGER = LoggerFactory.getLogger(NeverSnapshotter.class);
@Override
public String name() {
return MySqlConnectorConfig.SnapshotMode.NEVER.getValue();
@ -38,42 +24,7 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final MySqlConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, MySqlConnectorConfig.class);
final AbstractConnectorConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, getConnectionClass(config));
final Offsets<MySqlPartition, MySqlOffsetContext> mySqloffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final MySqlOffsetContext offset = mySqloffsets.getTheOnlyOffset();
if (offsetContextExists) {
if (isSnapshotInProgress) {
// The last offset was an incomplete snapshot and now the snapshot was disabled
throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
}
else {
// But check to see if the server still has those binlog coordinates ...
if (!connection.isBinlogPositionAvailable(config, offset.gtidSet(), offset.getSource().binlogFilename())) {
throw new DebeziumException("The connector is trying to read binlog starting at " + offset.getSource() + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
}
}
// Look to see what the first available binlog file is called, and whether it looks like binlog files have
// been purged. If so, then output a warning ...
String earliestBinlogFilename = connection.earliestBinlogFilename();
if (earliestBinlogFilename == null) {
LOGGER.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
}
else if (!earliestBinlogFilename.endsWith("00001")) {
LOGGER.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
}
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return false;
}
@ -83,7 +34,7 @@ public boolean shouldStream() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return false;
}
@ -97,13 +48,4 @@ public boolean shouldSnapshotOnDataError() {
return false;
}
private Class<? extends AbstractConnectorConnection> getConnectionClass(MySqlConnectorConfig config) {
// TODO review this when MariaDB becomes a first class connector
if (config.getConnectorAdapter() instanceof MariaDbConnectorAdapter) {
return MariaDbConnection.class;
}
else {
return MySqlConnection.class;
}
}
}

View File

@ -5,17 +5,9 @@
*/
package io.debezium.connector.mysql.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.mode.HistorizedSnapshotter;
public class SchemaOnlySnapshotter extends HistorizedSnapshotter {
@ -30,30 +22,6 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final CommonConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final JdbcConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, JdbcConnection.class);
final Offsets<Partition, OffsetContext> offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OffsetContext offset = offsets.getTheOnlyOffset();
try {
if (offset != null && !offset.isSnapshotRunning()) {
// Check to see if the server still has those binlog coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
throw new DebeziumException("The connector is trying to read binlog starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
@Override
protected boolean shouldSnapshotWhenNoOffset() {
return false;

View File

@ -5,26 +5,13 @@
*/
package io.debezium.connector.mysql.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.mode.HistorizedSnapshotter;
public class WhenNeededSnapshotter extends HistorizedSnapshotter {
private static final Logger LOGGER = LoggerFactory.getLogger(WhenNeededSnapshotter.class);
@Override
public String name() {
return MySqlConnectorConfig.SnapshotMode.WHEN_NEEDED.getValue();
@ -35,31 +22,6 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final CommonConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final JdbcConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, JdbcConnection.class);
final Offsets<Partition, OffsetContext> offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OffsetContext offset = offsets.getTheOnlyOffset();
try {
if (offsetContextExists && !isSnapshotInProgress) {
// Check to see if the server still has those binlog coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
LOGGER.warn(
"The connector is trying to read log starting at '{}', but this is no longer available on the server. Forcing the snapshot execution as it is allowed by the configuration.",
offset);
offsets.resetOffset(offsets.getTheOnlyPartition());
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
@Override
protected boolean shouldSnapshotWhenNoOffset() {
return true;

View File

@ -39,12 +39,7 @@ public void injectBeanRegistry(BeanRegistry beanRegistry) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
hasState = offsetContextExists;
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -54,7 +49,7 @@ public boolean shouldStream() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true;
}

View File

@ -415,15 +415,19 @@ public Optional<Scn> getFirstScnInLogs(Duration archiveLogRetention, String arch
return Optional.of(Scn.valueOf(oldestScn));
}
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) throws SQLException {
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {
final Duration archiveLogRetention = ((OracleConnectorConfig) config).getLogMiningArchiveLogRetention(); // TODO generify this a generic properties valid for the different Oracle flavors
final String archiveDestinationName = ((OracleConnectorConfig) config).getLogMiningArchiveDestinationName();
final Scn storedOffset = ((OracleConnectorConfig) config).getAdapter().getOffsetScn((OracleOffsetContext) offset);
Optional<Scn> firstAvailableScn = getFirstScnInLogs(archiveLogRetention, archiveDestinationName);
return firstAvailableScn.filter(isLessThan(storedOffset)).isPresent();
try {
Optional<Scn> firstAvailableScn = getFirstScnInLogs(archiveLogRetention, archiveDestinationName);
return firstAvailableScn.filter(isLessThan(storedOffset)).isPresent();
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
private static Predicate<Scn> isLessThan(Scn storedOffset) {

View File

@ -40,7 +40,6 @@
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
@ -95,25 +94,21 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
registerServiceProviders(connectorConfig.getServiceRegistry());
final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
final Snapshotter snapshotter = snapshotterService.getSnapshotter();
validateRedoLogConfiguration(connectorConfig, snapshotterService);
OraclePartition partition = previousOffsets.getTheOnlyPartition();
OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
validateAndLoadSchemaHistory(connectorConfig, partition, previousOffset, schema, snapshotterService);
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection, previousOffsets, schema, snapshotterService.getSnapshotter());
taskContext = new OracleTaskContext(connectorConfig, schema);
// If the binlog position is not available it is necessary to re-execute snapshot
if (previousOffset == null) {
LOGGER.info("No previous offset found");
snapshotter.validate(false, false);
}
else {
LOGGER.info("Found previous offset {}", previousOffset);
snapshotter.validate(true, previousOffset.isSnapshotRunning());
}
Clock clock = Clock.system();
@ -257,32 +252,4 @@ private static boolean redoLogRequired(OracleConnectorConfig config, Snapshotter
config.getLogMiningTransactionSnapshotBoundaryMode() == OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL;
}
private void validateAndLoadSchemaHistory(OracleConnectorConfig config, OraclePartition partition, OracleOffsetContext offset, OracleDatabaseSchema schema,
SnapshotterService snapshotterService) {
if (offset == null) { // TODO move this into snapshotter validate
if (snapshotterService.getSnapshotter().shouldSnapshotOnSchemaError() && config.getSnapshotMode() != OracleConnectorConfig.SnapshotMode.ALWAYS) {
// We are in schema only recovery mode, use the existing redo log position
// would like to also verify redo log position exists, but it defaults to 0 which is technically valid
throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
}
LOGGER.info("Connector started for the first time, database schema history recovery will not be executed");
schema.initializeStorage();
return;
}
if (!schema.historyExists()) {
LOGGER.warn("Database schema history was not found but was expected");
if (snapshotterService.getSnapshotter().shouldSnapshotOnSchemaError()) {
LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. " +
"Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.",
OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
}
else {
throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to "
+ OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
}
schema.initializeStorage();
return;
}
schema.recover(Offsets.of(partition, offset));
}
}

View File

@ -108,13 +108,6 @@ public void applySchemaChange(SchemaChangeEvent schemaChange) {
}
}
/**
* Return true if the database schema history entity exists
*/
public boolean historyExists() {
return schemaHistory.exists();
}
@Override
protected void removeSchema(TableId id) {
super.removeSchema(id);

View File

@ -73,14 +73,30 @@ public SnapshottingTask getSnapshottingTask(OraclePartition partition, OracleOff
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
if (snapshotter.shouldSnapshot() && snapshotter.shouldSnapshotSchema()) {
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;
if (!offsetExists) {
LOGGER.info("No previous offset found");
}
if (offsetExists && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
snapshotInProgress = true;
}
boolean shouldSnapshotSchema = snapshotter.shouldSnapshotSchema(offsetExists, snapshotInProgress);
boolean shouldSnapshotData = snapshotter.shouldSnapshot(offsetExists, snapshotInProgress);
if (shouldSnapshotData && shouldSnapshotSchema) {
LOGGER.info("According to the connector configuration both schema and data will be snapshot.");
}
else if (snapshotter.shouldSnapshotSchema()) {
else if (shouldSnapshotSchema) {
LOGGER.info("According to the connector configuration only schema will be snapshot.");
}
return new SnapshottingTask(snapshotter.shouldSnapshotSchema(), snapshotter.shouldSnapshot(), dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable,
return new SnapshottingTask(shouldSnapshotSchema, shouldSnapshotData,
dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable,
false);
}

View File

@ -149,6 +149,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
LOGGER.info("Streaming is not enabled in current configuration");
return;
}
try {
prepareConnection(false);

View File

@ -10,17 +10,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.snapshot.mode.BeanAwareSnapshotter;
import io.debezium.spi.snapshot.Snapshotter;
public class AlwaysSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
public class AlwaysSnapshotter implements Snapshotter {
private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
@Override
public String name() {
return OracleConnectorConfig.SnapshotMode.ALWAYS.getValue();
return "always";
}
@Override
@ -29,19 +27,14 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
// for ALWAYS snapshot mode don't use exiting offset to have up-to-date SCN
LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset.");
return true;
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -56,7 +49,7 @@ public boolean shouldSnapshotOnSchemaError() {
}
@Override
public boolean shouldSnapshotOnDataError() { // TODO check with DBZ-7308
public boolean shouldSnapshotOnDataError() {
return false;
}
}

View File

@ -5,17 +5,9 @@
*/
package io.debezium.connector.oracle.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.mode.HistorizedSnapshotter;
public class InitialSnapshotter extends HistorizedSnapshotter {
@ -30,30 +22,6 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final CommonConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final JdbcConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, JdbcConnection.class);
final Offsets<Partition, OffsetContext> offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OffsetContext offset = offsets.getTheOnlyOffset();
try {
if (offset != null && !offset.isSnapshotRunning()) {
// Check to see if the server still has those binlog coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
throw new DebeziumException("The connector is trying to read binlog starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
@Override
protected boolean shouldSnapshotWhenNoOffset() {
return true;
@ -75,7 +43,7 @@ public boolean shouldSnapshotOnSchemaError() {
}
@Override
public boolean shouldSnapshotOnDataError() { // TODO check with DBZ-7308
public boolean shouldSnapshotOnDataError() {
return false;
}
}

View File

@ -7,7 +7,7 @@
import io.debezium.connector.oracle.OracleConnectorConfig;
public class SchemaRecoveryOnlySnapshotter extends SchemaOnlySnapshotter {
public class SchemaOnlyRecoverySnapshotter extends SchemaOnlySnapshotter {
@Override
public String name() {

View File

@ -5,17 +5,9 @@
*/
package io.debezium.connector.oracle.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.mode.HistorizedSnapshotter;
public class SchemaOnlySnapshotter extends HistorizedSnapshotter {
@ -30,30 +22,6 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final CommonConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final JdbcConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, JdbcConnection.class);
final Offsets<Partition, OffsetContext> offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OffsetContext offset = offsets.getTheOnlyOffset();
try {
if (offset != null && !offset.isSnapshotRunning()) {
// Check to see if the server still has those binlog coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
throw new DebeziumException("The connector is trying to read binlog starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.");
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
@Override
protected boolean shouldSnapshotWhenNoOffset() {
return false;

View File

@ -5,26 +5,14 @@
*/
package io.debezium.connector.oracle.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.snapshot.mode.BeanAwareSnapshotter;
import io.debezium.spi.snapshot.Snapshotter;
public class WhenNeededSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
private static final Logger LOGGER = LoggerFactory.getLogger(WhenNeededSnapshotter.class);
@Override
public String name() {
return OracleConnectorConfig.SnapshotMode.WHEN_NEEDED.getValue();
@ -36,31 +24,7 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final OracleConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, OracleConnectorConfig.class);
final OracleConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, OracleConnection.class);
final Offsets<OraclePartition, OracleOffsetContext> oracleOffsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OracleOffsetContext offset = oracleOffsets.getTheOnlyOffset();
try {
if (offsetContextExists && !isSnapshotInProgress) {
// Check to see if the server still has those log coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
LOGGER.warn(
"The connector is trying to read log starting at '{}', but this is no longer available on the server. Forcing the snapshot execution as it is allowed by the configuration.",
offset.getScn());
oracleOffsets.resetOffset(oracleOffsets.getTheOnlyPartition()); // TODO DBZ-7308 evaluate if side effect can be extracted
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available scn", e);
}
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -70,7 +34,7 @@ public boolean shouldStream() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true;
}

View File

@ -2,5 +2,5 @@ io.debezium.connector.oracle.snapshot.mode.AlwaysSnapshotter
io.debezium.connector.oracle.snapshot.mode.InitialSnapshotter
io.debezium.connector.oracle.snapshot.mode.InitialOnlySnapshotter
io.debezium.connector.oracle.snapshot.mode.SchemaOnlySnapshotter
io.debezium.connector.oracle.snapshot.mode.SchemaRecoveryOnlySnapshotter
io.debezium.connector.oracle.snapshot.mode.SchemaOnlyRecoverySnapshotter
io.debezium.connector.oracle.snapshot.mode.WhenNeededSnapshotter

View File

@ -39,12 +39,7 @@ public void injectBeanRegistry(BeanRegistry beanRegistry) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
hasState = offsetContextExists;
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -54,7 +49,7 @@ public boolean shouldStream() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true;
}

View File

@ -138,6 +138,8 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
beanRegistryJdbcConnection.username(), e);
}
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection, previousOffsets, schema, snapshotter);
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
// Print out the server information

View File

@ -60,13 +60,26 @@ public PostgresSnapshotChangeEventSource(PostgresConnectorConfig connectorConfig
@Override
public SnapshottingTask getSnapshottingTask(PostgresPartition partition, PostgresOffsetContext previousOffset) {
//TODO review log messages
boolean snapshotSchema = true;
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));
boolean snapshotData = snapshotterService.getSnapshotter().shouldSnapshot();
boolean offsetExists = previousOffset != null;
boolean snapshotInProgress = false;
if (!offsetExists) {
LOGGER.info("No previous offset found");
}
if (offsetExists && !previousOffset.isSnapshotRunning()) {
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
snapshotInProgress = true;
}
boolean snapshotData = snapshotterService.getSnapshotter().shouldSnapshot(offsetExists, snapshotInProgress);
if (snapshotData) {
LOGGER.info("According to the connector configuration data will be snapshotted");
}

View File

@ -829,17 +829,22 @@ public TableId createTableId(String databaseName, String schemaName, String tabl
return new TableId(null, schemaName, tableName);
}
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) throws SQLException {
public boolean validateLogPosition(OffsetContext offset, CommonConnectorConfig config) {
final Lsn storedLsn = ((PostgresOffsetContext) offset).lastCommitLsn();
final String slotName = ((PostgresConnectorConfig) config).slotName();
final String postgresPluginName = ((PostgresConnectorConfig) config).plugin().getPostgresPluginName();
SlotState slotState = getReplicationSlotState(slotName, postgresPluginName);
if (slotState == null) {
return false;
try {
SlotState slotState = getReplicationSlotState(slotName, postgresPluginName);
if (slotState == null) {
return false;
}
return storedLsn == null || slotState.slotRestartLsn().compareTo(storedLsn) < 0;
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
return slotState.slotRestartLsn().compareTo(storedLsn) < 0;
}
@FunctionalInterface

View File

@ -10,16 +10,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.spi.snapshot.Snapshotter;
public class AlwaysSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AlwaysSnapshotter.class);
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue();
return "always";
}
@Override
@ -28,18 +27,19 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldStream() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
// for ALWAYS snapshot mode don't use exiting offset to have up-to-date SCN
LOGGER.info("Snapshot mode is set to ALWAYS, not checking exiting offset.");
return true;
}
@Override
public boolean shouldSnapshot() {
LOGGER.info("Taking a new snapshot as per configuration");
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return false;
}
@Override
public boolean shouldStream() {
return true;
}
@ -52,9 +52,4 @@ public boolean shouldSnapshotOnSchemaError() {
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
}

View File

@ -5,15 +5,10 @@
*/
package io.debezium.connector.postgresql.snapshot.mode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
public class InitialOnlySnapshotter extends InitialSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(InitialOnlySnapshotter.class);
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue();
@ -24,20 +19,4 @@ public boolean shouldStream() {
return false;
}
@Override
public boolean shouldSnapshot() {
if (!offsetContextExists) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (isSnapshotInProgress) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
else {
LOGGER.info("Previous initial snapshot completed, no snapshot will be performed");
return false;
}
}
}

View File

@ -17,9 +17,6 @@ public class InitialSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(InitialSnapshotter.class);
protected boolean offsetContextExists;
protected boolean isSnapshotInProgress;
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.INITIAL.getValue();
@ -30,25 +27,19 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
this.offsetContextExists = offsetContextExists;
this.isSnapshotInProgress = isSnapshotInProgress;
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
if (!offsetContextExists) {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
if (!offsetExists) {
LOGGER.info("Taking initial snapshot for new datasource");
return true;
}
else if (isSnapshotInProgress) {
else if (snapshotInProgress) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
@ -70,7 +61,7 @@ public boolean shouldSnapshotOnDataError() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return false;
}
}

View File

@ -7,17 +7,11 @@
import java.util.Map;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.spi.snapshot.Snapshotter;
public class NeverSnapshotter implements Snapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(NeverSnapshotter.class);
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.NEVER.getValue();
@ -28,27 +22,18 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
if (offsetContextExists && isSnapshotInProgress) {
String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
LOGGER.error(msg);
throw new ConnectException(msg);
}
else {
LOGGER.info("Snapshots are not allowed as per configuration, starting streaming logical changes only");
}
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return false;
}
@Override
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return false;
}
@ -61,9 +46,4 @@ public boolean shouldSnapshotOnSchemaError() {
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
}

View File

@ -5,28 +5,14 @@
*/
package io.debezium.connector.postgresql.snapshot.mode;
import java.sql.SQLException;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.bean.StandardBeanNames;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.snapshot.mode.BeanAwareSnapshotter;
import io.debezium.spi.snapshot.Snapshotter;
public class WhenNeededSnapshotter extends BeanAwareSnapshotter implements Snapshotter {
private static final Logger LOGGER = LoggerFactory.getLogger(WhenNeededSnapshotter.class);
@Override
public String name() {
return PostgresConnectorConfig.SnapshotMode.WHEN_NEEDED.getValue();
@ -38,35 +24,12 @@ public void configure(Map<String, ?> properties) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
final CommonConnectorConfig config = beanRegistry.lookupByName(StandardBeanNames.CONNECTOR_CONFIG, CommonConnectorConfig.class);
final JdbcConnection connection = beanRegistry.lookupByName(StandardBeanNames.JDBC_CONNECTION, JdbcConnection.class);
final Offsets<Partition, OffsetContext> offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final OffsetContext offset = offsets.getTheOnlyOffset();
try {
if (offsetContextExists && !isSnapshotInProgress) {
// Check to see if the server still has those log coordinates ...
if (!connection.isLogPositionAvailable(offset, config)) {
LOGGER.warn(
"The connector is trying to read log starting at '{}', but this is no longer available on the server. Forcing the snapshot execution as it is allowed by the configuration.",
getStoredLogPosition(offset));
offsets.resetOffset(offsets.getTheOnlyPartition()); // TODO DBZ-7308 evaluate if side effect can be extracted
}
}
}
catch (SQLException e) {
throw new DebeziumException("Unable to get last available log position", e);
}
}
private Object getStoredLogPosition(OffsetContext offset) {
return ((PostgresOffsetContext) offset).lastCommitLsn();
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -75,11 +38,6 @@ public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshotSchema() {
return true;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;

View File

@ -28,38 +28,4 @@ public Optional<String> snapshotQuery(String tableId, List<String> snapshotSelec
return Optional.empty();
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
return true;
}
@Override
public boolean shouldSnapshotOnSchemaError() {
return false;
}
@Override
public boolean shouldSnapshotOnDataError() {
return false;
}
@Override
public boolean shouldSnapshotSchema() {
return false;
}
@Override
public boolean shouldStreamEventsStartingFromSnapshot() {
return false;
}
}

View File

@ -16,18 +16,13 @@ public String name() {
return CustomStartFromStreamingTestSnapshot.class.getName();
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
}
@Override
public boolean shouldStream() {
return true;
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -42,7 +37,7 @@ public boolean shouldSnapshotOnDataError() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return false;
}

View File

@ -41,12 +41,7 @@ public void injectBeanRegistry(BeanRegistry beanRegistry) {
}
@Override
public void validate(boolean offsetContextExists, boolean isSnapshotInProgress) {
hasState = offsetContextExists;
}
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
return true;
}
@ -56,7 +51,7 @@ public boolean shouldStream() {
}
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
return false;
}

View File

@ -29,11 +29,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.annotation.SingleThreadAccess;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
@ -41,8 +43,11 @@
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotQueryProvider;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Metronome;
@ -61,6 +66,80 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
private Configuration config;
protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcConnection jdbcConnection, Offsets<P, O> previousOffsets, DatabaseSchema schema,
Snapshotter snapshotter) {
final OffsetContext offset = previousOffsets.getTheOnlyOffset();
final Partition partition = previousOffsets.getTheOnlyPartition();
if (offset == null) {
if (snapshotter.shouldSnapshotOnSchemaError()) {
// We are in schema only recovery mode, use the existing redo log position
// would like to also verify redo log position exists, but it defaults to 0 which is technically valid
throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
}
LOGGER.info("Connector started for the first time.");
if (schema.isHistorized()) {
((HistorizedDatabaseSchema) schema).initializeStorage();
}
return;
}
if (offset.isSnapshotRunning()) {
// The last offset was an incomplete snapshot and now the snapshot was disabled
if (!snapshotter.shouldSnapshot(true, true)) {
// No snapshots are allowed
throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
}
}
else {
boolean logPositionAvailable = jdbcConnection.isLogPositionAvailable(offset, config);
if (schema.isHistorized() && !((HistorizedDatabaseSchema) schema).historyExists()) {
LOGGER.warn("Database schema history was not found but was expected");
if (snapshotter.shouldSnapshotOnSchemaError()) {
LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. " +
"Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.",
snapshotter.name());
if (schema.isHistorized()) {
((HistorizedDatabaseSchema) schema).initializeStorage();
}
return;
}
else {
throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to schema_only_recovery.");
}
}
if (!logPositionAvailable && !offset.isSnapshotRunning()) {
LOGGER.warn("Last recorded offset is no longer available on the server.");
if (snapshotter.shouldSnapshotOnDataError()) {
LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. " +
"Attempting to snapshot data to fill the gap.",
snapshotter.name());
previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
return;
}
LOGGER.warn("The connector is trying to read redo log starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. " +
"If not the connector will streaming from the last available position in the log");
}
if (schema.isHistorized()) {
((HistorizedDatabaseSchema) schema).recover(partition, offset);
}
}
}
public enum State {
RESTARTING,
RUNNING,

View File

@ -169,7 +169,7 @@ public interface LogPositionValidator {
* @param offsetContext The current stored offset.
* @param config Connector configuration.
*/
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config) throws SQLException;
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config);
}
/**
@ -1654,7 +1654,7 @@ protected Map<String, Object> reselectColumns(String query, TableId tableId, Lis
return results;
}
public boolean isLogPositionAvailable(OffsetContext offsetContext, CommonConnectorConfig config) throws SQLException {
public boolean isLogPositionAvailable(OffsetContext offsetContext, CommonConnectorConfig config) {
if (logPositionValidator == null) {
LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");

View File

@ -155,4 +155,11 @@ public boolean skipSchemaChangeEvent(SchemaChangeEvent event) {
}
return false;
}
/**
* Return true if the database schema history entity exists
*/
public boolean historyExists() {
return schemaHistory.exists();
}
}

View File

@ -51,4 +51,6 @@ default void recover(Partition partition, OffsetContext offset) {
boolean storeOnlyCapturedTables();
boolean storeOnlyCapturedDatabases();
boolean historyExists();
}

View File

@ -22,7 +22,7 @@ public abstract class HistorizedSnapshotter extends BeanAwareSnapshotter impleme
"Schema will %s if shouldSnapshotOnSchemaError is true for the current snapshot mode.";
@Override
public boolean shouldSnapshot() {
public boolean shouldSnapshot(boolean offsetExists, boolean snapshotInProgress) {
final Offsets offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
OffsetContext previousOffset = offsets.getTheOnlyOffset();
@ -42,7 +42,7 @@ public boolean shouldSnapshot() {
protected abstract boolean shouldSnapshotSchemaWhenNoOffset();
@Override
public boolean shouldSnapshotSchema() {
public boolean shouldSnapshotSchema(boolean offsetExists, boolean snapshotInProgress) {
final Offsets offsets = beanRegistry.lookupByName(StandardBeanNames.OFFSETS, Offsets.class);
final HistorizedRelationalDatabaseSchema databaseSchema = beanRegistry.lookupByName(StandardBeanNames.DATABASE_SCHEMA, HistorizedRelationalDatabaseSchema.class);