diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java index faf119735..b41379cd4 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java @@ -5,7 +5,6 @@ */ package io.debezium.connector.mysql; -import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; @@ -32,7 +31,7 @@ */ public class MySqlJdbcContext implements AutoCloseable { - protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}"; + protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8"; protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL); protected final Logger logger = LoggerFactory.getLogger(getClass()); @@ -49,10 +48,7 @@ public MySqlJdbcContext(Configuration config) { boolean useSSL = sslModeEnabled(); Configuration jdbcConfig = config.subset("database.", true) .edit() - .with("useInformationSchema", "true") - .with("nullCatalogMeansCurrent", "false") .with("useSSL", Boolean.toString(useSSL)) - .with("characterEncoding", StandardCharsets.UTF_8.name()) .build(); this.jdbc = new JdbcConnection(jdbcConfig, FACTORY); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java index 593cea6ff..032a1a335 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -122,6 +122,7 @@ public ValueConverter converter(Column column, Field fieldDefn) { case Types.SQLXML: Charset charset = charsetFor(column); if (charset != null) { + logger.debug("Using {} charset by default for column: {}", charset, column); return (data) -> convertString(column, fieldDefn, charset, data); } logger.warn("Using UTF-8 charset by default for column without charset: {}", column); diff --git a/debezium-connector-mysql/src/test/docker/init/setup.sql b/debezium-connector-mysql/src/test/docker/init/setup.sql index 2da37542e..e1e2fa339 100644 --- a/debezium-connector-mysql/src/test/docker/init/setup.sql +++ b/debezium-connector-mysql/src/test/docker/init/setup.sql @@ -237,4 +237,16 @@ CREATE TABLE dbz_100_enumsettest ( ); INSERT INTO dbz_100_enumsettest VALUES ('a', 'a,b,c'); INSERT INTO dbz_100_enumsettest VALUES ('b', 'b,a'); -INSERT INTO dbz_100_enumsettest VALUES ('c', 'a'); \ No newline at end of file +INSERT INTO dbz_100_enumsettest VALUES ('c', 'a'); + +-- DBZ-102 handle character sets +-- Use session variables to dictate the character sets used by the client running these commands so +-- the literal value is interpretted correctly... +set character_set_client=utf8; +set character_set_connection=utf8; +CREATE TABLE dbz_102_charsettest ( + id INT(11) NOT NULL AUTO_INCREMENT, + text VARCHAR(255) DEFAULT NULL, + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=2001 DEFAULT CHARSET=utf8; +INSERT INTO dbz_102_charsettest VALUES (default, "产品"); \ No newline at end of file diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index 44c8d853a..5472c789e 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -86,17 +86,18 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- // Testing.Debug.enable(); - SourceRecords records = consumeRecordsByTopic(5 + 6); // 5 schema change record, 6 inserts + SourceRecords records = consumeRecordsByTopic(6 + 7); // 5 schema change record, 7 inserts stopConnector(); assertThat(records).isNotNull(); - assertThat(records.recordsForTopic("regression").size()).isEqualTo(5); + assertThat(records.recordsForTopic("regression").size()).isEqualTo(6); assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3); - assertThat(records.topics().size()).isEqualTo(5); + assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(6); assertThat(records.databaseNames().size()).isEqualTo(1); - assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(5); + assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(6); assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); records.ddlRecordsForDatabase("regression_test").forEach(this::print); @@ -118,6 +119,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws } else { fail("c1 didn't match expected value"); } + } else if (record.topic().endsWith("dbz_102_charsettest")) { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + String text = after.getString("text"); + assertThat(text).isEqualTo("产品"); } else if (record.topic().endsWith("dbz_85_fractest")) { // The microseconds of all three should be exactly 780 Struct after = value.getStruct(Envelope.FieldName.AFTER); @@ -207,17 +212,18 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- // Testing.Debug.enable(); - SourceRecords records = consumeRecordsByTopic(5 + 6); // 5 schema change record, 6 inserts + SourceRecords records = consumeRecordsByTopic(6 + 7); // 6 schema change record, 7 inserts stopConnector(); assertThat(records).isNotNull(); - assertThat(records.recordsForTopic("regression").size()).isEqualTo(5); + assertThat(records.recordsForTopic("regression").size()).isEqualTo(6); assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3); - assertThat(records.topics().size()).isEqualTo(5); + assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(6); assertThat(records.databaseNames().size()).isEqualTo(1); - assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(5); + assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(6); assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); records.ddlRecordsForDatabase("regression_test").forEach(this::print); @@ -239,6 +245,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect } else { fail("c1 didn't match expected value"); } + } else if (record.topic().endsWith("dbz_102_charsettest")) { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + String text = after.getString("text"); + assertThat(text).isEqualTo("产品"); } else if (record.topic().endsWith("dbz_85_fractest")) { // The microseconds of all three should be exactly 780 Struct after = value.getStruct(Envelope.FieldName.AFTER); @@ -324,21 +334,27 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio // --------------------------------------------------------------------------------------------------------------- // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- - // Testing.Debug.enable(); - // 12 schema change records = 1 set variables, 5 drop tables, 1 drop database, 1 create database, 1 use database, 5 create - // tables - SourceRecords records = consumeRecordsByTopic(12 + 6); // plus 6 data records ... + Testing.Debug.enable(); + // We expect a total of 14 schema change records: + // 1 set variables + // 6 drop tables + // 1 drop database + // 1 create database + // 1 use database + // 6 create tables + SourceRecords records = consumeRecordsByTopic(14 + 7); // plus 7 data records ... stopConnector(); assertThat(records).isNotNull(); - assertThat(records.recordsForTopic("regression").size()).isEqualTo(12); + assertThat(records.recordsForTopic("regression").size()).isEqualTo(14); assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3); - assertThat(records.topics().size()).isEqualTo(5); + assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1); + assertThat(records.topics().size()).isEqualTo(6); assertThat(records.databaseNames().size()).isEqualTo(2); assertThat(records.databaseNames()).containsOnly("regression_test",""); - assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(11); + assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(13); assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement @@ -361,6 +377,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio } else { fail("c1 didn't match expected value"); } + } else if (record.topic().endsWith("dbz_102_charsettest")) { + Struct after = value.getStruct(Envelope.FieldName.AFTER); + String text = after.getString("text"); + assertThat(text).isEqualTo("产品"); } else if (record.topic().endsWith("dbz_85_fractest")) { // The microseconds of all three should be exactly 780 Struct after = value.getStruct(Envelope.FieldName.AFTER);