diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractMysqlFieldReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractMysqlFieldReader.java index 26c18d4b0..47b91002e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractMysqlFieldReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractMysqlFieldReader.java @@ -5,15 +5,18 @@ */ package io.debezium.connector.mysql; +import java.io.UnsupportedEncodingException; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.debezium.relational.Column; import io.debezium.relational.Table; +import io.debezium.util.Collect; /** * Abstract class for decode MySQL return value according to different protocols. @@ -24,6 +27,14 @@ public abstract class AbstractMysqlFieldReader implements MysqlFieldReader { protected final Logger logger = LoggerFactory.getLogger(getClass()); + private static final Set TEXT_DATATYPES = Collect.unmodifiableSet("CHAR", "VARCHAR", "TEXT"); + + private final MySqlConnectorConfig connectorConfig; + + protected AbstractMysqlFieldReader(MySqlConnectorConfig connectorConfig) { + this.connectorConfig = connectorConfig; + } + @Override public Object readField(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException { if (column.jdbcType() == Types.TIME) { @@ -49,10 +60,14 @@ else if (column.jdbcType() == Types.TINYINT || column.jdbcType() == Types.SMALLI // DBZ-2673 // It is necessary to check the type names as types like ENUM and SET are // also reported as JDBC type char - else if ("CHAR".equals(column.typeName()) || - "VARCHAR".equals(column.typeName()) || - "TEXT".equals(column.typeName())) { - return rs.getBytes(columnIndex); + else if (!connectorConfig.customConverterRegistry().isEmpty() && TEXT_DATATYPES.contains(column.typeName())) { + try { + return rs.getString(columnIndex).getBytes(column.charsetName()); + } + catch (UnsupportedEncodingException e) { + logger.warn("Unsupported encoding '{}' for column '{}', sending value as String"); + return rs.getObject(columnIndex); + } } else { return rs.getObject(columnIndex); diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java index ee882c832..5c950a656 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -75,7 +75,7 @@ public MySqlConnection(MySqlConnectionConfiguration connectionConfig, MysqlField * @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null. */ public MySqlConnection(MySqlConnectionConfiguration connectionConfig) { - this(connectionConfig, new MysqlTextProtocolFieldReader()); + this(connectionConfig, new MysqlTextProtocolFieldReader(null)); } @Override diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index a78a13d4e..550be2b14 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -80,8 +80,8 @@ public ChangeEventSourceCoordinator start(Co .build(); connection = new MySqlConnection(new MySqlConnectionConfiguration(config), - connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader() - : new MysqlTextProtocolFieldReader()); + connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader(connectorConfig) + : new MysqlTextProtocolFieldReader(connectorConfig)); validateBinlogConfiguration(connectorConfig); @@ -140,8 +140,8 @@ public ChangeEventSourceCoordinator start(Co Heartbeat heartbeat = null; if (!connectorConfig.getHeartbeatActionQuery().isEmpty()) { heartbeatConnection = new MySqlConnection(new MySqlConnectionConfiguration(config), - connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader() - : new MysqlTextProtocolFieldReader()); + connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader(connectorConfig) + : new MysqlTextProtocolFieldReader(connectorConfig)); heartbeat = Heartbeat.create( connectorConfig.getHeartbeatInterval(), diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlBinaryProtocolFieldReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlBinaryProtocolFieldReader.java index b2bf1ec22..d8f8b74da 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlBinaryProtocolFieldReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlBinaryProtocolFieldReader.java @@ -29,6 +29,10 @@ public class MysqlBinaryProtocolFieldReader extends AbstractMysqlFieldReader { private static final Logger LOGGER = LoggerFactory.getLogger(MysqlBinaryProtocolFieldReader.class); + public MysqlBinaryProtocolFieldReader(MySqlConnectorConfig config) { + super(config); + } + /** * @see ProtocolBinary::MYSQL_TYPE_TIME */ diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlTextProtocolFieldReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlTextProtocolFieldReader.java index 69fe07f3c..696c635d9 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlTextProtocolFieldReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MysqlTextProtocolFieldReader.java @@ -26,6 +26,10 @@ public class MysqlTextProtocolFieldReader extends AbstractMysqlFieldReader { private static final Logger LOGGER = LoggerFactory.getLogger(MysqlTextProtocolFieldReader.class); + public MysqlTextProtocolFieldReader(MySqlConnectorConfig config) { + super(config); + } + /** * As MySQL connector/J implementation is broken for MySQL type "TIME" we have to use a binary-ish workaround * diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/SnapshotReader.java index a0a84fe7f..ce784af57 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/legacy/SnapshotReader.java @@ -95,7 +95,9 @@ public SnapshotReader(String name, MySqlTaskContext context) { recorder = this::recordRowAsRead; metrics = new SnapshotReaderMetrics(context, context.dbSchema(), changeEventQueueMetrics); this.useGlobalLock = useGlobalLock; - this.mysqlFieldReader = context.getConnectorConfig().useCursorFetch() ? new MysqlBinaryProtocolFieldReader() : new MysqlTextProtocolFieldReader(); + this.mysqlFieldReader = context.getConnectorConfig().useCursorFetch() + ? new MysqlBinaryProtocolFieldReader(context.getConnectorConfig()) + : new MysqlTextProtocolFieldReader(context.getConnectorConfig()); } /** diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlNonUtfDatabaseCharsetIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlNonUtfDatabaseCharsetIT.java new file mode 100644 index 000000000..e479d130d --- /dev/null +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MysqlNonUtfDatabaseCharsetIT.java @@ -0,0 +1,113 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.connector.mysql; + +import static org.fest.assertions.Assertions.assertThat; + +import java.nio.file.Path; +import java.sql.SQLException; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.debezium.config.Configuration; +import io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter; +import io.debezium.embedded.AbstractConnectorTest; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.util.Testing; + +public class MysqlNonUtfDatabaseCharsetIT extends AbstractConnectorTest { + + private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath(); + private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "db_default_charset_noutf", "latin2") + .withDbHistoryPath(DB_HISTORY_PATH); + + private Configuration config; + + @Before + public void beforeEach() { + stopConnector(); + DATABASE.createAndInitialize(); + initializeConnectorTestFramework(); + Testing.Files.delete(DB_HISTORY_PATH); + } + + @After + public void afterEach() { + try { + stopConnector(); + } + finally { + Testing.Files.delete(DB_HISTORY_PATH); + } + } + + @Test + public void useStringsDuringSnapshots() throws InterruptedException, SQLException { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) + .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DATA") + "," + DATABASE.qualifiedTableName("DATASTREAM")) + .build(); + start(MySqlConnector.class, config); + + Testing.Print.enable(); + + AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(7); + final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATA")).get(0); + + assertThat(((Struct) record.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký"); + assertThat(((Struct) record.value()).getStruct("after").getInt16("FLAG")).isEqualTo((short) 1); + + try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) { + try (JdbcConnection connection = db.connect()) { + connection.execute("CREATE TABLE DATASTREAM (MESSAGE TEXT, FLAG TINYINT(1));"); + connection.execute("INSERT INTO DATASTREAM VALUES ('Žluťoučký', 1);"); + } + } + + records = consumeRecordsByTopic(2); + final SourceRecord recordStream = records.recordsForTopic(DATABASE.topicForTable("DATASTREAM")).get(0); + + assertThat(((Struct) recordStream.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký"); + assertThat(((Struct) record.value()).getStruct("after").getInt16("FLAG")).isEqualTo((short) 1); + } + + @Test + public void useByteArrayDuringSnapshots() throws InterruptedException, SQLException { + config = DATABASE.defaultConfig() + .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL) + .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DATA") + "," + DATABASE.qualifiedTableName("DATASTREAM")) + .with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "boolean") + .with("boolean.type", TinyIntOneToBooleanConverter.class.getName()) + .with("boolean.selector", ".*") + .build(); + start(MySqlConnector.class, config); + + Testing.Print.enable(); + + AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(7); + final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATA")).get(0); + + assertThat(((Struct) record.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký"); + assertThat(((Struct) record.value()).getStruct("after").getBoolean("FLAG")).isEqualTo(true); + + try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());) { + try (JdbcConnection connection = db.connect()) { + connection.execute("CREATE TABLE DATASTREAM (MESSAGE TEXT, FLAG TINYINT(1));"); + connection.execute("INSERT INTO DATASTREAM VALUES ('Žluťoučký', 1);"); + } + } + + records = consumeRecordsByTopic(2); + final SourceRecord recordStream = records.recordsForTopic(DATABASE.topicForTable("DATASTREAM")).get(0); + + assertThat(((Struct) recordStream.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký"); + assertThat(((Struct) record.value()).getStruct("after").getBoolean("FLAG")).isEqualTo(true); + } +} diff --git a/debezium-connector-mysql/src/test/resources/ddl/db_default_charset_noutf.sql b/debezium-connector-mysql/src/test/resources/ddl/db_default_charset_noutf.sql new file mode 100644 index 000000000..e6b963aa3 --- /dev/null +++ b/debezium-connector-mysql/src/test/resources/ddl/db_default_charset_noutf.sql @@ -0,0 +1,5 @@ +CREATE TABLE DATA ( + MESSAGE TEXT, + FLAG TINYINT(1) +); +INSERT INTO DATA VALUES ('Žluťoučký', 1); diff --git a/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java b/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java index acc4e12ae..c5f407469 100644 --- a/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java +++ b/debezium-core/src/main/java/io/debezium/relational/CustomConverterRegistry.java @@ -150,6 +150,13 @@ public Optional getValueConverter(TableId table, Column column) return Optional.of(x -> converterDefinition.converter.convert(x)); } + /** + * @return true if no custom converters will be used by the connector + */ + public boolean isEmpty() { + return conversionFunctionMap.isEmpty(); + } + private String fullColumnName(TableId table, Column column) { return table + "." + column.name(); }