DBZ-102 Added MySQL integration test that verifies character encodings

Added a table with data to one of the MySQL databases used in the integration tests. It verifies that the UTF-8 data stored in the table is able to be handled properly when obtaining a snapshot and reading the binlog.
This commit is contained in:
Randall Hauch 2016-08-29 13:38:54 -05:00
parent cc94bbc697
commit a46a427b57
4 changed files with 50 additions and 21 deletions

View File

@ -5,7 +5,6 @@
*/
package io.debezium.connector.mysql;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
@ -32,7 +31,7 @@
*/
public class MySqlJdbcContext implements AutoCloseable {
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}";
protected static final String MYSQL_CONNECTION_URL = "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=${useSSL}&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8";
protected static ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(MYSQL_CONNECTION_URL);
protected final Logger logger = LoggerFactory.getLogger(getClass());
@ -49,10 +48,7 @@ public MySqlJdbcContext(Configuration config) {
boolean useSSL = sslModeEnabled();
Configuration jdbcConfig = config.subset("database.", true)
.edit()
.with("useInformationSchema", "true")
.with("nullCatalogMeansCurrent", "false")
.with("useSSL", Boolean.toString(useSSL))
.with("characterEncoding", StandardCharsets.UTF_8.name())
.build();
this.jdbc = new JdbcConnection(jdbcConfig, FACTORY);
}

View File

@ -122,6 +122,7 @@ public ValueConverter converter(Column column, Field fieldDefn) {
case Types.SQLXML:
Charset charset = charsetFor(column);
if (charset != null) {
logger.debug("Using {} charset by default for column: {}", charset, column);
return (data) -> convertString(column, fieldDefn, charset, data);
}
logger.warn("Using UTF-8 charset by default for column without charset: {}", column);

View File

@ -237,4 +237,16 @@ CREATE TABLE dbz_100_enumsettest (
);
INSERT INTO dbz_100_enumsettest VALUES ('a', 'a,b,c');
INSERT INTO dbz_100_enumsettest VALUES ('b', 'b,a');
INSERT INTO dbz_100_enumsettest VALUES ('c', 'a');
INSERT INTO dbz_100_enumsettest VALUES ('c', 'a');
-- DBZ-102 handle character sets
-- Use session variables to dictate the character sets used by the client running these commands so
-- the literal value is interpretted correctly...
set character_set_client=utf8;
set character_set_connection=utf8;
CREATE TABLE dbz_102_charsettest (
id INT(11) NOT NULL AUTO_INCREMENT,
text VARCHAR(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2001 DEFAULT CHARSET=utf8;
INSERT INTO dbz_102_charsettest VALUES (default, "产品");

View File

@ -86,17 +86,18 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(5 + 6); // 5 schema change record, 6 inserts
SourceRecords records = consumeRecordsByTopic(6 + 7); // 5 schema change record, 7 inserts
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(5);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(6);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3);
assertThat(records.topics().size()).isEqualTo(5);
assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(6);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(5);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(6);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
@ -118,6 +119,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws
} else {
fail("c1 didn't match expected value");
}
} else if (record.topic().endsWith("dbz_102_charsettest")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
String text = after.getString("text");
assertThat(text).isEqualTo("产品");
} else if (record.topic().endsWith("dbz_85_fractest")) {
// The microseconds of all three should be exactly 780
Struct after = value.getStruct(Envelope.FieldName.AFTER);
@ -207,17 +212,18 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
SourceRecords records = consumeRecordsByTopic(5 + 6); // 5 schema change record, 6 inserts
SourceRecords records = consumeRecordsByTopic(6 + 7); // 6 schema change record, 7 inserts
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(5);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(6);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3);
assertThat(records.topics().size()).isEqualTo(5);
assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(6);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(5);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(6);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("regression_test").forEach(this::print);
@ -239,6 +245,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect
} else {
fail("c1 didn't match expected value");
}
} else if (record.topic().endsWith("dbz_102_charsettest")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
String text = after.getString("text");
assertThat(text).isEqualTo("产品");
} else if (record.topic().endsWith("dbz_85_fractest")) {
// The microseconds of all three should be exactly 780
Struct after = value.getStruct(Envelope.FieldName.AFTER);
@ -324,21 +334,27 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
// Testing.Debug.enable();
// 12 schema change records = 1 set variables, 5 drop tables, 1 drop database, 1 create database, 1 use database, 5 create
// tables
SourceRecords records = consumeRecordsByTopic(12 + 6); // plus 6 data records ...
Testing.Debug.enable();
// We expect a total of 14 schema change records:
// 1 set variables
// 6 drop tables
// 1 drop database
// 1 create database
// 1 use database
// 6 create tables
SourceRecords records = consumeRecordsByTopic(14 + 7); // plus 7 data records ...
stopConnector();
assertThat(records).isNotNull();
assertThat(records.recordsForTopic("regression").size()).isEqualTo(12);
assertThat(records.recordsForTopic("regression").size()).isEqualTo(14);
assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1);
assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3);
assertThat(records.topics().size()).isEqualTo(5);
assertThat(records.recordsForTopic("regression.regression_test.dbz_102_charsettest").size()).isEqualTo(1);
assertThat(records.topics().size()).isEqualTo(6);
assertThat(records.databaseNames().size()).isEqualTo(2);
assertThat(records.databaseNames()).containsOnly("regression_test","");
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(11);
assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(13);
assertThat(records.ddlRecordsForDatabase("connector_test")).isNull();
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement
@ -361,6 +377,10 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio
} else {
fail("c1 didn't match expected value");
}
} else if (record.topic().endsWith("dbz_102_charsettest")) {
Struct after = value.getStruct(Envelope.FieldName.AFTER);
String text = after.getString("text");
assertThat(text).isEqualTo("产品");
} else if (record.topic().endsWith("dbz_85_fractest")) {
// The microseconds of all three should be exactly 780
Struct after = value.getStruct(Envelope.FieldName.AFTER);