DBZ-7717 Move LogPositionValidator outside JdbcConnection

This commit is contained in:
mfvitale 2024-03-28 14:12:35 +01:00 committed by Jiri Pechanec
parent c8bc496d63
commit ec3c68b0d8
11 changed files with 37 additions and 37 deletions

View File

@ -136,7 +136,7 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
MySqlOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
validateAndLoadSchemaHistory(connectorConfig, connection, previousOffsets, schema, snapshotter);
validateAndLoadSchemaHistory(connectorConfig, connection::validateLogPosition, previousOffsets, schema, snapshotter);
LOGGER.info("Reconnecting after finishing schema recovery");

View File

@ -61,7 +61,6 @@ public AbstractConnectorConnection(ConnectionConfiguration configuration, MySqlF
super(configuration.config(), configuration.factory(), QUOTED_CHARACTER, QUOTED_CHARACTER);
this.connectionConfig = configuration;
this.fieldReader = fieldReader;
this.logPositionValidator = this::validateLogPosition;
}
@Override

View File

@ -97,7 +97,6 @@ public OracleConnection(JdbcConfiguration config, ConnectionFactory connectionFa
public OracleConnection(JdbcConfiguration config, ConnectionFactory connectionFactory, boolean showVersion) {
super(config, connectionFactory, QUOTED_CHARACTER, QUOTED_CHARACTER);
LOGGER.trace("JDBC connection string: " + connectionString(config));
this.logPositionValidator = this::validateLogPosition;
this.databaseVersion = resolveOracleDatabaseVersion();
if (showVersion) {
LOGGER.info("Database Version: {}", databaseVersion.getBanner());
@ -107,7 +106,6 @@ public OracleConnection(JdbcConfiguration config, ConnectionFactory connectionFa
public OracleConnection(JdbcConfiguration config, boolean showVersion) {
super(config, resolveConnectionFactory(config), QUOTED_CHARACTER, QUOTED_CHARACTER);
LOGGER.trace("JDBC connection string: " + connectionString(config));
this.logPositionValidator = this::validateLogPosition;
this.databaseVersion = resolveOracleDatabaseVersion();
if (showVersion) {
LOGGER.info("Database Version: {}", databaseVersion.getBanner());

View File

@ -98,7 +98,7 @@ public ChangeEventSourceCoordinator<OraclePartition, OracleOffsetContext> start(
OracleOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection, previousOffsets, schema, snapshotterService.getSnapshotter());
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection::validateLogPosition, previousOffsets, schema, snapshotterService.getSnapshotter());
taskContext = new OracleTaskContext(connectorConfig, schema);

View File

@ -135,7 +135,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
beanRegistryJdbcConnection.username(), e);
}
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection, previousOffsets, schema, snapshotter);
validateAndLoadSchemaHistory(connectorConfig, jdbcConnection::validateLogPosition, previousOffsets, schema, snapshotter);
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {

View File

@ -105,8 +105,6 @@ public class PostgresConnection extends JdbcConnection {
public PostgresConnection(JdbcConfiguration config, PostgresValueConverterBuilder valueConverterBuilder, String connectionUsage) {
super(addDefaultSettings(config, connectionUsage), FACTORY, PostgresConnection::validateServerVersion, "\"", "\"");
this.logPositionValidator = this::validateLogPosition;
if (Objects.isNull(valueConverterBuilder)) {
this.typeRegistry = null;
this.defaultValueConverter = null;
@ -131,7 +129,6 @@ public PostgresConnection(PostgresConnectorConfig config, TypeRegistry typeRegis
PostgresConnection::validateServerVersion,
"\"", "\"");
this.logPositionValidator = this::validateLogPosition;
if (Objects.isNull(typeRegistry)) {
this.typeRegistry = null;
this.defaultValueConverter = null;

View File

@ -140,7 +140,6 @@ public SqlServerConnection(SqlServerConnectorConfig config, SqlServerValueConver
boolean useSingleDatabase) {
super(config.getJdbcConfig(), createConnectionFactory(config.getJdbcConfig(), useSingleDatabase), OPENING_QUOTING_CHARACTER, CLOSING_QUOTING_CHARACTER);
this.logPositionValidator = this::validateLogPosition;
defaultValueConverter = new SqlServerDefaultValueConverter(this::connection, valueConverters);
this.queryFetchSize = config.getQueryFetchSize();

View File

@ -105,7 +105,7 @@ public ChangeEventSourceCoordinator<SqlServerPartition, SqlServerOffsetContext>
final SnapshotterService snapshotterService = connectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class);
validateAndLoadSchemaHistory(connectorConfig, metadataConnection, offsets, schema,
validateAndLoadSchemaHistory(connectorConfig, metadataConnection::validateLogPosition, offsets, schema,
snapshotterService.getSnapshotter());
taskContext = new SqlServerTaskContext(connectorConfig, schema);

View File

@ -35,7 +35,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.function.LogPositionValidator;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
@ -68,7 +68,8 @@ public abstract class BaseSourceTask<P extends Partition, O extends OffsetContex
private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
private Configuration config;
protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcConnection jdbcConnection, Offsets<P, O> previousOffsets, DatabaseSchema schema,
protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, LogPositionValidator logPositionValidator, Offsets<P, O> previousOffsets,
DatabaseSchema schema,
Snapshotter snapshotter) {
for (Map.Entry<P, O> previousOffset : previousOffsets) {
@ -99,7 +100,7 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcCo
}
else {
boolean logPositionAvailable = jdbcConnection.isLogPositionAvailable(offset, config);
boolean logPositionAvailable = isLogPositionAvailable(logPositionValidator, offset, config);
if (schema.isHistorized() && !((HistorizedDatabaseSchema) schema).historyExists()) {
@ -146,6 +147,15 @@ protected void validateAndLoadSchemaHistory(CommonConnectorConfig config, JdbcCo
}
}
public boolean isLogPositionAvailable(LogPositionValidator logPositionValidator, OffsetContext offsetContext, CommonConnectorConfig config) {
if (logPositionValidator == null) {
LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
return true;
}
return logPositionValidator.validate(offsetContext, config);
}
public enum State {
RESTARTING,
RUNNING,

View File

@ -0,0 +1,20 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.function;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.pipeline.spi.OffsetContext;
@FunctionalInterface
public interface LogPositionValidator {
/**
* Validate the stored offset with the position available in the db log.
* @param offsetContext The current stored offset.
* @param config Connector configuration.
*/
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config);
}

View File

@ -58,7 +58,6 @@
import io.debezium.config.Field;
import io.debezium.pipeline.source.snapshot.incremental.ChunkQueryBuilder;
import io.debezium.pipeline.source.snapshot.incremental.DefaultChunkQueryBuilder;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Attribute;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
@ -161,17 +160,6 @@ public interface ResultSetExtractor<T> {
T apply(ResultSet rs) throws SQLException;
}
@FunctionalInterface
public interface LogPositionValidator {
/**
* Validate the stored offset with the position available in the db log.
* @param offsetContext The current stored offset.
* @param config Connector configuration.
*/
boolean validate(OffsetContext offsetContext, CommonConnectorConfig config);
}
/**
* Create a {@link ConnectionFactory} that replaces variables in the supplied URL pattern. Variables include:
* <ul>
@ -340,8 +328,6 @@ private static String findAndReplace(String url, String name, Properties props,
private final String closingQuoteCharacter;
private volatile Connection conn;
protected LogPositionValidator logPositionValidator;
/**
* Create a new instance with the given configuration and connection factory.
*
@ -1653,13 +1639,4 @@ protected Map<String, Object> reselectColumns(String query, TableId tableId, Lis
});
return results;
}
public boolean isLogPositionAvailable(OffsetContext offsetContext, CommonConnectorConfig config) {
if (logPositionValidator == null) {
LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
return true;
}
return logPositionValidator.validate(offsetContext, config);
}
}