DBZ-7783 Add BinlogCharsetRegistry to the service registry

This commit is contained in:
Chris Cranford 2024-06-17 15:16:55 -04:00 committed by Jiri Pechanec
parent 6b9a49cbaa
commit 08c7977cb7
13 changed files with 74 additions and 32 deletions

View File

@ -19,7 +19,6 @@
import io.debezium.config.EnumeratedValue; import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field; import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput; import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.gtid.GtidSet; import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.gtid.GtidSetFactory; import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
@ -895,11 +894,6 @@ public boolean isTimeAdjustedEnabled() {
*/ */
public abstract GtidSetFactory getGtidSetFactory(); public abstract GtidSetFactory getGtidSetFactory();
/**
* @return the character set registry
*/
public abstract BinlogCharsetRegistry getCharsetRegistry();
/** /**
* Check whether tests request global lock usage. * Check whether tests request global lock usage.
* *

View File

@ -5,12 +5,14 @@
*/ */
package io.debezium.connector.binlog.charset; package io.debezium.connector.binlog.charset;
import io.debezium.service.Service;
/** /**
* Contract for handling binlog-based character set features * Contract for handling binlog-based character set features
* *
* @author Chris Cranford * @author Chris Cranford
*/ */
public interface BinlogCharsetRegistry { public interface BinlogCharsetRegistry extends Service {
/** /**
* Get the size of the character set registry map. * Get the size of the character set registry map.
* @return the map size * @return the map size

View File

@ -144,6 +144,6 @@ protected boolean hasValueConverter(Column column, TableId tableId) {
} }
protected BinlogCharsetRegistry getCharsetRegistry() { protected BinlogCharsetRegistry getCharsetRegistry() {
return connectorConfig.getCharsetRegistry(); return connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class);
} }
} }

View File

@ -22,9 +22,8 @@
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.gtid.GtidSetFactory; import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry; import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistryServiceProvider;
import io.debezium.connector.mariadb.gtid.MariaDbGtidSetFactory; import io.debezium.connector.mariadb.gtid.MariaDbGtidSetFactory;
import io.debezium.connector.mariadb.history.MariaDbHistoryRecordComparator; import io.debezium.connector.mariadb.history.MariaDbHistoryRecordComparator;
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
@ -198,12 +197,10 @@ protected static ConfigDef configDef() {
private final Predicate<String> gtidSourceFilter; private final Predicate<String> gtidSourceFilter;
private final SnapshotLockingMode snapshotLockingMode; private final SnapshotLockingMode snapshotLockingMode;
private final SnapshotLockingStrategy snapshotLockingStrategy; private final SnapshotLockingStrategy snapshotLockingStrategy;
private final MariaDbCharsetRegistry charsetRegistry;
public MariaDbConnectorConfig(Configuration config) { public MariaDbConnectorConfig(Configuration config) {
super(MariaDbConnector.class, config, DEFAULT_NON_STREAMING_FETCH_SIZE); super(MariaDbConnector.class, config, DEFAULT_NON_STREAMING_FETCH_SIZE);
this.gtidSetFactory = new MariaDbGtidSetFactory(); this.gtidSetFactory = new MariaDbGtidSetFactory();
this.charsetRegistry = new MariaDbCharsetRegistry();
final String gtidIncludes = config.getString(GTID_SOURCE_INCLUDES); final String gtidIncludes = config.getString(GTID_SOURCE_INCLUDES);
final String gtidExcludes = config.getString(GTID_SOURCE_EXCLUDES); final String gtidExcludes = config.getString(GTID_SOURCE_EXCLUDES);
@ -212,6 +209,8 @@ public MariaDbConnectorConfig(Configuration config) {
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE)); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE));
this.snapshotLockingStrategy = new MariaDbSnapshotLockingStrategy(snapshotLockingMode); this.snapshotLockingStrategy = new MariaDbSnapshotLockingStrategy(snapshotLockingMode);
getServiceRegistry().registerServiceProvider(new MariaDbCharsetRegistryServiceProvider());
} }
@Override @Override
@ -254,11 +253,6 @@ public Optional<SnapshotLockingMode> getSnapshotLockingMode() {
return Optional.of(snapshotLockingMode); return Optional.of(snapshotLockingMode);
} }
@Override
public BinlogCharsetRegistry getCharsetRegistry() {
return charsetRegistry;
}
/** /**
* Custom {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy} for MariaDB. * Custom {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy} for MariaDB.
*/ */

View File

@ -23,6 +23,7 @@
import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.BinlogEventMetadataProvider; import io.debezium.connector.binlog.BinlogEventMetadataProvider;
import io.debezium.connector.binlog.BinlogSourceTask; import io.debezium.connector.binlog.BinlogSourceTask;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection; import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.binlog.jdbc.BinlogFieldReader; import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry; import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
@ -288,7 +289,7 @@ private MariaDbValueConverters getValueConverters(MariaDbConnectorConfig connect
connectorConfig.binaryHandlingMode(), connectorConfig.binaryHandlingMode(),
connectorConfig.isTimeAdjustedEnabled() ? MariaDbValueConverters::adjustTemporal : x -> x, connectorConfig.isTimeAdjustedEnabled() ? MariaDbValueConverters::adjustTemporal : x -> x,
connectorConfig.getEventConvertingFailureHandlingMode(), connectorConfig.getEventConvertingFailureHandlingMode(),
connectorConfig.getCharsetRegistry()); connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
} }
private BinlogFieldReader getFieldReader(MariaDbConnectorConfig connectorConfig) { private BinlogFieldReader getFieldReader(MariaDbConnectorConfig connectorConfig) {

View File

@ -7,6 +7,7 @@
import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDatabaseSchema; import io.debezium.connector.binlog.BinlogDatabaseSchema;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser; import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter; import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
@ -41,7 +42,7 @@ protected DdlParser createDdlParser(BinlogConnectorConfig connectorConfig, Maria
connectorConfig.isSchemaChangesHistoryEnabled(), connectorConfig.isSchemaChangesHistoryEnabled(),
valueConverter, valueConverter,
getTableFilter(), getTableFilter(),
connectorConfig.getCharsetRegistry()); connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
} }
} }

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb.charset;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
/**
* @author Chris Cranford
*/
public class MariaDbCharsetRegistryServiceProvider implements ServiceProvider<BinlogCharsetRegistry> {
@Override
public Class<BinlogCharsetRegistry> getServiceClass() {
return BinlogCharsetRegistry.class;
}
@Override
public BinlogCharsetRegistry createService(Configuration configuration, ServiceRegistry serviceRegistry) {
return new MariaDbCharsetRegistry();
}
}

View File

@ -12,6 +12,7 @@
import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogDatabaseSchemaTest; import io.debezium.connector.binlog.BinlogDatabaseSchemaTest;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.jdbc.TemporalPrecisionMode;
@ -41,7 +42,7 @@ protected MariaDbDatabaseSchema getSchema(Configuration config) {
CommonConnectorConfig.BinaryHandlingMode.BYTES, CommonConnectorConfig.BinaryHandlingMode.BYTES,
MariaDbValueConverters::adjustTemporal, MariaDbValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
connectorConfig.getCharsetRegistry()); connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
return new MariaDbDatabaseSchema( return new MariaDbDatabaseSchema(
connectorConfig, connectorConfig,

View File

@ -24,9 +24,8 @@
import io.debezium.connector.AbstractSourceInfo; import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker; import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.gtid.GtidSetFactory; import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry; import io.debezium.connector.mysql.charset.MySqlCharsetRegistryServiceProvider;
import io.debezium.connector.mysql.gtid.MySqlGtidSetFactory; import io.debezium.connector.mysql.gtid.MySqlGtidSetFactory;
import io.debezium.connector.mysql.history.MySqlHistoryRecordComparator; import io.debezium.connector.mysql.history.MySqlHistoryRecordComparator;
import io.debezium.function.Predicates; import io.debezium.function.Predicates;
@ -231,12 +230,10 @@ protected static ConfigDef configDef() {
private final Predicate<String> gtidSourceFilter; private final Predicate<String> gtidSourceFilter;
private final SnapshotLockingMode snapshotLockingMode; private final SnapshotLockingMode snapshotLockingMode;
private final SnapshotLockingStrategy snapshotLockingStrategy; private final SnapshotLockingStrategy snapshotLockingStrategy;
private final MySqlCharsetRegistry charsetRegistry;
public MySqlConnectorConfig(Configuration config) { public MySqlConnectorConfig(Configuration config) {
super(MySqlConnector.class, config, DEFAULT_SNAPSHOT_FETCH_SIZE); super(MySqlConnector.class, config, DEFAULT_SNAPSHOT_FETCH_SIZE);
this.gtidSetFactory = new MySqlGtidSetFactory(); this.gtidSetFactory = new MySqlGtidSetFactory();
this.charsetRegistry = new MySqlCharsetRegistry();
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString()); this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.snapshotLockingStrategy = new MySqlSnapshotLockingStrategy(snapshotLockingMode); this.snapshotLockingStrategy = new MySqlSnapshotLockingStrategy(snapshotLockingMode);
@ -246,6 +243,8 @@ public MySqlConnectorConfig(Configuration config) {
final String gtidSetExcludes = config.getString(GTID_SOURCE_EXCLUDES); final String gtidSetExcludes = config.getString(GTID_SOURCE_EXCLUDES);
this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes) this.gtidSourceFilter = gtidSetIncludes != null ? Predicates.includesUuids(gtidSetIncludes)
: (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null); : (gtidSetExcludes != null ? Predicates.excludesUuids(gtidSetExcludes) : null);
getServiceRegistry().registerServiceProvider(new MySqlCharsetRegistryServiceProvider());
} }
public Optional<SnapshotLockingMode> getSnapshotLockingMode() { public Optional<SnapshotLockingMode> getSnapshotLockingMode() {
@ -287,11 +286,6 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return new MySqlHistoryRecordComparator(gtidSourceFilter, getGtidSetFactory()); return new MySqlHistoryRecordComparator(gtidSourceFilter, getGtidSetFactory());
} }
@Override
public BinlogCharsetRegistry getCharsetRegistry() {
return charsetRegistry;
}
/** /**
* Custom {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy} for MySQL. * Custom {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy} for MySQL.
*/ */

View File

@ -21,6 +21,7 @@
import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.BinlogEventMetadataProvider; import io.debezium.connector.binlog.BinlogEventMetadataProvider;
import io.debezium.connector.binlog.BinlogSourceTask; import io.debezium.connector.binlog.BinlogSourceTask;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection; import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.mysql.jdbc.MySqlConnection; import io.debezium.connector.mysql.jdbc.MySqlConnection;
import io.debezium.connector.mysql.jdbc.MySqlConnectionConfiguration; import io.debezium.connector.mysql.jdbc.MySqlConnectionConfiguration;
@ -234,7 +235,7 @@ private MySqlValueConverters getValueConverters(MySqlConnectorConfig configurati
configuration.binaryHandlingMode(), configuration.binaryHandlingMode(),
configuration.isTimeAdjustedEnabled() ? MySqlValueConverters::adjustTemporal : x -> x, configuration.isTimeAdjustedEnabled() ? MySqlValueConverters::adjustTemporal : x -> x,
configuration.getEventConvertingFailureHandlingMode(), configuration.getEventConvertingFailureHandlingMode(),
configuration.getCharsetRegistry()); configuration.getServiceRegistry().getService(BinlogCharsetRegistry.class));
} }
@Override @Override

View File

@ -10,6 +10,7 @@
import io.debezium.annotation.NotThreadSafe; import io.debezium.annotation.NotThreadSafe;
import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDatabaseSchema; import io.debezium.connector.binlog.BinlogDatabaseSchema;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter; import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters; import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
@ -53,7 +54,7 @@ protected DdlParser createDdlParser(BinlogConnectorConfig connectorConfig, MySql
connectorConfig.isSchemaCommentsHistoryEnabled(), connectorConfig.isSchemaCommentsHistoryEnabled(),
valueConverter, valueConverter,
getTableFilter(), getTableFilter(),
connectorConfig.getCharsetRegistry()); connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
} }
} }

View File

@ -0,0 +1,26 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.charset;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.service.spi.ServiceProvider;
import io.debezium.service.spi.ServiceRegistry;
/**
* @author Chris Cranford
*/
public class MySqlCharsetRegistryServiceProvider implements ServiceProvider<BinlogCharsetRegistry> {
@Override
public Class<BinlogCharsetRegistry> getServiceClass() {
return BinlogCharsetRegistry.class;
}
@Override
public BinlogCharsetRegistry createService(Configuration configuration, ServiceRegistry serviceRegistry) {
return new MySqlCharsetRegistry();
}
}

View File

@ -13,6 +13,7 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogDatabaseSchemaTest; import io.debezium.connector.binlog.BinlogDatabaseSchemaTest;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters; import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
@ -43,7 +44,7 @@ protected MySqlDatabaseSchema getSchema(Configuration config) {
BinaryHandlingMode.BYTES, BinaryHandlingMode.BYTES,
MySqlValueConverters::adjustTemporal, MySqlValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
connectorConfig.getCharsetRegistry()); connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
return new MySqlDatabaseSchema( return new MySqlDatabaseSchema(
connectorConfig, connectorConfig,