DBZ-7783 Use ServiceRegistry rather than CharsetRegistry directly

This commit is contained in:
Chris Cranford 2024-06-18 12:43:20 -04:00 committed by Jiri Pechanec
parent 48c8a9d2db
commit 63a439ef49
16 changed files with 231 additions and 83 deletions

View File

@ -52,6 +52,7 @@
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.ValueConverter;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.time.Year;
import io.debezium.util.Loggings;
import io.debezium.util.Strings;
@ -107,7 +108,7 @@ public abstract class BinlogValueConverters extends JdbcValueConverters {
* @param binaryHandlingMode how binary columns should be treated
* @param adjuster a temporal adjuster to make a database specific time before conversion
* @param eventConvertingFailureHandlingMode how to handle conversion failures
* @param charsetRegistry the character set registry
* @param serviceRegistry the service registry instance, should not be {@code null}
*/
public BinlogValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
@ -115,10 +116,10 @@ public BinlogValueConverters(DecimalMode decimalMode,
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster adjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
BinlogCharsetRegistry charsetRegistry) {
ServiceRegistry serviceRegistry) {
super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, adjuster, bigIntUnsignedMode, binaryHandlingMode);
this.eventConvertingFailureHandlingMode = eventConvertingFailureHandlingMode;
this.charsetRegistry = charsetRegistry;
this.charsetRegistry = serviceRegistry.getService(BinlogCharsetRegistry.class);
}
@Override

View File

@ -0,0 +1,87 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.binlog.util;
import java.time.temporal.TemporalAdjuster;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogConnectorConfig.BigIntUnsignedHandlingMode;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
/**
* A helper factory for creating binlog value converter implementations for testing.
*
* @author Chris Cranford
*/
public interface BinlogValueConvertersFactory<T extends BinlogValueConverters> {
/**
* Creates an implementation of {@link BinlogValueConverters} based on a given configuration.
*
* @param configuration the connector configuraiton, should never be {@code null}.
* @param temporalAdjuster the temporal adjuster, should never be {@code null}.
*
* @return the constructed value converter instance, never {@code null}
*/
T create(Configuration configuration, TemporalAdjuster temporalAdjuster);
/**
* Creates an implementation of {@link BinlogValueConverters} for the given connector.
*
* @param decimalHandlingMode the decimal handling mode
* @param temporalPrecisionMode the temporal precision mode
* @param bigIntUnsignedHandlingMode the unsigned big integer handling mode
* @param binaryHandlingMode the binary handling mode
* @param temporalAdjuster the time adjuster implementation
* @param eventConvertingFailureHandlingMode the event converting failure handling mode
*
* @return the constructed value converter instance, never {@code null}
*/
default T create(RelationalDatabaseConnectorConfig.DecimalHandlingMode decimalHandlingMode,
TemporalPrecisionMode temporalPrecisionMode,
BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode,
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster temporalAdjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
final Configuration configuration = Configuration.create()
.with(BinlogConnectorConfig.DECIMAL_HANDLING_MODE, decimalHandlingMode)
.with(BinlogConnectorConfig.TIME_PRECISION_MODE, temporalPrecisionMode)
.with(BinlogConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE, bigIntUnsignedHandlingMode)
.with(BinlogConnectorConfig.BINARY_HANDLING_MODE, binaryHandlingMode)
.with(BinlogConnectorConfig.EVENT_CONVERTING_FAILURE_HANDLING_MODE, eventConvertingFailureHandlingMode)
.build();
return create(configuration, temporalAdjuster);
}
/**
* Creates an implementation of {@link BinlogValueConverters} for the given connector.
*
* @param decimalHandlingMode the decimal handling mode
* @param temporalPrecisionMode the temporal precision mode
* @param bigIntUnsignedHandlingMode the unsigned big integer handling mode
* @param binaryHandlingMode the binary handling mode
* @param eventConvertingFailureHandlingMode the event converting failure handling mode
*
* @return the constructed value converter instance, never {@code null}
*/
default T create(RelationalDatabaseConnectorConfig.DecimalHandlingMode decimalHandlingMode,
TemporalPrecisionMode temporalPrecisionMode,
BigIntUnsignedHandlingMode bigIntUnsignedHandlingMode,
BinaryHandlingMode binaryHandlingMode,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
return create(decimalHandlingMode,
temporalPrecisionMode,
bigIntUnsignedHandlingMode,
binaryHandlingMode,
x -> x,
eventConvertingFailureHandlingMode);
}
}

View File

@ -23,7 +23,6 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.BinlogEventMetadataProvider;
import io.debezium.connector.binlog.BinlogSourceTask;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.binlog.jdbc.BinlogFieldReader;
import io.debezium.connector.mariadb.jdbc.MariaDbConnection;
@ -287,7 +286,7 @@ private MariaDbValueConverters getValueConverters(MariaDbConnectorConfig connect
connectorConfig.binaryHandlingMode(),
connectorConfig.isTimeAdjustedEnabled() ? MariaDbValueConverters::adjustTemporal : x -> x,
connectorConfig.getEventConvertingFailureHandlingMode(),
connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
connectorConfig.getServiceRegistry());
}
private BinlogFieldReader getFieldReader(MariaDbConnectorConfig connectorConfig) {

View File

@ -11,11 +11,11 @@
import io.debezium.annotation.Immutable;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.service.spi.ServiceRegistry;
/**
* MariaDB specific converter handlers for JDBC values.<p></p>
@ -44,7 +44,7 @@ public class MariaDbValueConverters extends BinlogValueConverters {
* @param binaryHandlingMode how binary columns should be treated
* @param adjuster a temporal adjuster to make a database specific time before conversion
* @param eventConvertingFailureHandlingMode how to handle conversion failures
* @param charsetRegistry the character set registry
* @param serviceRegistry the service registry, should not be {@code null}
*/
public MariaDbValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
@ -52,8 +52,8 @@ public MariaDbValueConverters(DecimalMode decimalMode,
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster adjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
BinlogCharsetRegistry charsetRegistry) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryHandlingMode, adjuster, eventConvertingFailureHandlingMode, charsetRegistry);
ServiceRegistry serviceRegistry) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryHandlingMode, adjuster, eventConvertingFailureHandlingMode, serviceRegistry);
}
@Override

View File

@ -11,11 +11,12 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDatabaseSchemaTest;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaNameAdjuster;
@ -34,19 +35,15 @@ protected MariaDbConnectorConfig getConnectorConfig(Configuration config) {
@Override
protected MariaDbDatabaseSchema getSchema(Configuration config) {
this.connectorConfig = getConnectorConfig(config);
final MariaDbValueConverters valueConverters = new MariaDbValueConverters(
JdbcValueConverters.DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
JdbcValueConverters.BigIntUnsignedMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
MariaDbValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
return new MariaDbDatabaseSchema(
connectorConfig,
valueConverters,
new MariaDbValueConvertersFactory().create(
DecimalHandlingMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.LONG,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
MariaDbValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN),
(TopicNamingStrategy) DefaultTopicNamingStrategy.create(connectorConfig),
SchemaNameAdjuster.create(),
false);

View File

@ -7,15 +7,17 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDefaultValueTest;
import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
/**
* @author Chris Cranford
@ -31,14 +33,12 @@ protected MariaDbValueConverters getValueConverter(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
BigIntUnsignedMode bigIntUnsignedMode,
BinaryHandlingMode binaryHandlingMode) {
return new MariaDbValueConverters(
decimalMode,
return new MariaDbValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()),
temporalPrecisionMode,
bigIntUnsignedMode,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()),
binaryHandlingMode,
x -> x,
EventConvertingFailureHandlingMode.WARN,
new MariaDbCharsetRegistry());
EventConvertingFailureHandlingMode.WARN);
}
@Override

View File

@ -9,12 +9,14 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.binlog.BinlogAntlrDdlParserTest;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbDefaultValueConverter;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.SimpleDdlParserListener;
@ -45,14 +47,12 @@ protected MariaDbAntlrDdlParser getParser(SimpleDdlParserListener listener, Mari
@Override
protected MariaDbValueConverters getValueConverters() {
return new MariaDbValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
return new MariaDbValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE,
TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS,
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.PRECISE,
CommonConnectorConfig.BinaryHandlingMode.BYTES,
x -> x,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
new MariaDbCharsetRegistry());
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN);
}
@Override

View File

@ -9,14 +9,15 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogValueConvertersTest;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mariadb.antlr.MariaDbAntlrDdlParser;
import io.debezium.connector.mariadb.charset.MariaDbCharsetRegistry;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
import io.debezium.connector.mariadb.util.MariaDbValueConvertersFactory;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.ddl.DdlParser;
/**
@ -30,14 +31,13 @@ protected BinlogValueConverters getValueConverters(DecimalMode decimalMode,
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster temporalAdjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
return new MariaDbValueConverters(
decimalMode,
return new MariaDbValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()),
temporalPrecisionMode,
bigIntUnsignedMode,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()),
binaryHandlingMode,
temporalAdjuster,
eventConvertingFailureHandlingMode,
new MariaDbCharsetRegistry());
eventConvertingFailureHandlingMode);
}
@Override

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mariadb.util;
import java.time.temporal.TemporalAdjuster;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.util.BinlogValueConvertersFactory;
import io.debezium.connector.mariadb.MariaDbConnectorConfig;
import io.debezium.connector.mariadb.jdbc.MariaDbValueConverters;
/**
* Implementation of {@link BinlogValueConvertersFactory} for MariaDB.
*
* @author Chris Cranford
*/
public class MariaDbValueConvertersFactory implements BinlogValueConvertersFactory<MariaDbValueConverters> {
@Override
public MariaDbValueConverters create(Configuration configuration, TemporalAdjuster temporalAdjuster) {
final MariaDbConnectorConfig connectorConfig = new MariaDbConnectorConfig(configuration);
return new MariaDbValueConverters(
connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(),
connectorConfig.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(),
connectorConfig.binaryHandlingMode(),
temporalAdjuster,
connectorConfig.getEventConvertingFailureHandlingMode(),
connectorConfig.getServiceRegistry());
}
}

View File

@ -21,7 +21,6 @@
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.binlog.BinlogEventMetadataProvider;
import io.debezium.connector.binlog.BinlogSourceTask;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogConnectorConnection;
import io.debezium.connector.mysql.jdbc.MySqlConnection;
import io.debezium.connector.mysql.jdbc.MySqlConnectionConfiguration;
@ -235,7 +234,7 @@ private MySqlValueConverters getValueConverters(MySqlConnectorConfig configurati
configuration.binaryHandlingMode(),
configuration.isTimeAdjustedEnabled() ? MySqlValueConverters::adjustTemporal : x -> x,
configuration.getEventConvertingFailureHandlingMode(),
configuration.getServiceRegistry().getService(BinlogCharsetRegistry.class));
configuration.getServiceRegistry());
}
@Override

View File

@ -15,11 +15,11 @@
import io.debezium.annotation.Immutable;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
import io.debezium.service.spi.ServiceRegistry;
/**
* MySQL-specific customization of the conversions from JDBC values obtained from the MySQL binlog client library.
@ -50,7 +50,7 @@ public class MySqlValueConverters extends BinlogValueConverters {
* @param binaryMode how binary columns should be represented
* @param adjuster a temporal adjuster to make a database specific time modification before conversion
* @param eventConvertingFailureHandlingMode how handle when converting failure
* @param charsetRegistry the binlog character set registry
* @param serviceRegistry the service registry, should not be {@code null}
*/
public MySqlValueConverters(DecimalMode decimalMode,
TemporalPrecisionMode temporalPrecisionMode,
@ -58,8 +58,8 @@ public MySqlValueConverters(DecimalMode decimalMode,
BinaryHandlingMode binaryMode,
TemporalAdjuster adjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode,
BinlogCharsetRegistry charsetRegistry) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryMode, adjuster, eventConvertingFailureHandlingMode, charsetRegistry);
ServiceRegistry serviceRegistry) {
super(decimalMode, temporalPrecisionMode, bigIntUnsignedMode, binaryMode, adjuster, eventConvertingFailureHandlingMode, serviceRegistry);
}
@Override

View File

@ -8,15 +8,18 @@
import java.util.List;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.BinlogAntlrDdlParserTest;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.connector.mysql.util.MySqlValueConvertersFactory;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.relational.ddl.DdlChanges;
import io.debezium.relational.ddl.SimpleDdlParserListener;
@ -49,14 +52,12 @@ protected MySqlAntlrDdlParser getParser(SimpleDdlParserListener listener, MySqlV
@Override
protected MySqlValueConverters getValueConverters() {
return new MySqlValueConverters(
JdbcValueConverters.DecimalMode.DOUBLE,
return new MySqlValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(JdbcValueConverters.DecimalMode.DOUBLE.name()),
TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS,
JdbcValueConverters.BigIntUnsignedMode.PRECISE,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(JdbcValueConverters.BigIntUnsignedMode.PRECISE.name()),
BinaryHandlingMode.BYTES,
x -> x,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
new MySqlCharsetRegistry());
EventConvertingFailureHandlingMode.WARN);
}
@Override

View File

@ -12,12 +12,14 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDatabaseSchemaTest;
import io.debezium.connector.binlog.charset.BinlogCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.connector.mysql.util.MySqlValueConvertersFactory;
import io.debezium.jdbc.JdbcValueConverters.BigIntUnsignedMode;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaNameAdjuster;
@ -36,19 +38,15 @@ protected MySqlConnectorConfig getConnectorConfig(Configuration config) {
@Override
protected MySqlDatabaseSchema getSchema(Configuration config) {
this.connectorConfig = getConnectorConfig(config);
final MySqlValueConverters mySqlValueConverters = new MySqlValueConverters(
DecimalMode.PRECISE,
TemporalPrecisionMode.ADAPTIVE,
BigIntUnsignedMode.LONG,
BinaryHandlingMode.BYTES,
MySqlValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
connectorConfig.getServiceRegistry().getService(BinlogCharsetRegistry.class));
return new MySqlDatabaseSchema(
connectorConfig,
mySqlValueConverters,
new MySqlValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(DecimalMode.PRECISE.name()),
TemporalPrecisionMode.ADAPTIVE,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(BigIntUnsignedMode.LONG.name()),
BinaryHandlingMode.BYTES,
MySqlValueConverters::adjustTemporal,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN),
(TopicNamingStrategy) DefaultTopicNamingStrategy.create(connectorConfig),
SchemaNameAdjuster.create(),
false);

View File

@ -5,16 +5,18 @@
*/
package io.debezium.connector.mysql;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogDefaultValueTest;
import io.debezium.connector.binlog.jdbc.BinlogDefaultValueConverter;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlDefaultValueConverter;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.connector.mysql.util.MySqlValueConvertersFactory;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
/**
* @author laomei
@ -30,14 +32,12 @@ protected MySqlValueConverters getValueConverter(JdbcValueConverters.DecimalMode
TemporalPrecisionMode temporalPrecisionMode,
JdbcValueConverters.BigIntUnsignedMode bigIntUnsignedMode,
BinaryHandlingMode binaryHandlingMode) {
return new MySqlValueConverters(
decimalMode,
return new MySqlValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()),
temporalPrecisionMode,
bigIntUnsignedMode,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()),
binaryHandlingMode,
x -> x,
CommonConnectorConfig.EventConvertingFailureHandlingMode.WARN,
new MySqlCharsetRegistry());
EventConvertingFailureHandlingMode.WARN);
}
@Override

View File

@ -9,13 +9,14 @@
import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.config.CommonConnectorConfig.EventConvertingFailureHandlingMode;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogValueConvertersTest;
import io.debezium.connector.binlog.jdbc.BinlogValueConverters;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.connector.mysql.charset.MySqlCharsetRegistry;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
import io.debezium.connector.mysql.util.MySqlValueConvertersFactory;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.ddl.DdlParser;
/**
@ -30,14 +31,13 @@ protected BinlogValueConverters getValueConverters(JdbcValueConverters.DecimalMo
BinaryHandlingMode binaryHandlingMode,
TemporalAdjuster temporalAdjuster,
EventConvertingFailureHandlingMode eventConvertingFailureHandlingMode) {
return new MySqlValueConverters(
decimalMode,
return new MySqlValueConvertersFactory().create(
RelationalDatabaseConnectorConfig.DecimalHandlingMode.parse(decimalMode.name()),
temporalPrecisionMode,
bigIntUnsignedMode,
BinlogConnectorConfig.BigIntUnsignedHandlingMode.parse(bigIntUnsignedMode.name()),
binaryHandlingMode,
temporalAdjuster,
eventConvertingFailureHandlingMode,
new MySqlCharsetRegistry());
eventConvertingFailureHandlingMode);
}
@Override

View File

@ -0,0 +1,33 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql.util;
import java.time.temporal.TemporalAdjuster;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.util.BinlogValueConvertersFactory;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.jdbc.MySqlValueConverters;
/**
* Implementation of {@link BinlogValueConvertersFactory} for MySQL.
*
* @author Chris Cranford
*/
public class MySqlValueConvertersFactory implements BinlogValueConvertersFactory<MySqlValueConverters> {
@Override
public MySqlValueConverters create(Configuration configuration, TemporalAdjuster temporalAdjuster) {
final MySqlConnectorConfig connectorConfig = new MySqlConnectorConfig(configuration);
return new MySqlValueConverters(
connectorConfig.getDecimalMode(),
connectorConfig.getTemporalPrecisionMode(),
connectorConfig.getBigIntUnsignedHandlingMode().asBigIntUnsignedMode(),
connectorConfig.binaryHandlingMode(),
temporalAdjuster,
connectorConfig.getEventConvertingFailureHandlingMode(),
connectorConfig.getServiceRegistry());
}
}