From 63a439ef49f01bec3097ad21d9f2967e7186c73b Mon Sep 17 00:00:00 2001 From: Chris Cranford Date: Tue, 18 Jun 2024 12:43:20 -0400 Subject: [PATCH] DBZ-7783 Use ServiceRegistry rather than CharsetRegistry directly --- .../binlog/jdbc/BinlogValueConverters.java | 7 +- .../util/BinlogValueConvertersFactory.java | 87 +++++++++++++++++++ .../mariadb/MariaDbConnectorTask.java | 3 +- .../mariadb/jdbc/MariaDbValueConverters.java | 8 +- .../connector/mariadb/DatabaseSchemaTest.java | 23 +++-- .../connector/mariadb/DefaultValueTest.java | 14 +-- .../mariadb/MariaDbAntlrDdlParserTest.java | 14 +-- .../mariadb/ValueConvertersTest.java | 14 +-- .../util/MariaDbValueConvertersFactory.java | 33 +++++++ .../connector/mysql/MySqlConnectorTask.java | 3 +- .../mysql/jdbc/MySqlValueConverters.java | 8 +- .../mysql/MySqlAntlrDdlParserTest.java | 15 ++-- .../mysql/MySqlDatabaseSchemaTest.java | 22 +++-- .../mysql/MySqlDefaultValueTest.java | 16 ++-- .../mysql/MySqlValueConvertersTest.java | 14 +-- .../util/MySqlValueConvertersFactory.java | 33 +++++++ 16 files changed, 231 insertions(+), 83 deletions(-) create mode 100644 debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogValueConvertersFactory.java create mode 100644 debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/util/MariaDbValueConvertersFactory.java create mode 100644 debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/util/MySqlValueConvertersFactory.java diff --git a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogValueConverters.java b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogValueConverters.java index a5fbad997..35eb549b2 100644 --- a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogValueConverters.java +++ b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/jdbc/BinlogValueConverters.java @@ -52,6 +52,7 @@ import io.debezium.relational.Column; import io.debezium.relational.Table; import io.debezium.relational.ValueConverter; +import io.debezium.service.spi.ServiceRegistry; import io.debezium.time.Year; import io.debezium.util.Loggings; import io.debezium.util.Strings; @@ -107,7 +108,7 @@ public abstract class BinlogValueConverters extends JdbcValueConverters { * @param binaryHandlingMode how binary columns should be treated * @param adjuster a temporal adjuster to make a database specific time before conversion * @param eventConvertingFailureHandlingMode how to handle conversion failures - * @param charsetRegistry the character set registry + * @param serviceRegistry the service registry instance, should not be {@code null} */ public BinlogValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, @@ -115,10 +116,10 @@ public BinlogValueConverters(DecimalMode decimalMode, BinaryHandlingMode binaryHandlingMode, TemporalAdjuster adjuster, EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode, - BinlogCharsetRegistry charsetRegistry) { + ServiceRegistry serviceRegistry) { super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, adjuster, bigIntUnsignedMode, binaryHandlingMode); this.eventConvertingFailureHandlingMode = eventConvertingFailureHandlingMode; - this.charsetRegistry = charsetRegistry; + this.charsetRegistry = serviceRegistry.getService(BinlogCharsetRegistry.class); } @Override diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogValueConvertersFactory.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogValueConvertersFactory.java new file mode 100644 index 000000000..bb9684560 --- /dev/null +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/util/BinlogValueConvertersFactory.java @@ -0,0 +1,87 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.binlog.util; + +import java.time.temporal.TemporalAdjuster; + +import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; +import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; +import io.debezium.config.Configuration; +import io.debezium.connector.binlog.BinlogConnectorConfig; +import io.debezium.connector.binlog.BinlogConnectorConfig.BigIntUnsignedHandlingMode; +import io.debezium.connector.binlog.jdbc.BinlogValueConverters; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; + +/** + * A helper factory for creating binlog value converter implementations for testing. + * + * @author Chris Cranford + */ +public interface BinlogValueConvertersFactory { + /** + * Creates an implementation of {@link BinlogValueConverters} based on a given configuration. + * + * @param configuration the connector configuraiton, should never be {@code null}. + * @param temporalAdjuster the temporal adjuster, should never be {@code null}. + * + * @return the constructed value converter instance, never {@code null} + */ + T create(Configuration configuration, TemporalAdjuster temporalAdjuster); + + /** + * Creates an implementation of {@link BinlogValueConverters} for the given connector. + * + * @param decimalHandlingMode the decimal handling mode + * @param temporalPrecisionMode the temporal precision mode + * @param bigIntUnsignedHandlingMode the unsigned big integer handling mode + * @param binaryHandlingMode the binary handling mode + * @param temporalAdjuster the time adjuster implementation + * @param eventConvertingFailureHandlingMode the event converting failure handling mode + * + * @return the constructed value converter instance, never {@code null} + */ + default T create(RelationalDatabaseConnectorConfig.DecimalHandlingMode decimalHandlingMode, + TemporalPrecisionMode temporalPrecisionMode, + BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode, + BinaryHandlingMode binaryHandlingMode, + TemporalAdjuster temporalAdjuster, + EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) { + final Configuration configuration = Configuration.create() + .with(BinlogConnectorConfig.DECIMAL_HANDLING_MODE, decimalHandlingMode) + .with(BinlogConnectorConfig.TIME_PRECISION_MODE, temporalPrecisionMode) + .with(BinlogConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE, bigIntUnsignedHandlingMode) + .with(BinlogConnectorConfig.BINARY_HANDLING_MODE, binaryHandlingMode) + .with(BinlogConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, eventConvertingFailureHandlingMode) + .build(); + + return create(configuration, temporalAdjuster); + } + + /** + * Creates an implementation of {@link BinlogValueConverters} for the given connector. + * + * @param decimalHandlingMode the decimal handling mode + * @param temporalPrecisionMode the temporal precision mode + * @param bigIntUnsignedHandlingMode the unsigned big integer handling mode + * @param binaryHandlingMode the binary handling mode + * @param eventConvertingFailureHandlingMode the event converting failure handling mode + * + * @return the constructed value converter instance, never {@code null} + */ + default T create(RelationalDatabaseConnectorConfig.DecimalHandlingMode decimalHandlingMode, + TemporalPrecisionMode temporalPrecisionMode, + BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode, + BinaryHandlingMode binaryHandlingMode, + EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) { + return create(decimalHandlingMode, + temporalPrecisionMode, + bigIntUnsignedHandlingMode, + binaryHandlingMode, + x -> x, + eventConvertingFailureHandlingMode); + } +} diff --git a/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbConnectorTask.java b/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbConnectorTask.java index b41ab6dac..8ebc04b21 100644 --- a/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbConnectorTask.java +++ b/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/MariaDbConnectorTask.java @@ -23,7 +23,6 @@ import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.binlog.BinlogEventMetadataProvider; import io.debezium.connector.binlog.BinlogSourceTask; -import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection; import io.debezium.connector.binlog.jdbc.BinlogFieldReader; import io.debezium.connector.mariadb.jdbc.MariaDbConnection; @@ -287,7 +286,7 @@ private MariaDbValueConverters getValueConverters(MariaDbConnectorConfig connect connectorConfig.binaryHandlingMode(), connectorConfig.isTimeAdjustedEnabled() ? MariaDbValueConverters::adjustTemporal : x -> x, connectorConfig.getEventConvertingFailureHandlingMode(), - connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class)); + connectorConfig.getServiceRegistry()); } private BinlogFieldReader getFieldReader(MariaDbConnectorConfig connectorConfig) { diff --git a/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/jdbc/MariaDbValueConverters.java b/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/jdbc/MariaDbValueConverters.java index 970707ff1..6dba608e6 100644 --- a/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/jdbc/MariaDbValueConverters.java +++ b/debezium-connector-mariadb/src/main/java/io/debezium/connector/mariadb/jdbc/MariaDbValueConverters.java @@ -11,11 +11,11 @@ import io.debezium.annotation.Immutable; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; -import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.binlog.jdbc.BinlogValueConverters; import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; +import io.debezium.service.spi.ServiceRegistry; /** * MariaDB specific converter handlers for JDBC values.

@@ -44,7 +44,7 @@ public class MariaDbValueConverters extends BinlogValueConverters { * @param binaryHandlingMode how binary columns should be treated * @param adjuster a temporal adjuster to make a database specific time before conversion * @param eventConvertingFailureHandlingMode how to handle conversion failures - * @param charsetRegistry the character set registry + * @param serviceRegistry the service registry, should not be {@code null} */ public MariaDbValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, @@ -52,8 +52,8 @@ public MariaDbValueConverters(DecimalMode decimalMode, BinaryHandlingMode binaryHandlingMode, TemporalAdjuster adjuster, EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode, - BinlogCharsetRegistry charsetRegistry) { - super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryHandlingMode, adjuster, eventConvertingFailureHandlingMode, charsetRegistry); + ServiceRegistry serviceRegistry) { + super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryHandlingMode, adjuster, eventConvertingFailureHandlingMode, serviceRegistry); } @Override diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DatabaseSchemaTest.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DatabaseSchemaTest.java index 9893eeb07..0fb9df890 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DatabaseSchemaTest.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DatabaseSchemaTest.java @@ -11,11 +11,12 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogDatabaseSchemaTest; -import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; -import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.schema.SchemaNameAdjuster; @@ -34,19 +35,15 @@ protected MariaDbConnectorConfig getConnectorConfig(Configuration config) { @Override protected MariaDbDatabaseSchema getSchema(Configuration config) { this.connectorConfig = getConnectorConfig(config); - - final MariaDbValueConverters valueConverters = new MariaDbValueConverters( - JdbcValueConverters.DecimalMode.PRECISE, - TemporalPrecisionMode.ADAPTIVE, - JdbcValueConverters.BigIntUnsignedMode.LONG, - CommonConnectorConfig.BinaryHandlingMode.BYTES, - MariaDbValueConverters::adjustTemporal, - CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, - connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class)); - return new MariaDbDatabaseSchema( connectorConfig, - valueConverters, + new MariaDbValueConvertersFactory().create( + DecimalHandlingMode.PRECISE, + TemporalPrecisionMode.ADAPTIVE, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.LONG, + CommonConnectorConfig.BinaryHandlingMode.BYTES, + MariaDbValueConverters::adjustTemporal, + CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(connectorConfig), SchemaNameAdjuster.create(), false); diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DefaultValueTest.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DefaultValueTest.java index afb726110..37b2e0024 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DefaultValueTest.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/DefaultValueTest.java @@ -7,15 +7,17 @@ import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogDefaultValueTest; import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter; import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser; -import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry; import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter; import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; +import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; /** * @author Chris Cranford @@ -31,14 +33,12 @@ protected MariaDbValueConverters getValueConverter(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, BigIntUnsignedMode bigIntUnsignedMode, BinaryHandlingMode binaryHandlingMode) { - return new MariaDbValueConverters( - decimalMode, + return new MariaDbValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()), temporalPrecisionMode, - bigIntUnsignedMode, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()), binaryHandlingMode, - x -> x, - EventConvertingFailureHandlingMode.WARN, - new MariaDbCharsetRegistry()); + EventConvertingFailureHandlingMode.WARN); } @Override diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbAntlrDdlParserTest.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbAntlrDdlParserTest.java index 7e10687c3..425d7c55e 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbAntlrDdlParserTest.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbAntlrDdlParserTest.java @@ -9,12 +9,14 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.binlog.BinlogAntlrDdlParserTest; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser; import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry; import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter; import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; -import io.debezium.jdbc.JdbcValueConverters; +import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Tables; import io.debezium.relational.ddl.DdlChanges; import io.debezium.relational.ddl.SimpleDdlParserListener; @@ -45,14 +47,12 @@ protected MariaDbAntlrDdlParser getParser(SimpleDdlParserListener listener, Mari @Override protected MariaDbValueConverters getValueConverters() { - return new MariaDbValueConverters( - JdbcValueConverters.DecimalMode.DOUBLE, + return new MariaDbValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, - JdbcValueConverters.BigIntUnsignedMode.PRECISE, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.PRECISE, CommonConnectorConfig.BinaryHandlingMode.BYTES, - x -> x, - CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, - new MariaDbCharsetRegistry()); + CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN); } @Override diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ValueConvertersTest.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ValueConvertersTest.java index 50e4260ec..72654221f 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ValueConvertersTest.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ValueConvertersTest.java @@ -9,14 +9,15 @@ import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogValueConvertersTest; import io.debezium.connector.binlog.jdbc.BinlogValueConverters; import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser; -import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry; -import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; +import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.ddl.DdlParser; /** @@ -30,14 +31,13 @@ protected BinlogValueConverters getValueConverters(DecimalMode decimalMode, BinaryHandlingMode binaryHandlingMode, TemporalAdjuster temporalAdjuster, EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) { - return new MariaDbValueConverters( - decimalMode, + return new MariaDbValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()), temporalPrecisionMode, - bigIntUnsignedMode, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()), binaryHandlingMode, temporalAdjuster, - eventConvertingFailureHandlingMode, - new MariaDbCharsetRegistry()); + eventConvertingFailureHandlingMode); } @Override diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/util/MariaDbValueConvertersFactory.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/util/MariaDbValueConvertersFactory.java new file mode 100644 index 000000000..7080c4d6c --- /dev/null +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/util/MariaDbValueConvertersFactory.java @@ -0,0 +1,33 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mariadb.util; + +import java.time.temporal.TemporalAdjuster; + +import io.debezium.config.Configuration; +import io.debezium.connector.binlog.util.BinlogValueConvertersFactory; +import io.debezium.connector.mariadb.MariaDbConnectorConfig; +import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters; + +/** + * Implementation of {@link BinlogValueConvertersFactory} for MariaDB. + * + * @author Chris Cranford + */ +public class MariaDbValueConvertersFactory implements BinlogValueConvertersFactory { + @Override + public MariaDbValueConverters create(Configuration configuration, TemporalAdjuster temporalAdjuster) { + final MariaDbConnectorConfig connectorConfig = new MariaDbConnectorConfig(configuration); + return new MariaDbValueConverters( + connectorConfig.getDecimalMode(), + connectorConfig.getTemporalPrecisionMode(), + connectorConfig.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(), + connectorConfig.binaryHandlingMode(), + temporalAdjuster, + connectorConfig.getEventConvertingFailureHandlingMode(), + connectorConfig.getServiceRegistry()); + } +} diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index b4921d551..9733b3a7b 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -21,7 +21,6 @@ import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.binlog.BinlogEventMetadataProvider; import io.debezium.connector.binlog.BinlogSourceTask; -import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection; import io.debezium.connector.mysql.jdbc.MySqlConnection; import io.debezium.connector.mysql.jdbc.MySqlConnectionConfiguration; @@ -235,7 +234,7 @@ private MySqlValueConverters getValueConverters(MySqlConnectorConfig configurati configuration.binaryHandlingMode(), configuration.isTimeAdjustedEnabled() ? MySqlValueConverters::adjustTemporal : x -> x, configuration.getEventConvertingFailureHandlingMode(), - configuration.getServiceRegistry().getService(BinlogCharsetRegistry.class)); + configuration.getServiceRegistry()); } @Override diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlValueConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlValueConverters.java index 0086e6172..9ab767a3e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlValueConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/jdbc/MySqlValueConverters.java @@ -15,11 +15,11 @@ import io.debezium.annotation.Immutable; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; -import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.binlog.jdbc.BinlogValueConverters; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.jdbc.TemporalPrecisionMode; import io.debezium.relational.Column; +import io.debezium.service.spi.ServiceRegistry; /** * MySQL-specific customization of the conversions from JDBC values obtained from the MySQL binlog client library. @@ -50,7 +50,7 @@ public class MySqlValueConverters extends BinlogValueConverters { * @param binaryMode how binary columns should be represented * @param adjuster a temporal adjuster to make a database specific time modification before conversion * @param eventConvertingFailureHandlingMode how handle when converting failure - * @param charsetRegistry the binlog character set registry + * @param serviceRegistry the service registry, should not be {@code null} */ public MySqlValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, @@ -58,8 +58,8 @@ public MySqlValueConverters(DecimalMode decimalMode, BinaryHandlingMode binaryMode, TemporalAdjuster adjuster, EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode, - BinlogCharsetRegistry charsetRegistry) { - super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryMode, adjuster, eventConvertingFailureHandlingMode, charsetRegistry); + ServiceRegistry serviceRegistry) { + super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryMode, adjuster, eventConvertingFailureHandlingMode, serviceRegistry); } @Override diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java index 1630172de..32857dbb0 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlAntlrDdlParserTest.java @@ -8,15 +8,18 @@ import java.util.List; -import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; +import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; import io.debezium.connector.binlog.BinlogAntlrDdlParserTest; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; import io.debezium.connector.mysql.charset.MySqlCharsetRegistry; import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter; import io.debezium.connector.mysql.jdbc.MySqlValueConverters; +import io.debezium.connector.mysql.util.MySqlValueConvertersFactory; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.Tables.TableFilter; import io.debezium.relational.ddl.DdlChanges; import io.debezium.relational.ddl.SimpleDdlParserListener; @@ -49,14 +52,12 @@ protected MySqlAntlrDdlParser getParser(SimpleDdlParserListener listener, MySqlV @Override protected MySqlValueConverters getValueConverters() { - return new MySqlValueConverters( - JdbcValueConverters.DecimalMode.DOUBLE, + return new MySqlValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(JdbcValueConverters.DecimalMode.DOUBLE.name()), TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS, - JdbcValueConverters.BigIntUnsignedMode.PRECISE, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(JdbcValueConverters.BigIntUnsignedMode.PRECISE.name()), BinaryHandlingMode.BYTES, - x -> x, - CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, - new MySqlCharsetRegistry()); + EventConvertingFailureHandlingMode.WARN); } @Override diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDatabaseSchemaTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDatabaseSchemaTest.java index e0822e530..733d1ec8c 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDatabaseSchemaTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDatabaseSchemaTest.java @@ -12,12 +12,14 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.Configuration; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogDatabaseSchemaTest; -import io.debezium.connector.binlog.charset.BinlogCharsetRegistry; import io.debezium.connector.mysql.jdbc.MySqlValueConverters; +import io.debezium.connector.mysql.util.MySqlValueConvertersFactory; import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode; import io.debezium.jdbc.JdbcValueConverters.DecimalMode; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.history.AbstractSchemaHistory; import io.debezium.schema.DefaultTopicNamingStrategy; import io.debezium.schema.SchemaNameAdjuster; @@ -36,19 +38,15 @@ protected MySqlConnectorConfig getConnectorConfig(Configuration config) { @Override protected MySqlDatabaseSchema getSchema(Configuration config) { this.connectorConfig = getConnectorConfig(config); - - final MySqlValueConverters mySqlValueConverters = new MySqlValueConverters( - DecimalMode.PRECISE, - TemporalPrecisionMode.ADAPTIVE, - BigIntUnsignedMode.LONG, - BinaryHandlingMode.BYTES, - MySqlValueConverters::adjustTemporal, - CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, - connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class)); - return new MySqlDatabaseSchema( connectorConfig, - mySqlValueConverters, + new MySqlValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(DecimalMode.PRECISE.name()), + TemporalPrecisionMode.ADAPTIVE, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(BigIntUnsignedMode.LONG.name()), + BinaryHandlingMode.BYTES, + MySqlValueConverters::adjustTemporal, + CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(connectorConfig), SchemaNameAdjuster.create(), false); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDefaultValueTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDefaultValueTest.java index a55c37b52..c06871508 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDefaultValueTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlDefaultValueTest.java @@ -5,16 +5,18 @@ */ package io.debezium.connector.mysql; -import io.debezium.config.CommonConnectorConfig; import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; +import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogDefaultValueTest; import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; -import io.debezium.connector.mysql.charset.MySqlCharsetRegistry; import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter; import io.debezium.connector.mysql.jdbc.MySqlValueConverters; +import io.debezium.connector.mysql.util.MySqlValueConvertersFactory; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; /** * @author laomei @@ -30,14 +32,12 @@ protected MySqlValueConverters getValueConverter(JdbcValueConverters.DecimalMode TemporalPrecisionMode temporalPrecisionMode, JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode, BinaryHandlingMode binaryHandlingMode) { - return new MySqlValueConverters( - decimalMode, + return new MySqlValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()), temporalPrecisionMode, - bigIntUnsignedMode, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()), binaryHandlingMode, - x -> x, - CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN, - new MySqlCharsetRegistry()); + EventConvertingFailureHandlingMode.WARN); } @Override diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlValueConvertersTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlValueConvertersTest.java index 137b29b13..c286cd291 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlValueConvertersTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlValueConvertersTest.java @@ -9,13 +9,14 @@ import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode; import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode; +import io.debezium.connector.binlog.BinlogConnectorConfig; import io.debezium.connector.binlog.BinlogValueConvertersTest; import io.debezium.connector.binlog.jdbc.BinlogValueConverters; import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser; -import io.debezium.connector.mysql.charset.MySqlCharsetRegistry; -import io.debezium.connector.mysql.jdbc.MySqlValueConverters; +import io.debezium.connector.mysql.util.MySqlValueConvertersFactory; import io.debezium.jdbc.JdbcValueConverters; import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.ddl.DdlParser; /** @@ -30,14 +31,13 @@ protected BinlogValueConverters getValueConverters(JdbcValueConverters.DecimalMo BinaryHandlingMode binaryHandlingMode, TemporalAdjuster temporalAdjuster, EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) { - return new MySqlValueConverters( - decimalMode, + return new MySqlValueConvertersFactory().create( + RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()), temporalPrecisionMode, - bigIntUnsignedMode, + BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()), binaryHandlingMode, temporalAdjuster, - eventConvertingFailureHandlingMode, - new MySqlCharsetRegistry()); + eventConvertingFailureHandlingMode); } @Override diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/util/MySqlValueConvertersFactory.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/util/MySqlValueConvertersFactory.java new file mode 100644 index 000000000..cfdd2c4a4 --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/util/MySqlValueConvertersFactory.java @@ -0,0 +1,33 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql.util; + +import java.time.temporal.TemporalAdjuster; + +import io.debezium.config.Configuration; +import io.debezium.connector.binlog.util.BinlogValueConvertersFactory; +import io.debezium.connector.mysql.MySqlConnectorConfig; +import io.debezium.connector.mysql.jdbc.MySqlValueConverters; + +/** + * Implementation of {@link BinlogValueConvertersFactory} for MySQL. + * + * @author Chris Cranford + */ +public class MySqlValueConvertersFactory implements BinlogValueConvertersFactory { + @Override + public MySqlValueConverters create(Configuration configuration, TemporalAdjuster temporalAdjuster) { + final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(configuration); + return new MySqlValueConverters( + connectorConfig.getDecimalMode(), + connectorConfig.getTemporalPrecisionMode(), + connectorConfig.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(), + connectorConfig.binaryHandlingMode(), + temporalAdjuster, + connectorConfig.getEventConvertingFailureHandlingMode(), + connectorConfig.getServiceRegistry()); + } +}