DBZ-7765 Add property to eventually disable log position validation on startup

This commit is contained in:
mfvitale 2024-04-10 11:26:58 +02:00 committed by Jiri Pechanec
parent 717888f6f6
commit 6f0a56fc6f
2 changed files with 37 additions and 17 deletions

View File

@ -79,6 +79,7 @@ public abstract class CommonConnectorConfig {
protected final boolean snapshotModeConfigurationBasedStream;
protected final boolean snapshotModeConfigurationBasedSnapshotOnSchemaError;
protected final boolean snapshotModeConfigurationBasedSnapshotOnDataError;
protected final boolean isLogPositionCheckEnabled;
/**
* The set of predefined versions e.g. for source struct maker version
@ -1043,6 +1044,16 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
.withDescription(
"When 'snapshot.mode' is set as configuration_based, this setting permits to specify whenever the data should be snapshotted or not in case of error.");
public static final Field LOG_POSITION_CHECK_ENABLED = Field.create("log.position.check.enable")
.withDisplayName("Enable/Disable log position check")
.withType(Type.BOOLEAN)
.withDefault(true)
.withGroup(Field.createGroupEntry(Field.Group.ADVANCED, 30))
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.optional()
.withDescription("When enabled the connector checks if the position stored in the offset is still available in the log");
protected static final ConfigDefinition CONFIG_DEFINITION = ConfigDefinition.editor()
.connector(
EVENT_PROCESSING_FAILURE_HANDLING_MODE,
@ -1065,7 +1076,8 @@ public static SnapshotQueryMode parse(String value, String defaultValue) {
RETRIABLE_RESTART_WAIT,
QUERY_FETCH_SIZE,
MAX_RETRIES_ON_ERROR,
INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY)
INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY,
LOG_POSITION_CHECK_ENABLED)
.events(
CUSTOM_CONVERTERS,
CUSTOM_POST_PROCESSORS,
@ -1173,6 +1185,7 @@ protected CommonConnectorConfig(Configuration config, int defaultSnapshotFetchSi
this.snapshotModeConfigurationBasedStream = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM);
this.snapshotModeConfigurationBasedSnapshotOnSchemaError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_SCHEMA_ERROR);
this.snapshotModeConfigurationBasedSnapshotOnDataError = config.getBoolean(SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_ON_DATA_ERROR);
this.isLogPositionCheckEnabled = config.getBoolean(LOG_POSITION_CHECK_ENABLED);
this.signalingDataCollectionId = !Strings.isNullOrBlank(this.signalingDataCollection)
? TableId.parse(this.signalingDataCollection)
@ -1528,6 +1541,10 @@ public boolean snapshotModeConfigurationBasedSnapshotOnDataError() {
return this.snapshotModeConfigurationBasedSnapshotOnDataError;
}
public boolean isLogPositionCheckEnabled() {
return isLogPositionCheckEnabled;
}
public EnumeratedValue snapshotQueryMode() {
return this.snapshotQueryMode;
}

View File

@ -100,8 +100,6 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPos
}
else {
boolean logPositionAvailable = isLogPositionAvailable(logPositionValidator, offset, config);
if (schema.isHistorized() && !((HistorizedDatabaseSchema) schema).historyExists()) {
LOGGER.warn("Database schema history was not found but was expected");
@ -121,23 +119,28 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPos
}
}
if (!logPositionAvailable && !offset.isSnapshotRunning()) {
LOGGER.warn("Last recorded offset is no longer available on the server.");
if (config.isLogPositionCheckEnabled()) {
if (snapshotter.shouldSnapshotOnDataError()) {
boolean logPositionAvailable = isLogPositionAvailable(logPositionValidator, partition, offset, config);
LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. " +
"Attempting to snapshot data to fill the gap.",
snapshotter.name());
if (!logPositionAvailable && !offset.isSnapshotRunning()) {
LOGGER.warn("Last recorded offset is no longer available on the server.");
previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
if (snapshotter.shouldSnapshotOnDataError()) {
return;
LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. " +
"Attempting to snapshot data to fill the gap.",
snapshotter.name());
previousOffsets.resetOffset(previousOffsets.getTheOnlyPartition());
return;
}
LOGGER.warn("The connector is trying to read redo log starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. " +
"If not the connector will streaming from the last available position in the log");
}
LOGGER.warn("The connector is trying to read redo log starting at " + offset + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. " +
"If not the connector will streaming from the last available position in the log");
}
if (schema.isHistorized()) {
@ -147,13 +150,13 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPos
}
}
public boolean isLogPositionAvailable(LogPositionValidator logPositionValidator, OffsetContext offsetContext, CommonConnectorConfig config) {
public boolean isLogPositionAvailable(LogPositionValidator logPositionValidator, Partition partition, OffsetContext offsetContext, CommonConnectorConfig config) {
if (logPositionValidator == null) {
LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
return true;
}
return logPositionValidator.validate(offsetContext, config);
return logPositionValidator.validate(partition, offsetContext, config);
}
public enum State {