DBZ-3700 Strings as bytes only when converters present

When a database was used with a non-UTF8 charset, the snapshot was sending
a byte array with UTF-8 bytes, but the MySQL converter was using the
database charset to convert it to a String, leading to malformed data.
It is necessary to convert strings using the database charset, but this
might be a problem when the charsets do not match.
The current solution:
1) Uses strings by default.
2) Switches to a byte array only when converters are present, to cover
the original use case.
3) If the conversion fails, it switches back to a string.
This commit is contained in:
Jiri Pechanec 2022-01-17 11:55:51 +01:00 committed by Gunnar Morling
parent 632a61ee98
commit 8f5bb31eab
9 changed files with 160 additions and 10 deletions

View File

@ -5,15 +5,18 @@
*/ */
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.io.UnsupportedEncodingException;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types; import java.sql.Types;
import java.util.Set;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.relational.Column; import io.debezium.relational.Column;
import io.debezium.relational.Table; import io.debezium.relational.Table;
import io.debezium.util.Collect;
/** /**
* Abstract class for decode MySQL return value according to different protocols. * Abstract class for decode MySQL return value according to different protocols.
@ -24,6 +27,14 @@ public abstract class AbstractMysqlFieldReader implements MysqlFieldReader {
protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Logger logger = LoggerFactory.getLogger(getClass());
private static final Set<String> TEXT_DATATYPES = Collect.unmodifiableSet("CHAR", "VARCHAR", "TEXT");
private final MySqlConnectorConfig connectorConfig;
/**
 * Creates the reader and keeps the connector configuration for later use by {@code readField}.
 *
 * @param connectorConfig connector configuration whose custom converter registry is consulted by {@code readField}.
 *            NOTE(review): {@code MySqlConnection(MySqlConnectionConfiguration)} builds a
 *            {@code MysqlTextProtocolFieldReader} with {@code null}, which ends up here - confirm that
 *            {@code readField} cannot dereference the configuration on that path.
 */
protected AbstractMysqlFieldReader(MySqlConnectorConfig connectorConfig) {
this.connectorConfig = connectorConfig;
}
@Override @Override
public Object readField(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException { public Object readField(ResultSet rs, int columnIndex, Column column, Table table) throws SQLException {
if (column.jdbcType() == Types.TIME) { if (column.jdbcType() == Types.TIME) {
@ -49,10 +60,14 @@ else if (column.jdbcType() == Types.TINYINT || column.jdbcType() == Types.SMALLI
// DBZ-2673 // DBZ-2673
// It is necessary to check the type names as types like ENUM and SET are // It is necessary to check the type names as types like ENUM and SET are
// also reported as JDBC type char // also reported as JDBC type char
else if ("CHAR".equals(column.typeName()) || else if (!connectorConfig.customConverterRegistry().isEmpty() && TEXT_DATATYPES.contains(column.typeName())) {
"VARCHAR".equals(column.typeName()) || try {
"TEXT".equals(column.typeName())) { return rs.getString(columnIndex).getBytes(column.charsetName());
return rs.getBytes(columnIndex); }
catch (UnsupportedEncodingException e) {
// Both '{}' placeholders need arguments, otherwise the warning is logged without the charset and column
logger.warn("Unsupported encoding '{}' for column '{}', sending value as String", column.charsetName(), column.name());
return rs.getObject(columnIndex);
}
} }
else { else {
return rs.getObject(columnIndex); return rs.getObject(columnIndex);

View File

@ -75,7 +75,7 @@ public MySqlConnection(MySqlConnectionConfiguration connectionConfig, MysqlField
* @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null. * @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null.
*/ */
public MySqlConnection(MySqlConnectionConfiguration connectionConfig) { public MySqlConnection(MySqlConnectionConfiguration connectionConfig) {
this(connectionConfig, new MysqlTextProtocolFieldReader()); this(connectionConfig, new MysqlTextProtocolFieldReader(null));
} }
@Override @Override

View File

@ -80,8 +80,8 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
.build(); .build();
connection = new MySqlConnection(new MySqlConnectionConfiguration(config), connection = new MySqlConnection(new MySqlConnectionConfiguration(config),
connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader() connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader(connectorConfig)
: new MysqlTextProtocolFieldReader()); : new MysqlTextProtocolFieldReader(connectorConfig));
validateBinlogConfiguration(connectorConfig); validateBinlogConfiguration(connectorConfig);
@ -140,8 +140,8 @@ public ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> start(Co
Heartbeat heartbeat = null; Heartbeat heartbeat = null;
if (!connectorConfig.getHeartbeatActionQuery().isEmpty()) { if (!connectorConfig.getHeartbeatActionQuery().isEmpty()) {
heartbeatConnection = new MySqlConnection(new MySqlConnectionConfiguration(config), heartbeatConnection = new MySqlConnection(new MySqlConnectionConfiguration(config),
connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader() connectorConfig.useCursorFetch() ? new MysqlBinaryProtocolFieldReader(connectorConfig)
: new MysqlTextProtocolFieldReader()); : new MysqlTextProtocolFieldReader(connectorConfig));
heartbeat = Heartbeat.create( heartbeat = Heartbeat.create(
connectorConfig.getHeartbeatInterval(), connectorConfig.getHeartbeatInterval(),

View File

@ -29,6 +29,10 @@ public class MysqlBinaryProtocolFieldReader extends AbstractMysqlFieldReader {
private static final Logger LOGGER = LoggerFactory.getLogger(MysqlBinaryProtocolFieldReader.class); private static final Logger LOGGER = LoggerFactory.getLogger(MysqlBinaryProtocolFieldReader.class);
/**
 * Creates a binary-protocol field reader.
 *
 * @param config connector configuration, handed unchanged to the {@link AbstractMysqlFieldReader} constructor
 */
public MysqlBinaryProtocolFieldReader(MySqlConnectorConfig config) {
super(config);
}
/** /**
* @see <a href="https://dev.mysql.com/doc/internals/en/binary-protocol-value.html#packet-ProtocolBinary::MYSQL_TYPE_TIME">ProtocolBinary::MYSQL_TYPE_TIME</a> * @see <a href="https://dev.mysql.com/doc/internals/en/binary-protocol-value.html#packet-ProtocolBinary::MYSQL_TYPE_TIME">ProtocolBinary::MYSQL_TYPE_TIME</a>
*/ */

View File

@ -26,6 +26,10 @@ public class MysqlTextProtocolFieldReader extends AbstractMysqlFieldReader {
private static final Logger LOGGER = LoggerFactory.getLogger(MysqlTextProtocolFieldReader.class); private static final Logger LOGGER = LoggerFactory.getLogger(MysqlTextProtocolFieldReader.class);
/**
 * Creates a text-protocol field reader.
 *
 * @param config connector configuration, handed unchanged to the {@link AbstractMysqlFieldReader} constructor;
 *            NOTE(review): {@code MySqlConnection(MySqlConnectionConfiguration)} passes {@code null} - confirm
 *            the base class tolerates a missing configuration
 */
public MysqlTextProtocolFieldReader(MySqlConnectorConfig config) {
super(config);
}
/** /**
* As MySQL connector/J implementation is broken for MySQL type "TIME" we have to use a binary-ish workaround * As MySQL connector/J implementation is broken for MySQL type "TIME" we have to use a binary-ish workaround
* *

View File

@ -95,7 +95,9 @@ public SnapshotReader(String name, MySqlTaskContext context) {
recorder = this::recordRowAsRead; recorder = this::recordRowAsRead;
metrics = new SnapshotReaderMetrics(context, context.dbSchema(), changeEventQueueMetrics); metrics = new SnapshotReaderMetrics(context, context.dbSchema(), changeEventQueueMetrics);
this.useGlobalLock = useGlobalLock; this.useGlobalLock = useGlobalLock;
this.mysqlFieldReader = context.getConnectorConfig().useCursorFetch() ? new MysqlBinaryProtocolFieldReader() : new MysqlTextProtocolFieldReader(); this.mysqlFieldReader = context.getConnectorConfig().useCursorFetch()
? new MysqlBinaryProtocolFieldReader(context.getConnectorConfig())
: new MysqlTextProtocolFieldReader(context.getConnectorConfig());
} }
/** /**

View File

@ -0,0 +1,113 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mysql;
import static org.fest.assertions.Assertions.assertThat;
import java.nio.file.Path;
import java.sql.SQLException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Testing;
/**
 * Integration test checking that text and flag column values arrive intact, both from the initial
 * snapshot and from subsequent streaming, for a database created through {@link UniqueDatabase}
 * with the {@code "latin2"} argument (presumably the database charset - see {@code UniqueDatabase}).
 * <p>
 * One test runs without custom converters, the other registers {@link TinyIntOneToBooleanConverter}
 * so that the connector runs with a non-empty set of custom converters.
 */
public class MysqlNonUtfDatabaseCharsetIT extends AbstractConnectorTest {

    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();

    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "db_default_charset_noutf", "latin2")
            .withDbHistoryPath(DB_HISTORY_PATH);

    private Configuration config;

    /**
     * Stops any running connector, (re)creates the test database and clears the database history file.
     */
    @Before
    public void beforeEach() {
        stopConnector();
        DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    /**
     * Stops the connector and always removes the database history file.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        }
        finally {
            Testing.Files.delete(DB_HISTORY_PATH);
        }
    }

    /**
     * Without custom converters: the snapshot row of {@code DATA} and the streamed row of
     * {@code DATASTREAM} must both carry the original text and an INT16 flag of 1.
     */
    @Test
    public void useStringsDuringSnapshots() throws InterruptedException, SQLException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DATA") + "," + DATABASE.qualifiedTableName("DATASTREAM"))
                .build();
        start(MySqlConnector.class, config);
        Testing.Print.enable();

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(7);
        final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATA")).get(0);
        assertThat(((Struct) record.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký");
        assertThat(((Struct) record.value()).getStruct("after").getInt16("FLAG")).isEqualTo((short) 1);

        // Create and populate a second table while the connector is running so that it is captured by streaming
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                connection.execute("CREATE TABLE DATASTREAM (MESSAGE TEXT, FLAG TINYINT(1));");
                connection.execute("INSERT INTO DATASTREAM VALUES ('Žluťoučký', 1);");
            }
        }

        records = consumeRecordsByTopic(2);
        final SourceRecord recordStream = records.recordsForTopic(DATABASE.topicForTable("DATASTREAM")).get(0);
        assertThat(((Struct) recordStream.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký");
        // Verify the streamed record, not the snapshot record checked above
        assertThat(((Struct) recordStream.value()).getStruct("after").getInt16("FLAG")).isEqualTo((short) 1);
    }

    /**
     * With a custom converter registered: the snapshot row of {@code DATA} and the streamed row of
     * {@code DATASTREAM} must both carry the original text and a boolean flag of {@code true}.
     */
    @Test
    public void useByteArrayDuringSnapshots() throws InterruptedException, SQLException {
        config = DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("DATA") + "," + DATABASE.qualifiedTableName("DATASTREAM"))
                .with(MySqlConnectorConfig.CUSTOM_CONVERTERS, "boolean")
                .with("boolean.type", TinyIntOneToBooleanConverter.class.getName())
                .with("boolean.selector", ".*")
                .build();
        start(MySqlConnector.class, config);
        Testing.Print.enable();

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(7);
        final SourceRecord record = records.recordsForTopic(DATABASE.topicForTable("DATA")).get(0);
        assertThat(((Struct) record.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký");
        assertThat(((Struct) record.value()).getStruct("after").getBoolean("FLAG")).isEqualTo(true);

        // Create and populate a second table while the connector is running so that it is captured by streaming
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
            try (JdbcConnection connection = db.connect()) {
                connection.execute("CREATE TABLE DATASTREAM (MESSAGE TEXT, FLAG TINYINT(1));");
                connection.execute("INSERT INTO DATASTREAM VALUES ('Žluťoučký', 1);");
            }
        }

        records = consumeRecordsByTopic(2);
        final SourceRecord recordStream = records.recordsForTopic(DATABASE.topicForTable("DATASTREAM")).get(0);
        assertThat(((Struct) recordStream.value()).getStruct("after").getString("MESSAGE")).isEqualTo("Žluťoučký");
        // Verify the streamed record, not the snapshot record checked above
        assertThat(((Struct) recordStream.value()).getStruct("after").getBoolean("FLAG")).isEqualTo(true);
    }
}

View File

@ -0,0 +1,5 @@
-- Fixture table with one text column and one TINYINT(1) flag column
CREATE TABLE DATA (
MESSAGE TEXT,
FLAG TINYINT(1)
);
-- Single row whose text value contains non-ASCII characters
INSERT INTO DATA VALUES ('Žluťoučký', 1);

View File

@ -150,6 +150,13 @@ public Optional<ValueConverter> getValueConverter(TableId table, Column column)
return Optional.of(x -> converterDefinition.converter.convert(x)); return Optional.of(x -> converterDefinition.converter.convert(x));
} }
/**
 * Reports whether this registry holds no conversion functions.
 *
 * @return {@code true} when {@code conversionFunctionMap} is empty, i.e. no custom converters
 *         will be used by the connector; {@code false} otherwise
 */
public boolean isEmpty() {
    final boolean noConvertersRegistered = conversionFunctionMap.isEmpty();
    return noConvertersRegistered;
}
private String fullColumnName(TableId table, Column column) { private String fullColumnName(TableId table, Column column) {
return table + "." + column.name(); return table + "." + column.name();
} }