DBZ-7783 Introduce BinlogCharsetRegistry contract

This commit is contained in:
Chris Cranford 2024-06-13 16:30:29 -04:00 committed by Jiri Pechanec
parent 09de996b64
commit 14ec7d6477
29 changed files with 256 additions and 122 deletions

View File

@ -19,6 +19,7 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.config.Field.ValidationOutput;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
@ -894,6 +895,11 @@ public boolean isTimeAdjustedEnabled() {
*/
public abstract GtidSetFactory getGtidSetFactory();
/**
* @return the character set registry
*/
public abstract BinlogCharsetRegistry getCharsetRegistry();
/**
* Check whether tests request global lock usage.
*

View File

@ -0,0 +1,43 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.binlog.charset;
/**
* Contract for handling binlog-based character set features
*
* @author Chris Cranford
*/
public interface BinlogCharsetRegistry {
/**
* Get the size of the character set registry map.
* @return the map size
*/
int getCharsetMapSize();
/**
* Get the collation name for a collation index.
*
* @param collationIndex the collation index, should not be {@code null}
* @return the collation name or {@code null} if not found or index is invalid
*/
String getCollationNameForCollationIndex(Integer collationIndex);
/**
* Get the collation name for the collation index
*
* @param collationIndex the collation index, should not be {@code null}
* @return the collation name or {@code null} if not found or index is invalid
*/
String getCharsetNameForCollationIndex(Integer collationIndex);
/**
* Get the Java encoding for a database character set name.
*
* @param charSetName the database character set name
* @return the java encoding for the character set name
*/
String getJavaEncodingForCharSet(String charSetName);
}

View File

@ -15,6 +15,7 @@
import org.slf4j.LoggerFactory;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
@ -123,7 +124,7 @@ public Object readField(ResultSet rs, int columnIndex, Column column, Table tabl
/**
* Read the column's character set.
*/
protected abstract String getCharacterSet(Column column) throws UnsupportedEncodingException;
protected abstract String getCharacterSet(Column column);
/**
* Common handler for logging invalid values.
@ -141,4 +142,8 @@ protected void logInvalidValue(ResultSet rs, int columnIndex, Object value) thro
protected boolean hasValueConverter(Column column, TableId tableId) {
return connectorConfig.customConverterRegistry().getValueConverter(tableId, column).isPresent();
}
protected BinlogCharsetRegistry getCharsetRegistry() {
return connectorConfig.getCharsetRegistry();
}
}

View File

@ -44,6 +44,7 @@
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.BinlogGeometry;
import io.debezium.connector.binlog.BinlogUnsignedIntegerConverter;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.data.Json;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.jdbc.JdbcValueConverters;
@ -94,6 +95,7 @@ public abstract class BinlogValueConverters extends JdbcValueConverters {
private static final Pattern TIMESTAMP_FIELD_PATTERN = Pattern.compile("([0-9]*)-([0-9]*)-([0-9]*) .*");
private final EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode;
private final BinlogCharsetRegistry charsetRegistry;
/**
* Create a new instance of the value converters that always uses UTC for the default time zone when
@ -105,15 +107,18 @@ public abstract class BinlogValueConverters extends JdbcValueConverters {
* @param binaryHandlingMode how binary columns should be treated
* @param adjuster a temporal adjuster to make a database specific time before conversion
* @param eventConvertingFailureHandlingMode how to handle conversion failures
* @param charsetRegistry the character set registry
*/
public BinlogValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
BigIntUnsignedMode bigIntUnsignedMode,
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster adjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
BinlogCharsetRegistry charsetRegistry) {
super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, adjuster, bigIntUnsignedMode, binaryHandlingMode);
this.eventConvertingFailureHandlingMode = eventConvertingFailureHandlingMode;
this.charsetRegistry = charsetRegistry;
}
@Override
@ -815,7 +820,9 @@ protected Object convertTimestampToLocalDateTime(Column column, Field fieldDefn,
protected abstract List<String> extractEnumAndSetOptions(Column column);
protected abstract String getJavaEncodingForCharSet(String charSetName);
protected String getJavaEncodingForCharSet(String charSetName) {
return charsetRegistry.getJavaEncodingForCharSet(charSetName);
}
/**
* A utility method that adjusts <a href="https://dev.mysql.com/doc/refman/8.2/en/two-digit-years.html">ambiguous</a> 2-digit

View File

@ -22,7 +22,9 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.gtid.MariaDbGtidSetFactory;
import io.debezium.connector.mariadb.history.MariaDbHistoryRecordComparator;
import io.debezium.function.Predicates;
@ -196,10 +198,12 @@ protected static ConfigDef configDef() {
private final Predicate<String> gtidSourceFilter;
private final SnapshotLockingMode snapshotLockingMode;
private final SnapshotLockingStrategy snapshotLockingStrategy;
private final MariaDbCharsetRegistry charsetRegistry;
public MariaDbConnectorConfig(Configuration config) {
super(MariaDbConnector.class, config, DEFAULT_NON_STREAMING_FETCH_SIZE);
this.gtidSetFactory = new MariaDbGtidSetFactory();
this.charsetRegistry = new MariaDbCharsetRegistry();
final String gtidIncludes = config.getString(GTID_SOURCE_INCLUDES);
final String gtidExcludes = config.getString(GTID_SOURCE_EXCLUDES);
@ -250,6 +254,11 @@ public Optional<SnapshotLockingMode> getSnapshotLockingMode() {
return Optional.of(snapshotLockingMode);
}
@Override
public BinlogCharsetRegistry getCharsetRegistry() {
return charsetRegistry;
}
/**
* Custom {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy} for MariaDB.
*/

View File

@ -25,6 +25,7 @@
import io.debezium.connector.binlog.BinlogSourceTask;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbConnection;
import io.debezium.connector.mariadb.jdbc.MariaDbConnectionConfiguration;
import io.debezium.connector.mariadb.jdbc.MariaDbFieldReader;
@ -78,6 +79,7 @@ protected Iterable<Field> getAllConfigurationFields() {
@Override
protected ChangeEventSourceCoordinator<MariaDbPartition, MariaDbOffsetContext> start(Configuration configuration) {
final MariaDbCharsetRegistry charsetRegistry = new MariaDbCharsetRegistry();
final Clock clock = Clock.system();
final MariaDbConnectorConfig connectorConfig = new MariaDbConnectorConfig(configuration);
final TopicNamingStrategy<TableId> topicNamingStrategy = connectorConfig.getTopicNamingStrategy(TOPIC_NAMING_STRATEGY);
@ -285,7 +287,8 @@ private MariaDbValueConverters getValueConverters(MariaDbConnectorConfig connect
connectorConfig.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(),
connectorConfig.binaryHandlingMode(),
connectorConfig.isTimeAdjustedEnabled() ? MariaDbValueConverters::adjustTemporal : x -> x,
connectorConfig.getEventConvertingFailureHandlingMode());
connectorConfig.getEventConvertingFailureHandlingMode(),
connectorConfig.getCharsetRegistry());
}
private BinlogFieldReader getFieldReader(MariaDbConnectorConfig connectorConfig) {

View File

@ -40,7 +40,8 @@ protected DdlParser createDdlParser(BinlogConnectorConfig connectorConfig, Maria
false,
connectorConfig.isSchemaChangesHistoryEnabled(),
valueConverter,
getTableFilter());
getTableFilter(),
connectorConfig.getCharsetRegistry());
}
}

View File

@ -18,12 +18,13 @@
import org.antlr.v4.runtime.misc.Interval;
import org.antlr.v4.runtime.tree.ParseTree;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.antlr.AntlrDdlParser;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogSystemVariables;
import io.debezium.connector.mariadb.antlr.listener.MariaDbAntlrDdlParserListener;
import io.debezium.connector.mariadb.charset.CharsetMappingResolver;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.ddl.parser.mariadb.generated.MariaDBLexer;
import io.debezium.ddl.parser.mariadb.generated.MariaDBParser;
@ -48,25 +49,30 @@ public class MariaDbAntlrDdlParser extends AntlrDdlParser<MariaDBLexer, MariaDBP
private final ConcurrentHashMap<String, String> charsetNameForDatabase = new ConcurrentHashMap<>();
private final MariaDbValueConverters converters;
private final Tables.TableFilter tableFilter;
private final BinlogCharsetRegistry charsetRegistry;
@VisibleForTesting
public MariaDbAntlrDdlParser() {
this(null, Tables.TableFilter.includeAll());
}
@VisibleForTesting
public MariaDbAntlrDdlParser(MariaDbValueConverters valueConverters) {
this(valueConverters, Tables.TableFilter.includeAll());
}
@VisibleForTesting
public MariaDbAntlrDdlParser(MariaDbValueConverters valueConverters, Tables.TableFilter tableFilter) {
this(true, false, true, valueConverters, tableFilter);
this(true, false, true, valueConverters, tableFilter, null);
}
public MariaDbAntlrDdlParser(boolean throwWerrorsFromTreeWalk, boolean includeViews, boolean includeComments,
MariaDbValueConverters valueConverters, Tables.TableFilter tableFilter) {
MariaDbValueConverters valueConverters, Tables.TableFilter tableFilter, BinlogCharsetRegistry charsetRegistry) {
super(throwWerrorsFromTreeWalk, includeViews, includeComments);
systemVariables = new BinlogSystemVariables();
this.converters = valueConverters;
this.tableFilter = tableFilter;
this.charsetRegistry = charsetRegistry;
}
@Override
@ -228,6 +234,15 @@ protected DataTypeResolver initializeDataTypeResolver() {
return dataTypeResolverBuilder.build();
}
/**
* Get the character set registry.
*
* @return the character set registry
*/
public BinlogCharsetRegistry getCharsetRegistry() {
return charsetRegistry;
}
/**
* Provides a map of default character sets by database/schema name.
*
@ -461,9 +476,9 @@ public String extractCharset(CharsetNameContext charsetNode, CollationNameContex
}
else if (collationNode != null && collationNode.getText() != null) {
final String collationName = withoutQuotes(collationNode.getText()).toLowerCase();
for (int index = 0; index < CharsetMappingResolver.getMapSize(); index++) {
if (collationName.equals(CharsetMappingResolver.getStaticCollationNameForCollationIndex(index))) {
charsetName = CharsetMappingResolver.getStaticMariaDbCharsetNameForCollationIndex(index);
for (int index = 0; index < charsetRegistry.getCharsetMapSize(); index++) {
if (collationName.equals(charsetRegistry.getCollationNameForCollationIndex(index))) {
charsetName = charsetRegistry.getCharsetNameForCollationIndex(index);
break;
}
}

View File

@ -1,48 +0,0 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb.charset;
import com.mysql.cj.CharsetMapping;
/**
* A simple bridge between the MySQL driver and the MariaDB code to resolve charset mappings.
*
* @author Chris Cranford
*/
public class CharsetMappingResolver {
// todo: replace with our own implementation
public static int getMapSize() {
return CharsetMapping.MAP_SIZE;
}
// todo: replace with our own implementation
public static String getStaticCollationNameForCollationIndex(Integer collationIndex) {
return CharsetMapping.getStaticCollationNameForCollationIndex(collationIndex);
}
// todo: replace with our own implementation
public static String getStaticMariaDbCharsetNameForCollationIndex(Integer collationIndex) {
return CharsetMapping.getStaticMysqlCharsetNameForCollationIndex(collationIndex);
}
// todo: replace with our own implementation
public static String getJavaEncodingForMariaDbCharSet(String charSetName) {
return CharsetMappingWrapper.getJavaEncodingForMariaDbCharSet(charSetName);
}
private CharsetMappingResolver() {
}
/**
* Helper to gain access to the private method.
*/
private final static class CharsetMappingWrapper extends CharsetMapping {
static String getJavaEncodingForMariaDbCharSet(String charSetName) {
return CharsetMapping.getStaticJavaEncodingForMysqlCharset(charSetName);
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb.charset;
import com.mysql.cj.CharsetMapping;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
/**
* A registry that stores character set mappings from {@code charset_mappings.json} for MariaDB.
*
* @author Chris Cranford
*/
public class MariaDbCharsetRegistry implements BinlogCharsetRegistry {
@Override
public int getCharsetMapSize() {
return CharsetMapping.MAP_SIZE;
}
@Override
public String getCollationNameForCollationIndex(Integer collationIndex) {
return CharsetMapping.getStaticCollationNameForCollationIndex(collationIndex);
}
@Override
public String getCharsetNameForCollationIndex(Integer collationIndex) {
return CharsetMapping.getStaticMysqlCharsetNameForCollationIndex(collationIndex);
}
@Override
public String getJavaEncodingForCharSet(String charSetName) {
return CharsetMappingMapper.getJavaEncodingForCharSet(charSetName);
}
/**
* Helper to gain access to protected method
*/
private final static class CharsetMappingMapper extends CharsetMapping {
static String getJavaEncodingForCharSet(String charSetName) {
return CharsetMapping.getStaticJavaEncodingForMysqlCharset(charSetName);
}
}
}

View File

@ -16,7 +16,6 @@
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.binlog.jdbc.ConnectionConfiguration;
import io.debezium.connector.mariadb.charset.CharsetMappingResolver;
import io.debezium.connector.mariadb.gtid.MariaDbGtidSet;
import io.debezium.connector.mariadb.gtid.MariaDbGtidSet.MariaDbGtid;
@ -104,8 +103,4 @@ public GtidSet filterGtidSet(Predicate<String> gtidSourceFilter, String offsetGt
LOGGER.info("Final merged GTID set to use when connecting to MariaDB: {}", mergedGtidSet);
return mergedGtidSet;
}
public static String getJavaEncodingForCharSet(String charSetName) {
return CharsetMappingResolver.getJavaEncodingForMariaDbCharSet(charSetName);
}
}

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.mariadb.jdbc;
import java.io.UnsupportedEncodingException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Calendar;
@ -13,7 +12,6 @@
import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.mariadb.MariaDbConnectorConfig;
import io.debezium.connector.mariadb.charset.CharsetMappingResolver;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
@ -29,8 +27,8 @@ public MariaDbFieldReader(MariaDbConnectorConfig connectorConfig) {
}
@Override
protected String getCharacterSet(Column column) throws UnsupportedEncodingException {
return CharsetMappingResolver.getJavaEncodingForMariaDbCharSet(column.charsetName());
protected String getCharacterSet(Column column) {
return getCharsetRegistry().getJavaEncodingForCharSet(column.charsetName());
}
@Override

View File

@ -11,6 +11,7 @@
import io.debezium.annotation.Immutable;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.jdbc.TemporalPrecisionMode;
@ -43,23 +44,20 @@ public class MariaDbValueConverters extends BinlogValueConverters {
* @param binaryHandlingMode how binary columns should be treated
* @param adjuster a temporal adjuster to make a database specific time before conversion
* @param eventConvertingFailureHandlingMode how to handle conversion failures
* @param charsetRegistry the character set registry
*/
public MariaDbValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
BigIntUnsignedMode bigIntUnsignedMode,
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster adjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryHandlingMode, adjuster, eventConvertingFailureHandlingMode);
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
BinlogCharsetRegistry charsetRegistry) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryHandlingMode, adjuster, eventConvertingFailureHandlingMode, charsetRegistry);
}
@Override
protected List<String> extractEnumAndSetOptions(Column column) {
return MariaDbAntlrDdlParser.extractEnumAndSetOptions(column.enumValues());
}
@Override
protected String getJavaEncodingForCharSet(String charSetName) {
return MariaDbConnection.getJavaEncodingForCharSet(charSetName);
}
}

View File

@ -40,7 +40,8 @@ protected MariaDbDatabaseSchema getSchema(Configuration config) {
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
MariaDbValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
connectorConfig.getCharsetRegistry());
return new MariaDbDatabaseSchema(
connectorConfig,

View File

@ -10,6 +10,7 @@
import io.debezium.connector.binlog.BinlogDefaultValueTest;
import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
@ -36,7 +37,8 @@ protected MariaDbValueConverters getValueConverter(DecimalMode decimalMode,
bigIntUnsignedMode,
binaryHandlingMode,
x -> x,
EventConvertingFailureHandlingMode.WARN);
EventConvertingFailureHandlingMode.WARN,
new MariaDbCharsetRegistry());
}
@Override

View File

@ -10,6 +10,7 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.binlog.BinlogAntlrDdlParserTest;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
@ -50,7 +51,8 @@ protected MariaDbValueConverters getValueConverters() {
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x -> x,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
new MariaDbCharsetRegistry());
}
@Override
@ -82,7 +84,7 @@ public MariaDbDdlParserWithSimpleTestListener(DdlChanges listener, boolean inclu
public MariaDbDdlParserWithSimpleTestListener(DdlChanges listener, boolean includeViews, boolean includeComments, Tables.TableFilter tableFilter,
MariaDbValueConverters converters) {
super(false, includeViews, includeComments, converters, tableFilter);
super(false, includeViews, includeComments, converters, tableFilter, new MariaDbCharsetRegistry());
this.ddlChanges = listener;
}
}

View File

@ -12,6 +12,7 @@
import io.debezium.connector.binlog.BinlogValueConvertersTest;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
@ -35,7 +36,8 @@ protected BinlogValueConverters getValueConverters(DecimalMode decimalMode,
bigIntUnsignedMode,
binaryHandlingMode,
temporalAdjuster,
eventConvertingFailureHandlingMode);
eventConvertingFailureHandlingMode,
new MariaDbCharsetRegistry());
}
@Override

View File

@ -24,7 +24,9 @@
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.gtid.GtidSetFactory;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.gtid.MySqlGtidSetFactory;
import io.debezium.connector.mysql.history.MySqlHistoryRecordComparator;
import io.debezium.function.Predicates;
@ -229,10 +231,12 @@ protected static ConfigDef configDef() {
private final Predicate<String> gtidSourceFilter;
private final SnapshotLockingMode snapshotLockingMode;
private final SnapshotLockingStrategy snapshotLockingStrategy;
private final MySqlCharsetRegistry charsetRegistry;
public MySqlConnectorConfig(Configuration config) {
super(MySqlConnector.class, config, DEFAULT_SNAPSHOT_FETCH_SIZE);
this.gtidSetFactory = new MySqlGtidSetFactory();
this.charsetRegistry = new MySqlCharsetRegistry();
this.snapshotLockingMode = SnapshotLockingMode.parse(config.getString(SNAPSHOT_LOCKING_MODE), SNAPSHOT_LOCKING_MODE.defaultValueAsString());
this.snapshotLockingStrategy = new MySqlSnapshotLockingStrategy(snapshotLockingMode);
@ -283,6 +287,11 @@ protected HistoryRecordComparator getHistoryRecordComparator() {
return new MySqlHistoryRecordComparator(gtidSourceFilter, getGtidSetFactory());
}
@Override
public BinlogCharsetRegistry getCharsetRegistry() {
return charsetRegistry;
}
/**
* Custom {@link io.debezium.connector.binlog.BinlogConnectorConfig.SnapshotLockingStrategy} for MySQL.
*/

View File

@ -233,7 +233,8 @@ private MySqlValueConverters getValueConverters(MySqlConnectorConfig configurati
configuration.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(),
configuration.binaryHandlingMode(),
configuration.isTimeAdjustedEnabled() ? MySqlValueConverters::adjustTemporal : x -> x,
configuration.getEventConvertingFailureHandlingMode());
configuration.getEventConvertingFailureHandlingMode(),
configuration.getCharsetRegistry());
}
@Override

View File

@ -52,7 +52,8 @@ protected DdlParser createDdlParser(BinlogConnectorConfig connectorConfig, MySql
false,
connectorConfig.isSchemaCommentsHistoryEnabled(),
valueConverter,
getTableFilter());
getTableFilter(),
connectorConfig.getCharsetRegistry());
}
}

View File

@ -19,12 +19,12 @@
import org.antlr.v4.runtime.misc.Interval;
import org.antlr.v4.runtime.tree.ParseTree;
import com.mysql.cj.CharsetMapping;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.antlr.AntlrDdlParser;
import io.debezium.antlr.AntlrDdlParserListener;
import io.debezium.antlr.DataTypeResolver;
import io.debezium.antlr.DataTypeResolver.DataTypeEntry;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogSystemVariables;
import io.debezium.connector.mysql.antlr.listener.MySqlAntlrDdlParserListener;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
@ -51,25 +51,30 @@ public class MySqlAntlrDdlParser extends AntlrDdlParser<MySqlLexer, MySqlParser>
private final ConcurrentMap<String, String> charsetNameForDatabase = new ConcurrentHashMap<>();
private final MySqlValueConverters converters;
private final TableFilter tableFilter;
private final BinlogCharsetRegistry charsetRegistry;
@VisibleForTesting
public MySqlAntlrDdlParser() {
this(null, TableFilter.includeAll());
}
@VisibleForTesting
public MySqlAntlrDdlParser(MySqlValueConverters converters) {
this(converters, TableFilter.includeAll());
}
@VisibleForTesting
public MySqlAntlrDdlParser(MySqlValueConverters converters, TableFilter tableFilter) {
this(true, false, false, converters, tableFilter);
this(true, false, false, converters, tableFilter, null);
}
public MySqlAntlrDdlParser(boolean throwErrorsFromTreeWalk, boolean includeViews, boolean includeComments,
MySqlValueConverters converters, TableFilter tableFilter) {
MySqlValueConverters converters, TableFilter tableFilter, BinlogCharsetRegistry charsetRegistry) {
super(throwErrorsFromTreeWalk, includeViews, includeComments);
systemVariables = new BinlogSystemVariables();
this.converters = converters;
this.tableFilter = tableFilter;
this.charsetRegistry = charsetRegistry;
}
@Override
@ -456,10 +461,9 @@ public String extractCharset(CharsetNameContext charsetNode, CollationNameContex
}
else if (collationNode != null && collationNode.getText() != null) {
final String collationName = withoutQuotes(collationNode.getText()).toLowerCase();
for (int index = 0; index < CharsetMapping.MAP_SIZE; index++) {
if (collationName.equals(
CharsetMapping.getStaticCollationNameForCollationIndex(index))) {
charsetName = CharsetMapping.getStaticMysqlCharsetNameForCollationIndex(index);
for (int index = 0; index < charsetRegistry.getCharsetMapSize(); index++) {
if (collationName.equals(charsetRegistry.getCollationNameForCollationIndex(index))) {
charsetName = charsetRegistry.getCharsetNameForCollationIndex(index);
break;
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.charset;
import com.mysql.cj.CharsetMapping;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
/**
* Character-set registry implementation for MySQL that delegates to the MySQL JDBC driver.
*
* @author Chris Cranford
*/
public class MySqlCharsetRegistry implements BinlogCharsetRegistry {
@Override
public int getCharsetMapSize() {
return CharsetMapping.MAP_SIZE;
}
@Override
public String getCollationNameForCollationIndex(Integer collationIndex) {
return CharsetMapping.getStaticCollationNameForCollationIndex(collationIndex);
}
@Override
public String getCharsetNameForCollationIndex(Integer collationIndex) {
return CharsetMapping.getStaticMysqlCharsetNameForCollationIndex(collationIndex);
}
@Override
public String getJavaEncodingForCharSet(String charSetName) {
return CharsetMappingMapper.getJavaEncodingForCharSet(charSetName);
}
/**
* Helper to gain access to protected method
*/
private final static class CharsetMappingMapper extends CharsetMapping {
static String getJavaEncodingForCharSet(String charSetName) {
return CharsetMapping.getStaticJavaEncodingForMysqlCharset(charSetName);
}
}
}

View File

@ -5,8 +5,6 @@
*/
package io.debezium.connector.mysql.jdbc;
import java.io.UnsupportedEncodingException;
import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.relational.Column;
@ -31,7 +29,7 @@ public AbstractFieldReader(MySqlConnectorConfig connectorConfig) {
}
@Override
protected String getCharacterSet(Column column) throws UnsupportedEncodingException {
return MySqlConnection.getJavaEncodingForCharSet(column.charsetName());
protected String getCharacterSet(Column column) {
return getCharsetRegistry().getJavaEncodingForCharSet(column.charsetName());
}
}

View File

@ -11,8 +11,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mysql.cj.CharsetMapping;
import io.debezium.DebeziumException;
import io.debezium.connector.binlog.gtid.GtidSet;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
@ -125,18 +123,4 @@ public GtidSet filterGtidSet(Predicate<String> gtidSourceFilter, String offsetGt
LOGGER.info("Final merged GTID set to use when connecting to MySQL: {}", mergedGtidSet);
return mergedGtidSet;
}
public static String getJavaEncodingForCharSet(String charSetName) {
return CharsetMappingWrapper.getJavaEncodingForMysqlCharSet(charSetName);
}
/**
* Helper to gain access to protected method
*/
private final static class CharsetMappingWrapper extends CharsetMapping {
static String getJavaEncodingForMysqlCharSet(String charSetName) {
return CharsetMapping.getStaticJavaEncodingForMysqlCharset(charSetName);
}
}
}

View File

@ -15,6 +15,7 @@
import io.debezium.annotation.Immutable;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.jdbc.TemporalPrecisionMode;
@ -49,19 +50,16 @@ public class MySqlValueConverters extends BinlogValueConverters {
* @param binaryMode how binary columns should be represented
* @param adjuster a temporal adjuster to make a database specific time modification before conversion
* @param eventConvertingFailureHandlingMode how handle when converting failure
* @param charsetRegistry the binlog character set registry
*/
public MySqlValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
BigIntUnsignedMode bigIntUnsignedMode,
BinaryHandlingMode binaryMode,
TemporalAdjuster adjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryMode, adjuster, eventConvertingFailureHandlingMode);
}
@Override
protected String getJavaEncodingForCharSet(String charSetName) {
return MySqlConnection.getJavaEncodingForCharSet(charSetName);
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
BinlogCharsetRegistry charsetRegistry) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryMode, adjuster, eventConvertingFailureHandlingMode, charsetRegistry);
}
@Override

View File

@ -12,6 +12,7 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.connector.binlog.BinlogAntlrDdlParserTest;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
@ -54,7 +55,8 @@ protected MySqlValueConverters getValueConverters() {
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
BinaryHandlingMode.BYTES,
x -> x,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
new MySqlCharsetRegistry());
}
@Override
@ -86,7 +88,7 @@ public static class MySqlDdlParserWithSimpleTestListener extends MySqlAntlrDdlPa
private MySqlDdlParserWithSimpleTestListener(DdlChanges changesListener, boolean includeViews, boolean includeComments, TableFilter tableFilter,
MySqlValueConverters converters) {
super(false, includeViews, includeComments, converters, tableFilter);
super(false, includeViews, includeComments, converters, tableFilter, new MySqlCharsetRegistry());
this.ddlChanges = changesListener;
}
}

View File

@ -42,7 +42,8 @@ protected MySqlDatabaseSchema getSchema(Configuration config) {
BigIntUnsignedMode.LONG,
BinaryHandlingMode.BYTES,
MySqlValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
connectorConfig.getCharsetRegistry());
return new MySqlDatabaseSchema(
connectorConfig,

View File

@ -10,6 +10,7 @@
import io.debezium.connector.binlog.BinlogDefaultValueTest;
import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
@ -35,7 +36,8 @@ protected MySqlValueConverters getValueConverter(JdbcValueConverters.DecimalMode
bigIntUnsignedMode,
binaryHandlingMode,
x -> x,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
new MySqlCharsetRegistry());
}
@Override

View File

@ -12,6 +12,7 @@
import io.debezium.connector.binlog.BinlogValueConvertersTest;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
@ -35,7 +36,8 @@ protected BinlogValueConverters getValueConverters(JdbcValueConverters.DecimalMo
bigIntUnsignedMode,
binaryHandlingMode,
temporalAdjuster,
eventConvertingFailureHandlingMode);
eventConvertingFailureHandlingMode,
new MySqlCharsetRegistry());
}
@Override