From cc94bbc6973757942a16ae709ae874526a2855da Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Mon, 29 Aug 2016 12:19:24 -0500 Subject: [PATCH] DBZ-102 MySQL connector now processes character sets The MySQL binlog events contain the binary representation of string-like values as encoded per the column's character set. Properly decoding these into Java strings requires capturing the column, table, and database character set when parsing the DDL statements. Unfortunately, MySQL DDL allows columns (at the time the columns are created or modified) to inherit the default character set for the table, or if that is not defined the default character set for the database, or if that is not defined the character set for the server. So, in addition to modifying the MySQL DDL parser to support capturing the character set name for each column, it also had to be changed to know what these default character set names are. The default character sets are all available via MySQL server/session/local variables. Although strictly speaking the character set variables cannot be set globally, MySQL DDL does allow session and local variables to be set with `SET` statements. Therefore, this commit enhances the MySQL DDL parser to parse `SET` statements and to track the various global, session, and local variables as seen by the DDL parser. Upon connector startup, a subset of server variables (related to character sets and collations) are read from the database via JDBC and used to initialize the DDL parser via `SET` methods. In addition to initializing the DDL parser with the system variables related to character sets and collation, it is important to also capture the server and database default character sets in the database history so that the correct character sets are used for columns even when the default character sets have changed on the database and/or the server. 
Therefore, upon startup or snapshot the MySQL connector records in the database history a `SET` statement for the `character_set_server` and `collation_server` system variables so that, upon a later restart, the history's DDL statements can be re-parsed with the correct default server and database character sets. Also, when the MySQL connector reloads the database history (upon startup), the recorded default server character set is compared with the MySQL instance's current server character set, and if they are different the current character set is recorded with a new `SET` statement. These extra steps ensure that the connector uses the correct character set for each column, even when the connector restarts and reloads the database history captured by a previous version of the connector. In other words, the MySQL connector can be safely upgraded, and the new version will correctly start using the columns' character sets to decode the string-like values. --- CHANGELOG.md | 2 +- .../connector/mysql/MySqlConnectorTask.java | 2 + .../connector/mysql/MySqlJdbcContext.java | 75 ++++++++++-- .../debezium/connector/mysql/MySqlSchema.java | 27 ++++- .../connector/mysql/MySqlTaskContext.java | 39 ++++++- .../connector/mysql/MySqlValueConverters.java | 107 +++++++++++++++--- .../connector/mysql/RowDeserializers.java | 58 ++++++++++ .../connector/mysql/SnapshotReader.java | 20 +++- .../connector/mysql/BinlogReaderIT.java | 2 + .../connector/mysql/MySqlConnectorIT.java | 7 +- .../mysql/MySqlConnectorRegressionIT.java | 29 ++--- .../connector/mysql/MySqlSchemaTest.java | 5 + .../connector/mysql/SnapshotReaderIT.java | 8 +- .../io/debezium/jdbc/JdbcValueConverters.java | 2 +- 14 files changed, 329 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e3a2ab20..5f5b7b1e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,7 @@ August XX, 2016 - [Detailed release notes](https://issues.jboss.org/browse/DBZ/f ### Backwards-incompatible changes since 0.3.0 -None +* 
MySQL connector now properly decodes string values from the binlog based upon the column's character set encoding as read by the DDL statement. Upon upgrade and restart, the connector will re-read the recorded database history and now associate the columns with their character sets, and any newly processed events will use properly encoded string values. As expected, previously generated events are never altered. Force a snapshot to regenerate events for the servers. [DBZ-102](https://issues.jboss.org/projects/DBZ/issues/DBZ-102) ### Fixes since 0.3.0 diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 6f9974317..40006a4a4 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -114,11 +114,13 @@ public void start(Map props) { // full history of the database. logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog"); source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog + taskContext.initializeHistory(); } else { // We are allowed to use snapshots, and that is the best way to start ... startWithSnapshot = true; // The snapshot will determine if GTIDs are set logger.info("Found no existing offset, so preparing to perform a snapshot"); + // The snapshot will also initialize history ... 
} } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java index e57ca23a5..faf119735 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlJdbcContext.java @@ -5,9 +5,14 @@ */ package io.debezium.connector.mysql; +import java.nio.charset.StandardCharsets; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; @@ -18,6 +23,7 @@ import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.JdbcConnection.ConnectionFactory; +import io.debezium.util.Strings; /** * A context for a JDBC connection to MySQL. @@ -32,19 +38,21 @@ public class MySqlJdbcContext implements AutoCloseable { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Configuration config; protected final JdbcConnection jdbc; - private final Map originalSystemProperties = new HashMap<>(); + private final Map originalSystemProperties = new HashMap<>(); public MySqlJdbcContext(Configuration config) { this.config = config; // must be set before most methods are used // Set up the JDBC connection without actually connecting, with extra MySQL-specific properties - // to give us better JDBC database metadata behavior ... 
+ // to give us better JDBC database metadata behavior, including using UTF-8 for the client-side character encoding + // per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html boolean useSSL = sslModeEnabled(); Configuration jdbcConfig = config.subset("database.", true) .edit() .with("useInformationSchema", "true") .with("nullCatalogMeansCurrent", "false") .with("useSSL", Boolean.toString(useSSL)) + .with("characterEncoding", StandardCharsets.UTF_8.name()) .build(); this.jdbc = new JdbcConnection(jdbcConfig, FACTORY); } @@ -81,11 +89,11 @@ public SecureConnectionMode sslMode() { String mode = config.getString(MySqlConnectorConfig.SSL_MODE); return SecureConnectionMode.parse(mode); } - + public boolean sslModeEnabled() { return sslMode() != SecureConnectionMode.DISABLED; } - + public void start() { if (sslModeEnabled()) { originalSystemProperties.clear(); @@ -104,9 +112,9 @@ public void shutdown() { logger.error("Unexpected error shutting down the database connection", e); } finally { // Reset the system properties to their original value ... - originalSystemProperties.forEach((name,value)->{ - if ( value != null ) { - System.setProperty(name,value); + originalSystemProperties.forEach((name, value) -> { + if (value != null) { + System.setProperty(name, value); } else { System.clearProperty(name); } @@ -123,6 +131,59 @@ protected String connectionString() { return jdbc.connectionString(MYSQL_CONNECTION_URL); } + /** + * Read the MySQL charset-related system variables. + * + * @param sql the reference that should be set to the SQL statement; may be null if not needed + * @return the system variables that are related to server character sets; never null + */ + protected Map readMySqlCharsetSystemVariables(AtomicReference sql) { + // Read the system variables from the MySQL instance and get the current database name ... 
+ Map variables = new HashMap<>(); + try (JdbcConnection mysql = jdbc.connect()) { + logger.debug("Reading MySQL charset-related system variables before parsing DDL history."); + String statement = "SHOW VARIABLES WHERE Variable_name IN ('character_set_server','collation_server')"; + if (sql != null) sql.set(statement); + mysql.query(statement, rs -> { + while (rs.next()) { + String varName = rs.getString(1); + String value = rs.getString(2); + if (varName != null && value != null) { + variables.put(varName, value); + logger.debug("\t{} = {}", + Strings.pad(varName, 45, ' '), + Strings.pad(value, 45, ' ')); + } + } + }); + } catch (SQLException e) { + throw new ConnectException("Error reading MySQL variables: " + e.getMessage(), e); + } + return variables; + } + + protected String setStatementFor(Map variables) { + StringBuilder sb = new StringBuilder("SET "); + boolean first = true; + List varNames = new ArrayList<>(variables.keySet()); + Collections.sort(varNames); + for (String varName : varNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(varName).append("="); + String value = variables.get(varName); + if (value == null) value = ""; + if (value.contains(",") || value.contains(";")) { + value = "'" + value + "'"; + } + sb.append(value); + } + return sb.append(";").toString(); + } + protected void setSystemProperty(String property, Field field, boolean showValueInError) { String value = config.getString(field); if (value != null) { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java index e8032a693..2b4f0db41 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSchema.java @@ -24,6 +24,7 @@ import io.debezium.annotation.NotThreadSafe; import io.debezium.config.Configuration; 
import io.debezium.connector.mysql.MySqlConnectorConfig.TemporalPrecisionMode; +import io.debezium.connector.mysql.MySqlSystemVariables.Scope; import io.debezium.jdbc.JdbcConnection; import io.debezium.jdbc.TimeZoneAdapter; import io.debezium.relational.Table; @@ -106,7 +107,7 @@ public MySqlSchema(Configuration config, String serverName) { boolean adaptiveTimePrecision = TemporalPrecisionMode.ADAPTIVE.equals(timePrecisionMode); MySqlValueConverters valueConverters = new MySqlValueConverters(adaptiveTimePrecision); this.schemaBuilder = new TableSchemaBuilder(valueConverters, schemaNameValidator::validate); - + // Set up the server name and schema prefix ... if (serverName != null) serverName = serverName.trim(); this.serverName = serverName; @@ -197,6 +198,26 @@ public String historyLocation() { return dbHistory.toString(); } + /** + * Set the system variables on the DDL parser. + * + * @param variables the system variables; may not be null but may be empty + */ + public void setSystemVariables(Map variables) { + variables.forEach((varName, value) -> { + ddlParser.systemVariables().setVariable(Scope.SESSION, varName, value); + }); + } + + /** + * Get the system variables as known by the DDL parser. + * + * @return the system variables; never null + */ + public MySqlSystemVariables systemVariables() { + return ddlParser.systemVariables(); + } + /** * Load the schema for the databases using JDBC database metadata. If there are changes relative to any * table definitions that existed when this method is called, those changes are recorded in the database history @@ -335,12 +356,12 @@ public boolean applyDdl(SourceInfo source, String databaseName, String ddlStatem // the same order they were read for each _affected_ database, grouped together if multiple apply // to the same _affected_ database... 
ddlChanges.groupStatementStringsByDatabase((dbName, ddl) -> { - if (filters.databaseFilter().test(dbName)) { + if (filters.databaseFilter().test(dbName) || dbName == null || "".equals(dbName)) { if (dbName == null) dbName = ""; statementConsumer.consume(dbName, ddlStatements); } }); - } else if (filters.databaseFilter().test(databaseName)) { + } else if (filters.databaseFilter().test(databaseName) || databaseName == null || "".equals(databaseName)) { if (databaseName == null) databaseName = ""; statementConsumer.consume(databaseName, ddlStatements); } diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java index 86fc73582..ef91007b0 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlTaskContext.java @@ -5,11 +5,14 @@ */ package io.debezium.connector.mysql; +import java.util.Map; + import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; import io.debezium.util.Clock; import io.debezium.util.LoggingContext; import io.debezium.util.LoggingContext.PreviousContext; +import io.debezium.util.Strings; /** * A Kafka Connect source task reads the MySQL binary log and generate the corresponding data change events. @@ -59,6 +62,19 @@ public RecordMakers makeRecord() { return recordProcessor; } + /** + * Initialize the database history with any server-specific information. This should be done only upon connector startup + * when the connector has no prior history. + */ + public void initializeHistory() { + // Read the system variables from the MySQL instance and get the current database name ... + Map variables = readMySqlCharsetSystemVariables(null); + String ddlStatement = setStatementFor(variables); + + // And write them into the database history ... 
+ dbSchema.applyDdl(source, "", ddlStatement, null); + } + /** * Load the database schema information using the previously-recorded history, and stop reading the history when the * the history reaches the supplied starting point. @@ -67,7 +83,23 @@ public RecordMakers makeRecord() { * offset} at which the database schemas are to reflect; may not be null */ public void loadHistory(SourceInfo startingPoint) { + // Read the system variables from the MySQL instance and load them into the DDL parser as defaults ... + Map variables = readMySqlCharsetSystemVariables(null); + dbSchema.setSystemVariables(variables); + + // And then load the history ... dbSchema.loadHistory(startingPoint); + + // The server's default character set may have changed since we last recorded it in the history, + // so we need to see if the history's state does not match ... + String systemCharsetName = variables.get(MySqlSystemVariables.CHARSET_NAME_SERVER); + String systemCharsetNameFromHistory = dbSchema.systemVariables().getVariable(MySqlSystemVariables.CHARSET_NAME_SERVER); + if (!Strings.equalsIgnoreCase(systemCharsetName, systemCharsetNameFromHistory)) { + // The history's server character set is NOT the same as the server's current default, + // so record the change in the history ... 
+ String ddlStatement = setStatementFor(variables); + dbSchema.applyDdl(source, "", ddlStatement, null); + } recordProcessor.regenerate(); } @@ -81,7 +113,7 @@ public long serverId() { public String serverName() { String serverName = config.getString(MySqlConnectorConfig.SERVER_NAME); - if ( serverName == null ) { + if (serverName == null) { serverName = hostname() + ":" + port(); } return serverName; @@ -102,7 +134,7 @@ public long timeoutInMilliseconds() { public long pollIntervalInMillseconds() { return config.getLong(MySqlConnectorConfig.POLL_INTERVAL_MS); } - + public long rowCountForLargeTable() { return config.getLong(MySqlConnectorConfig.ROW_COUNT_FOR_STREAMING_RESULT_SETS); } @@ -150,6 +182,7 @@ public void shutdown() { /** * Configure the logger's Mapped Diagnostic Context (MDC) properties for the thread making this call. + * * @param contextName the name of the context; may not be null * @return the previous MDC context; never null * @throws IllegalArgumentException if {@code contextName} is null @@ -157,7 +190,7 @@ public void shutdown() { public PreviousContext configureLoggingContext(String contextName) { return LoggingContext.forConnector("MySQL", serverName(), contextName); } - + /** * Run the supplied function in the temporary connector MDC context, and when complete always return the MDC context to its * state before this method was called. 
diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java index 8adb57dfd..593cea6ff 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java @@ -5,14 +5,20 @@ */ package io.debezium.connector.mysql; +import java.nio.charset.Charset; +import java.nio.charset.IllegalCharsetNameException; +import java.nio.charset.StandardCharsets; import java.sql.Types; import java.time.OffsetDateTime; import java.time.ZoneOffset; import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.source.SourceRecord; import com.github.shyiko.mysql.binlog.event.deserialization.AbstractRowsEventDataDeserializer; +import com.mysql.jdbc.CharsetMapping; import io.debezium.annotation.Immutable; import io.debezium.jdbc.JdbcValueConverters; @@ -73,11 +79,11 @@ public SchemaBuilder schemaBuilder(Column column) { return Year.builder(); } if (matches(typeName, "ENUM")) { - String commaSeparatedOptions = extractEnumAndSetOptions(column,true); + String commaSeparatedOptions = extractEnumAndSetOptions(column, true); return io.debezium.data.Enum.builder(commaSeparatedOptions); } if (matches(typeName, "SET")) { - String commaSeparatedOptions = extractEnumAndSetOptions(column,true); + String commaSeparatedOptions = extractEnumAndSetOptions(column, true); return io.debezium.data.EnumSet.builder(commaSeparatedOptions); } // Otherwise, let the base class handle it ... @@ -93,18 +99,87 @@ public ValueConverter converter(Column column, Field fieldDefn) { } if (matches(typeName, "ENUM")) { // Build up the character array based upon the column's type ... 
- String options = extractEnumAndSetOptions(column,false); + String options = extractEnumAndSetOptions(column, false); return (data) -> convertEnumToString(options, column, fieldDefn, data); } if (matches(typeName, "SET")) { // Build up the character array based upon the column's type ... - String options = extractEnumAndSetOptions(column,false); + String options = extractEnumAndSetOptions(column, false); return (data) -> convertSetToString(options, column, fieldDefn, data); } + + // We have to convert bytes encoded in the column's character set ... + switch (column.jdbcType()) { + case Types.CHAR: // variable-length + case Types.VARCHAR: // variable-length + case Types.LONGVARCHAR: // variable-length + case Types.CLOB: // variable-length + case Types.NCHAR: // fixed-length + case Types.NVARCHAR: // fixed-length + case Types.LONGNVARCHAR: // fixed-length + case Types.NCLOB: // fixed-length + case Types.DATALINK: + case Types.SQLXML: + Charset charset = charsetFor(column); + if (charset != null) { + return (data) -> convertString(column, fieldDefn, charset, data); + } + logger.warn("Using UTF-8 charset by default for column without charset: {}", column); + return (data) -> convertString(column, fieldDefn, StandardCharsets.UTF_8, data); + default: + break; + } + // Otherwise, let the base class handle it ... return super.converter(column, fieldDefn); } + /** + * Return the {@link Charset} instance with the MySQL-specific character set name used by the given column. 
+ * + * @param column the column in which the character set is used; never null + * @return the Java {@link Charset}, or null if there is no mapping + */ + protected Charset charsetFor(Column column) { + String mySqlCharsetName = column.charsetName(); + if (mySqlCharsetName == null) { + logger.warn("Column is missing a character set: {}", column); + return null; + } + String encoding = CharsetMapping.getJavaEncodingForMysqlCharset(mySqlCharsetName); + if (encoding == null) { + logger.warn("Column uses MySQL character set '{}', which has no mapping to a Java character set", mySqlCharsetName); + } else { + try { + return Charset.forName(encoding); + } catch (IllegalCharsetNameException e) { + logger.error("Unable to load Java charset '{}' for column with MySQL character set '{}'", encoding, mySqlCharsetName); + } + } + return null; + } + + /** + * Convert the {@link String} or {@code byte[]} value to a string value used in a {@link SourceRecord}. + * + * @param column the column in which the value appears + * @param fieldDefn the field definition for the {@link SourceRecord}'s {@link Schema}; never null + * @param columnCharset the Java character set in which column byte[] values are encoded; may not be null + * @param data the data; may be null + * @return the string value; may be null if the value is null or is an unknown input type + */ + protected Object convertString(Column column, Field fieldDefn, Charset columnCharset, Object data) { + if (data == null) return null; + if (data instanceof byte[]) { + // Decode the binary representation using the given character encoding ... + return new String((byte[]) data, columnCharset); + } + if (data instanceof String) { + return data; + } + return handleUnknownData(column, fieldDefn, data); + } + /** * Converts a value object for a MySQL {@code YEAR}, which appear in the binlog as an integer though returns from * the MySQL JDBC driver as either a short or a {@link java.sql.Date}. 
@@ -151,7 +226,7 @@ protected Object convertEnumToString(String options, Column column, Field fieldD } if (data instanceof Integer) { // The binlog will contain an int with the 1-based index of the option in the enum value ... - int index = ((Integer) data).intValue() - 1; // 'options' is 0-based + int index = ((Integer) data).intValue() - 1; // 'options' is 0-based if (index < options.length()) { return options.substring(index, index + 1); } @@ -180,7 +255,7 @@ protected Object convertSetToString(String options, Column column, Field fieldDe if (data instanceof Long) { // The binlog will contain a long with the indexes of the options in the set value ... long indexes = ((Long) data).longValue(); - return convertSetValue(indexes,options); + return convertSetValue(indexes, options); } return handleUnknownData(column, fieldDefn, data); } @@ -200,25 +275,29 @@ protected boolean matches(String upperCaseTypeName, String upperCaseMatch) { protected String extractEnumAndSetOptions(Column column, boolean commaSeparated) { String options = MySqlDdlParser.parseSetAndEnumOptions(column.typeExpression()); - if ( !commaSeparated ) return options; + if (!commaSeparated) return options; StringBuilder sb = new StringBuilder(); boolean first = true; - for ( int i=0; i!=options.length(); ++i ) { - if ( first ) first = false; - else sb.append(','); + for (int i = 0; i != options.length(); ++i) { + if (first) + first = false; + else + sb.append(','); sb.append(options.charAt(i)); } return sb.toString(); } - - protected String convertSetValue( long indexes, String options ) { + + protected String convertSetValue(long indexes, String options) { StringBuilder sb = new StringBuilder(); int index = 0; boolean first = true; while (indexes != 0L) { if (indexes % 2L != 0) { - if ( first ) first = false; - else sb.append(','); + if (first) + first = false; + else + sb.append(','); sb.append(options.substring(index, index + 1)); } ++index; diff --git 
a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java index 278f9c199..48c3ea816 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/RowDeserializers.java @@ -56,6 +56,16 @@ public static class DeleteRowsDeserializer extends DeleteRowsEventDataDeserializ public DeleteRowsDeserializer(Map tableMapEventByTableId) { super(tableMapEventByTableId); } + + @Override + protected Serializable deserializeString(int length, ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeString(length, inputStream); + } + + @Override + protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeVarString(meta, inputStream); + } @Override protected Serializable deserializeDate(ByteArrayInputStream inputStream) throws IOException { @@ -104,6 +114,16 @@ public UpdateRowsDeserializer(Map tableMapEventByTableI super(tableMapEventByTableId); } + @Override + protected Serializable deserializeString(int length, ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeString(length, inputStream); + } + + @Override + protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeVarString(meta, inputStream); + } + @Override protected Serializable deserializeDate(ByteArrayInputStream inputStream) throws IOException { return RowDeserializers.deserializeDate(inputStream); @@ -151,6 +171,16 @@ public WriteRowsDeserializer(Map tableMapEventByTableId super(tableMapEventByTableId); } + @Override + protected Serializable deserializeString(int length, ByteArrayInputStream inputStream) throws IOException { + return 
RowDeserializers.deserializeString(length, inputStream); + } + + @Override + protected Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) throws IOException { + return RowDeserializers.deserializeVarString(meta, inputStream); + } + @Override protected Serializable deserializeDate(ByteArrayInputStream inputStream) throws IOException { return RowDeserializers.deserializeDate(inputStream); @@ -192,6 +222,34 @@ protected Serializable deserializeYear(ByteArrayInputStream inputStream) throws } } + /** + * Converts a MySQL string to a {@code byte[]}. + * + * @param length the number of bytes used to store the length of the string + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@code byte[]} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeString(int length, ByteArrayInputStream inputStream) throws IOException { + // charset is not present in the binary log (meaning there is no way to distinguish between CHAR / BINARY) + // as a result - return byte[] instead of an actual String + int stringLength = length < 256 ? inputStream.readInteger(1) : inputStream.readInteger(2); + return inputStream.read(stringLength); + } + + /** + * Converts a MySQL string to a {@code byte[]}. + * + * @param meta the {@code meta} value containing the number of bytes in the length field + * @param inputStream the binary stream containing the raw binlog event data for the value + * @return the {@code byte[]} object + * @throws IOException if there is an error reading from the binlog event data + */ + protected static Serializable deserializeVarString(int meta, ByteArrayInputStream inputStream) throws IOException { + int varcharLength = meta < 256 ? inputStream.readInteger(1) : inputStream.readInteger(2); + return inputStream.read(varcharLength); + } + /** * Converts a MySQL {@code DATE} value to a {@link LocalDate}. 
* diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java index 6e38cd5d8..d3688c10a 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/SnapshotReader.java @@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; + import org.apache.kafka.connect.source.SourceRecord; import io.debezium.connector.mysql.RecordMakers.RecordsForTable; @@ -168,6 +169,10 @@ protected void execute() { sql.set("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"); mysql.execute(sql.get()); + // Generate the DDL statements that set the charset-related system variables ... + Map systemVariables = context.readMySqlCharsetSystemVariables(sql); + String setSystemVariablesStatement = context.setStatementFor(systemVariables); + // ------ // STEP 1 // ------ @@ -206,13 +211,14 @@ protected void execute() { String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set source.setGtidSet(gtidSet); logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition, - gtidSet); + gtidSet); } else { logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition); } source.startSnapshot(); } else { - throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt + "'. Make sure your server is correctly configured"); + throw new IllegalStateException("Cannot read the binlog filename and position via '" + showMasterStmt + + "'. Make sure your server is correctly configured"); } }); @@ -264,14 +270,18 @@ protected void execute() { // Transform the current schema so that it reflects the *current* state of the MySQL server's contents. 
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ... logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:"); + schema.applyDdl(source, null, setSystemVariablesStatement, this::enqueueSchemaChanges); + // Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ... Set allTableIds = new HashSet<>(schema.tables().tableIds()); allTableIds.addAll(tableIds); - allTableIds.forEach(tableId -> schema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId, this::enqueueSchemaChanges)); + allTableIds.forEach(tableId -> schema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId, + this::enqueueSchemaChanges)); // Add a DROP DATABASE statement for each database that we no longer know about ... schema.tables().tableIds().stream().map(TableId::catalog) .filter(Predicates.not(databaseNames::contains)) - .forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName, this::enqueueSchemaChanges)); + .forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName, + this::enqueueSchemaChanges)); // Now process all of our tables for each database ... 
for (Map.Entry> entry : tableIdsByDbName.entrySet()) { String dbName = entry.getKey(); @@ -472,7 +482,7 @@ private Statement createStatement(Connection connection) throws SQLException { private void logServerInformation(JdbcConnection mysql) { try { logger.info("MySQL server variables related to change data capture:"); - mysql.query("SHOW VARIABLES WHERE Variable_name REGEXP 'version|binlog|tx_|gtid'", rs -> { + mysql.query("SHOW VARIABLES WHERE Variable_name REGEXP 'version|binlog|tx_|gtid|character_set|collation'", rs -> { while (rs.next()) { logger.info("\t{} = {}", Strings.pad(rs.getString(1), 45, ' '), diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java index a7cb82aa7..69a26c491 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BinlogReaderIT.java @@ -116,6 +116,7 @@ public void shouldCreateSnapshotOfSingleDatabase() throws Exception { context = new MySqlTaskContext(config); context.start(); context.source().setBinlogStartPoint("",0L); // start from beginning + context.initializeHistory(); reader = new BinlogReader(context); // Start reading the binlog ... @@ -175,6 +176,7 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep context = new MySqlTaskContext(config); context.start(); context.source().setBinlogStartPoint("",0L); // start from beginning + context.initializeHistory(); reader = new BinlogReader(context); // Start reading the binlog ... 
diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java index 9d165a484..c01b93074 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorIT.java @@ -286,16 +286,17 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio // --------------------------------------------------------------------------------------------------------------- // Consume all of the events due to startup and initialization of the database // --------------------------------------------------------------------------------------------------------------- - SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11); // 11 schema change records - assertThat(records.recordsForTopic("myServer").size()).isEqualTo(11); + SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11 + 1); // 11 schema change records + 1 SET statement + assertThat(records.recordsForTopic("myServer").size()).isEqualTo(12); assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(9); assertThat(records.recordsForTopic("myServer.connector_test.products_on_hand").size()).isEqualTo(9); assertThat(records.recordsForTopic("myServer.connector_test.customers").size()).isEqualTo(4); assertThat(records.recordsForTopic("myServer.connector_test.orders").size()).isEqualTo(5); assertThat(records.topics().size()).isEqualTo(5); - assertThat(records.databaseNames().size()).isEqualTo(1); + assertThat(records.databaseNames().size()).isEqualTo(2); assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(11); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); 
records.ddlRecordsForDatabase("connector_test").forEach(this::print); // Check that all records are valid, can be serialized and deserialized ... diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java index c8b3e11fc..44c8d853a 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlConnectorRegressionIT.java @@ -109,11 +109,11 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshot() throws Struct after = value.getStruct(Envelope.FieldName.AFTER); String c1 = after.getString("c1"); String c2 = after.getString("c2"); - if ( c1.equals("a") ) { + if (c1.equals("a")) { assertThat(c2).isEqualTo("a,b,c"); - } else if ( c1.equals("b") ) { + } else if (c1.equals("b")) { assertThat(c2).isEqualTo("a,b"); - } else if ( c1.equals("c") ) { + } else if (c1.equals("c")) { assertThat(c2).isEqualTo("a"); } else { fail("c1 didn't match expected value"); @@ -230,11 +230,11 @@ public void shouldConsumeAllEventsFromDatabaseUsingBinlogAndNoSnapshotAndConnect Struct after = value.getStruct(Envelope.FieldName.AFTER); String c1 = after.getString("c1"); String c2 = after.getString("c2"); - if ( c1.equals("a") ) { + if (c1.equals("a")) { assertThat(c2).isEqualTo("a,b,c"); - } else if ( c1.equals("b") ) { + } else if (c1.equals("b")) { assertThat(c2).isEqualTo("a,b"); - } else if ( c1.equals("c") ) { + } else if (c1.equals("c")) { assertThat(c2).isEqualTo("a"); } else { fail("c1 didn't match expected value"); @@ -325,20 +325,23 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio // Consume all of the events due to startup and initialization of the database // 
--------------------------------------------------------------------------------------------------------------- // Testing.Debug.enable(); - // 11 schema change records = 5 drop tables, 1 drop database, 1 create database, 1 use database, 5 create tables - SourceRecords records = consumeRecordsByTopic(11 + 6); // plus 6 data records ... + // 12 schema change records = 1 set variables, 5 drop tables, 1 drop database, 1 create database, 1 use database, 5 create + // tables + SourceRecords records = consumeRecordsByTopic(12 + 6); // plus 6 data records ... stopConnector(); assertThat(records).isNotNull(); - assertThat(records.recordsForTopic("regression").size()).isEqualTo(11); + assertThat(records.recordsForTopic("regression").size()).isEqualTo(12); assertThat(records.recordsForTopic("regression.regression_test.t1464075356413_testtable6").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz84_integer_types_table").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_85_fractest").size()).isEqualTo(1); assertThat(records.recordsForTopic("regression.regression_test.dbz_100_enumsettest").size()).isEqualTo(3); assertThat(records.topics().size()).isEqualTo(5); - assertThat(records.databaseNames().size()).isEqualTo(1); + assertThat(records.databaseNames().size()).isEqualTo(2); + assertThat(records.databaseNames()).containsOnly("regression_test",""); assertThat(records.ddlRecordsForDatabase("regression_test").size()).isEqualTo(11); assertThat(records.ddlRecordsForDatabase("connector_test")).isNull(); assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull(); + assertThat(records.ddlRecordsForDatabase("").size()).isEqualTo(1); // SET statement records.ddlRecordsForDatabase("regression_test").forEach(this::print); // Check that all records are valid, can be serialized and deserialized ... 
@@ -349,11 +352,11 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLExceptio Struct after = value.getStruct(Envelope.FieldName.AFTER); String c1 = after.getString("c1"); String c2 = after.getString("c2"); - if ( c1.equals("a") ) { + if (c1.equals("a")) { assertThat(c2).isEqualTo("a,b,c"); - } else if ( c1.equals("b") ) { + } else if (c1.equals("b")) { assertThat(c2).isEqualTo("a,b"); - } else if ( c1.equals("c") ) { + } else if (c1.equals("c")) { assertThat(c2).isEqualTo("a"); } else { fail("c1 didn't match expected value"); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java index df032e962..a56fb6184 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlSchemaTest.java @@ -61,7 +61,10 @@ public void shouldApplyDdlStatementsAndRecover() { mysql.start(); // Testing.Print.enable(); + + // Set up the server ... source.setBinlogStartPoint("binlog-001",400); + mysql.applyDdl(source, "db1", "SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", this::printStatements); mysql.applyDdl(source, "db1", readFile("ddl/mysql-products.ddl"), this::printStatements); // Check that we have tables ... 
@@ -82,6 +85,7 @@ public void shouldLoadSystemAndNonSystemTablesAndConsumeOnlyFilteredDatabases() mysql.start(); source.setBinlogStartPoint("binlog-001",400); + mysql.applyDdl(source, "mysql", "SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", this::printStatements); mysql.applyDdl(source, "mysql", readFile("ddl/mysql-test-init-5.7.ddl"), this::printStatements); source.setBinlogStartPoint("binlog-001",1000); @@ -107,6 +111,7 @@ public void shouldLoadSystemAndNonSystemTablesAndConsumeAllDatabases() { mysql.start(); source.setBinlogStartPoint("binlog-001",400); + mysql.applyDdl(source, "mysql", "SET " + MySqlSystemVariables.CHARSET_NAME_SERVER + "=utf8mb4", this::printStatements); mysql.applyDdl(source, "mysql", readFile("ddl/mysql-test-init-5.7.ddl"), this::printStatements); source.setBinlogStartPoint("binlog-001",1000); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java index d24dbdf41..f1f1e05f2 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/SnapshotReaderIT.java @@ -268,10 +268,10 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Excep // The last poll should always return null ... assertThat(records).isNull(); - // There should be 11 schema changes ... - assertThat(schemaChanges.recordCount()).isEqualTo(11); - assertThat(schemaChanges.databaseCount()).isEqualTo(1); - assertThat(schemaChanges.databases()).containsOnly(DB_NAME); + // There should be 11 schema changes plus 1 SET statement ... + assertThat(schemaChanges.recordCount()).isEqualTo(12); + assertThat(schemaChanges.databaseCount()).isEqualTo(2); + assertThat(schemaChanges.databases()).containsOnly(DB_NAME,""); // Check the records via the store ... 
assertThat(store.collectionCount()).isEqualTo(4); diff --git a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java index 3c85d93d4..670508740 100644 --- a/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java +++ b/debezium-core/src/main/java/io/debezium/jdbc/JdbcValueConverters.java @@ -61,7 +61,7 @@ public class JdbcValueConverters implements ValueConverterProvider { private static final Double DOUBLE_TRUE = new Double(1.0d); private static final Double DOUBLE_FALSE = new Double(0.0d); - private final Logger logger = LoggerFactory.getLogger(getClass()); + protected final Logger logger = LoggerFactory.getLogger(getClass()); private final ZoneOffset defaultOffset; private final boolean adaptiveTimePrecision;