DBZ-2913 Support binlog_annotate_row_events

MySQL supports an option called binlog_rows_query_log_events; however, MariaDB
supports a different option called binlog_annotate_row_events. This feature is
to add MariaDB's equivalent so that it acts the same as the original MySQL
option.
This commit is contained in:
Chris Cranford 2023-10-17 12:55:56 -04:00 committed by Jiri Pechanec
parent 3a79832738
commit 2a153cca62
9 changed files with 151 additions and 45 deletions

View File

@ -59,6 +59,9 @@ public class MySqlConnection extends JdbcConnection {
private final MySqlConnectionConfiguration connectionConfig;
private final MySqlFieldReader mysqlFieldReader;
// Tracks whether this connection is with MariaDB, calculated lazily as needed.
private Boolean isMariaDb;
/**
* Creates a new connection using the supplied configuration.
*
@ -426,6 +429,14 @@ public OptionalLong getEstimatedTableSize(TableId tableId) {
return OptionalLong.empty();
}
public boolean isMariaDb() {
if (isMariaDb == null) {
final String version = querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES).get("version");
isMariaDb = version.toLowerCase().contains("mariadb");
}
return isMariaDb;
}
public boolean isTableIdCaseSensitive() {
return !"0".equals(readMySqlSystemVariables().get(MySqlSystemVariables.LOWER_CASE_TABLE_NAMES));
}

View File

@ -755,7 +755,9 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Whether the connector should include the original SQL query that generated the change event. "
+ "Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from snapshot. "
+ "Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. "
+ "If using MariaDB, configure the binlog_annotate_row_events option must be set to ON. "
+ "Query will not be present for events generated from snapshot. "
+ "WARNING: Enabling this option may expose tables or fields explicitly excluded or masked by including the original SQL statement in the change event. "
+ "For this reason the default value is 'false'.")
.withDefault(false);

View File

@ -42,6 +42,7 @@
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.BinaryLogClient.LifecycleListener;
import com.github.shyiko.mysql.binlog.event.AnnotateRowsEventData;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventData;
@ -530,17 +531,25 @@ protected void handleGtidEvent(MySqlOffsetContext offsetContext, Event event) {
}
/**
* Handle the supplied event with an {@link RowsQueryEventData} by recording the original SQL query
* that generated the event.
* Handle the supplied event with an {@link RowsQueryEventData} or {@link AnnotateRowsEventData} by
* recording the original SQL query that generated the event.
*
* @param event the database change data event to be processed; may not be null
*/
protected void handleRowsQuery(MySqlOffsetContext offsetContext, Event event) {
protected void handleRecordingQuery(MySqlOffsetContext offsetContext, Event event) {
final String query;
if (!connection.isMariaDb()) {
// Unwrap the RowsQueryEvent
final RowsQueryEventData lastRowsQueryEventData = unwrapData(event);
query = lastRowsQueryEventData.getQuery();
}
else {
// Unwrap the AnnotateRowsEventData
final AnnotateRowsEventData annotateRowsEventData = unwrapData(event);
query = annotateRowsEventData.getRowsQuery();
}
// Set the query on the source
offsetContext.setQuery(lastRowsQueryEventData.getQuery());
offsetContext.setQuery(query);
}
/**
@ -926,7 +935,16 @@ public void execute(ChangeEventSourceContext context, MySqlPartition partition,
// Conditionally register ROWS_QUERY handler to parse SQL statements.
if (connectorConfig.includeSqlQuery()) {
eventHandlers.put(EventType.ROWS_QUERY, (event) -> handleRowsQuery(effectiveOffsetContext, event));
if (!connection.isMariaDb()) {
eventHandlers.put(EventType.ROWS_QUERY, (event) -> handleRecordingQuery(effectiveOffsetContext, event));
}
else {
// Binlog client explicitly needs to be told to enable ANNOTATE_ROWS events, which is the
// MariaDB equivalent of ROWS_QUERY. This must be done ahead of the connection to make
// sure that the right negotiation bits are set during handshake.
client.setUseSendAnnotateRowsEvent(true);
eventHandlers.put(EventType.ANNOTATE_ROWS, (event) -> handleRecordingQuery(effectiveOffsetContext, event));
}
}
BinaryLogClient.EventListener listener;

View File

@ -5,6 +5,10 @@
*/
package io.debezium.connector.mysql;
import java.sql.SQLException;
import io.debezium.jdbc.JdbcConnection;
/**
* A centralized expression of differences in behaviour between MySQL 5.x and 8.x
*
@ -15,4 +19,12 @@ public interface DatabaseDifferences {
boolean isCurrentDateTimeDefaultGenerated();
String currentDateTimeDefaultOptional(String isoString);
default void setBinlogRowQueryEventsOff(JdbcConnection connection) throws SQLException {
connection.execute("SET binlog_rows_query_log_events=OFF");
}
default void setBinlogRowQueryEventsOn(JdbcConnection connection) throws SQLException {
connection.execute("SET binlog_rows_query_log_events=ON");
}
}

View File

@ -232,7 +232,7 @@ public void updates() throws Exception {
final int batchSize = 10;
try (JdbcConnection connection = databaseConnection()) {
connection.setAutoCommit(false);
connection.execute("SET binlog_rows_query_log_events=ON");
((MySqlTestConnection) connection).databaseAsserts().setBinlogRowQueryEventsOn(connection);
for (int i = 0; i < ROW_COUNT; i++) {
connection.executeWithoutCommitting(
String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(),

View File

@ -1626,8 +1626,9 @@ public void shouldEmitNoSavepoints() throws Exception {
}
/**
* This test case validates that if you disable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an INSERT statement is NOT parsed into the resulting event.
* This test case validates that if you disable MySQL option binlog_rows_query_log_events or
* the Maria option binlog_annotate_row_events, then the original SQL statement for an
* INSERT statement is NOT parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
@ -1657,7 +1658,7 @@ public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Disable Query log option
connection.execute("SET binlog_rows_query_log_events=OFF");
db.databaseAsserts().setBinlogRowQueryEventsOff(connection);
// Execute insert statement.
connection.execute(insertSqlStatement);
@ -1678,8 +1679,9 @@ public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events,
* but configure the connector to NOT include the query, it will not be included in the event.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events
* or the MariaDB option binlog_annotate_row_events but configure the connector to NOT
* include the query, it will not be included in the event.
*/
@Test
@FixFor("DBZ-706")
@ -1708,7 +1710,7 @@ public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(insertSqlStatement);
@ -1730,8 +1732,9 @@ public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an INSERT statement is parsed into the resulting event.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or
* the MariaDB option binlog_annotate_row_events, then the original SQL statement for an
* INSERT statement is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
@ -1760,7 +1763,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(insertSqlStatement);
@ -1782,8 +1785,9 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the issue multiple INSERTs, the appropriate SQL statements are parsed into the resulting events.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or
* the MariaDB option binlog_annotate_rows_event, then the issue multiple INSERTs, the
* appropriate SQL statements are parsed into the resulting events.
*/
@Test
@FixFor("DBZ-706")
@ -1815,7 +1819,7 @@ public void parseMultipleInsertStatements() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(insertSqlStatement1);
@ -1845,8 +1849,9 @@ public void parseMultipleInsertStatements() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the issue single multi-row INSERT, the appropriate SQL statements are parsed into the resulting events.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or the
* MariaDB option binlog_annotate_row_events, then the issue single multi-row INSERT, the
* appropriate SQL statements are parsed into the resulting events.
*/
@Test
@FixFor("DBZ-706")
@ -1877,7 +1882,7 @@ public void parseMultipleRowInsertStatement() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(insertSqlStatement);
@ -1906,8 +1911,9 @@ public void parseMultipleRowInsertStatement() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for a DELETE over a single row is parsed into the resulting event.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or
* the MariaDB option binlog_annotate_row_events, then the original SQL statement for a
* DELETE over a single row is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
@ -1936,7 +1942,7 @@ public void parseDeleteQuery() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(deleteSqlStatement);
@ -1957,8 +1963,9 @@ public void parseDeleteQuery() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* issue a multi-row DELETE, the resulting events get the original SQL statement.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or
* the MariaDB option binlog_annotate_row_events, then issue a multi-row DELETE, the
* resulting events get the original SQL statement.
*/
@Test
@FixFor("DBZ-706")
@ -1987,7 +1994,7 @@ public void parseMultiRowDeleteQuery() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(deleteSqlStatement);
@ -2016,8 +2023,9 @@ public void parseMultiRowDeleteQuery() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an UPDATE over a single row is parsed into the resulting event.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or
* the MariaDB option binlog_annotate_row_events, then the original SQL statement for an
* UPDATE over a single row is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
@ -2046,7 +2054,7 @@ public void parseUpdateQuery() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(updateSqlStatement);
@ -2067,8 +2075,9 @@ public void parseUpdateQuery() throws Exception {
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an UPDATE over a single row is parsed into the resulting event.
* This test case validates that if you enable MySQL option binlog_rows_query_log_events or
* the MariaDB option binlog_annotate_row_events, then the original SQL statement for an
* UPDATE over a single row is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
@ -2097,7 +2106,7 @@ public void parseMultiRowUpdateQuery() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
// Execute insert statement.
connection.execute(updateSqlStatement);

View File

@ -27,7 +27,8 @@ public enum MySqlVersion {
MYSQL_5_5,
MYSQL_5_6,
MYSQL_5_7,
MYSQL_8;
MYSQL_8,
MARIADB_11;
}
private DatabaseDifferences databaseAsserts;
@ -105,6 +106,16 @@ public static boolean isPerconaServer() {
return comment.startsWith("Percona");
}
/**
* Check whether the database is MariaDB or MySQL.
*
* @return true if the database is MariaDB; otherwise false
*/
public static boolean isMariaDb() {
String comment = forTestDatabase("mysql").getMySqlVersionComment();
return comment.toLowerCase().contains("mariadb");
}
private static JdbcConfiguration addDefaultSettings(JdbcConfiguration configuration) {
return JdbcConfiguration.adapt(configuration.edit()
.withDefault(JdbcConfiguration.HOSTNAME, "localhost")
@ -129,6 +140,18 @@ public MySqlTestConnection(JdbcConfiguration config) {
public MySqlVersion getMySqlVersion() {
if (mySqlVersion == null) {
final String versionString = getMySqlVersionString();
if (isMariaDb()) {
if (versionString.startsWith("11.")) {
mySqlVersion = MySqlVersion.MARIADB_11;
}
else {
throw new IllegalStateException("Couldn't resolve MariaDB Server version");
}
return mySqlVersion;
}
// Fallback to MySQL
if (versionString.startsWith("8.")) {
mySqlVersion = MySqlVersion.MYSQL_8;
}
@ -193,7 +216,30 @@ public boolean isTableIdCaseSensitive() {
public DatabaseDifferences databaseAsserts() {
if (databaseAsserts == null) {
if (getMySqlVersion() == MySqlVersion.MYSQL_8) {
if (getMySqlVersion() == MySqlVersion.MARIADB_11) {
databaseAsserts = new DatabaseDifferences() {
@Override
public boolean isCurrentDateTimeDefaultGenerated() {
return true;
}
@Override
public String currentDateTimeDefaultOptional(String isoString) {
return null;
}
@Override
public void setBinlogRowQueryEventsOff(JdbcConnection connection) throws SQLException {
connection.execute("SET binlog_annotate_row_events=OFF");
}
@Override
public void setBinlogRowQueryEventsOn(JdbcConnection connection) throws SQLException {
connection.execute("SET binlog_annotate_row_events=ON");
}
};
}
else if (getMySqlVersion() == MySqlVersion.MYSQL_8) {
databaseAsserts = new DatabaseDifferences() {
@Override
public boolean isCurrentDateTimeDefaultGenerated() {

View File

@ -808,7 +808,7 @@ public void columnTypeAndDefaultValueChange() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
connection.execute("alter table DBZ_771_CUSTOMERS change customer_type customer_type int default 42;");
connection.execute("insert into DBZ_771_CUSTOMERS (id) values (2);");
@ -848,7 +848,7 @@ public void columnTypeChangeResetsDefaultValue() throws Exception {
try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
db.databaseAsserts().setBinlogRowQueryEventsOn(connection);
connection.execute("alter table DBZ_771_CUSTOMERS change customer_type customer_type int;");
connection.execute("insert into DBZ_771_CUSTOMERS (id, customer_type) values (2, 456);");

View File

@ -1377,6 +1377,7 @@ a| Mandatory field that describes the source metadata for the event. This field
* Timestamp for when the change was made in the database
If the xref:enable-query-log-events[`binlog_rows_query_log_events`] MySQL configuration option is enabled and the connector configuration `include.query` property is enabled, the `source` field also provides the `query` field, which contains the original SQL statement that caused the change event.
On MariaDB, the configuration option is xref:enable-query-log-events[`binlog_annotate_row_events`].
|===
@ -1455,6 +1456,7 @@ a|Mandatory field that describes the source metadata for the event. The `source`
* Timestamp for when the change was made in the database
If the xref:enable-query-log-events[`binlog_rows_query_log_events`] MySQL configuration option is enabled and the connector configuration `include.query` property is enabled, the `source` field also provides the `query` field, which contains the original SQL statement that caused the change event.
On MariaDB, the configuration option is xref:enable-query-log-events[`binlog_annotate_row_events`].
|4
|`op`
@ -1553,6 +1555,7 @@ a|Mandatory field that describes the source metadata for the event. In a _delete
* Timestamp for when the change was made in the database
If the xref:enable-query-log-events[`binlog_rows_query_log_events`] MySQL configuration option is enabled and the connector configuration `include.query` property is enabled, the `source` field also provides the `query` field, which contains the original SQL statement that caused the change event.
On MariaDB, the configuration option is xref:enable-query-log-events[`binlog_annotate_row_events`].
|4
|`op`
@ -2306,7 +2309,7 @@ a|The number of seconds the server waits for activity on a non-interactive conne
[[enable-query-log-events]]
=== Enabling query log events
You might want to see the original `SQL` statement for each binlog event. Enabling the `binlog_rows_query_log_events` option in the MySQL configuration file allows you to do this.
You might want to see the original `SQL` statement for each binlog event. Enabling the `binlog_rows_query_log_events` option in the MySQL configuration or `binlog_annotate_row_events` in the MariaDB configuration file allows you to do this.
This option is available in MySQL 5.6 and later.
@ -2318,14 +2321,19 @@ This option is available in MySQL 5.6 and later.
.Procedure
* Enable `binlog_rows_query_log_events`:
* Enable `binlog_rows_query_log_events` in MySQL or `binlog_annotate_row_events` in MariaDB:
+
[source,SQL]
----
mysql> binlog_rows_query_log_events=ON
----
+
`binlog_rows_query_log_events` is set to a value that enables/disables support for including the original `SQL` statement in the binlog entry.
[source,SQL]
----
mariadb> binlog_annotate_row_events=ON
----
+
`binlog_rows_query_log_events` or `binlog_annotate_row_events` is set to a value that enables/disables support for including the original `SQL` statement in the binlog entry.
+
** `ON` = enabled
** `OFF` = disabled
@ -2998,7 +3006,7 @@ endif::community[]
|`false`
|Boolean value that specifies whether the connector should include the original SQL query that generated the change event. +
+
If you set this option to `true` then you must also configure MySQL with the `binlog_rows_query_log_events` option set to `ON`. When `include.query` is `true`, the query is not present for events that the snapshot process generates. +
If you set this option to `true` then you must also configure MySQL with the `binlog_rows_query_log_events` option or MariaDB with the `binlog_annotate_row_events` option set to `ON`. When `include.query` is `true`, the query is not present for events that the snapshot process generates. +
+
Setting `include.query` to `true` might expose tables or fields that are explicitly excluded or masked by including the original SQL statement in the change event. For this reason, the default setting is `false`.