DBZ-706 Adding option for showing source query in sourceInfo element

This commit is contained in:
Stephen Powis 2018-05-29 14:36:28 +09:00 committed by Gunnar Morling
parent d556fca852
commit cc33fed16a
12 changed files with 585 additions and 11 deletions

View File

@ -18,6 +18,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import com.github.shyiko.mysql.binlog.event.RowsQueryEventData;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
@ -265,6 +266,11 @@ protected void doStart() {
eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
eventHandlers.put(EventType.XID, this::handleTransactionCompletion);
// Conditionally register QUERY handler to parse SQL statements.
if (context.includeSqlQuery()) {
eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
}
// Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
String availableServerGtidStr = connectionContext.knownGtidSet();
if (availableServerGtidStr != null && !availableServerGtidStr.trim().isEmpty()) {
@ -573,6 +579,22 @@ protected void handleGtidEvent(Event event) {
}
}
/**
* Handle the supplied event with an {@link RowsQueryEventData} by possibly recording the original SQL query
* that generated the event.
*
* @param event the database change data event to be processed; may not be null
*/
protected void handleRowsQuery(Event event) {
// Unwrap the RowsQueryEvent
final RowsQueryEventData lastRowsQueryEventData = unwrapData(event);
// TODO potentially filter or skip processing this event based on connector configuration options or filtering rules.
// Set the query on the source
source.setQuery(lastRowsQueryEventData.getQuery());
}
/**
* Handle the supplied event with an {@link QueryEventData} by possibly recording the DDL statements as changes in the
* MySQL schemas.

View File

@ -777,6 +777,17 @@ public static EventProcessingFailureHandlingMode parse(String value) {
+ "The default is 'true'. This is independent of how the connector internally records database history.")
.withDefault(true);
public static final Field INCLUDE_SQL_QUERY = Field.create("include.query")
.withDisplayName("Include original SQL query with in change events")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withImportance(Importance.MEDIUM)
.withDescription("Whether the connector should include the original SQL query that generated the change event. "
+ "Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. "
+ "WARNING: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. "
+ "For this reason the default value is 'false'.")
.withDefault(false);
public static final Field SNAPSHOT_MODE = Field.create("snapshot.mode")
.withDisplayName("Snapshot mode")
.withEnum(SnapshotMode.class, SnapshotMode.INITIAL)
@ -924,7 +935,7 @@ public static final Field MASK_COLUMN(int length) {
CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.POLL_INTERVAL_MS,
BUFFER_SIZE_FOR_BINLOG_READER, Heartbeat.HEARTBEAT_INTERVAL,
Heartbeat.HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES,
Heartbeat.HEARTBEAT_TOPICS_PREFIX, DATABASE_HISTORY, INCLUDE_SCHEMA_CHANGES, INCLUDE_SQL_QUERY,
TABLE_WHITELIST, TABLE_BLACKLIST, TABLES_IGNORE_BUILTIN,
DATABASE_WHITELIST, DATABASE_BLACKLIST,
COLUMN_BLACKLIST, SNAPSHOT_MODE, SNAPSHOT_MINIMAL_LOCKING, SNAPSHOT_LOCKING_MODE,
@ -984,7 +995,7 @@ protected static ConfigDef configDef() {
KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, DATABASE_HISTORY,
DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, DatabaseHistory.DDL_FILTER,
DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL);
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
Field.group(config, "Events", INCLUDE_SCHEMA_CHANGES, INCLUDE_SQL_QUERY, TABLES_IGNORE_BUILTIN, DATABASE_WHITELIST, TABLE_WHITELIST,
COLUMN_BLACKLIST, TABLE_BLACKLIST, DATABASE_BLACKLIST,
GTID_SOURCE_INCLUDES, GTID_SOURCE_EXCLUDES, GTID_SOURCE_FILTER_DML_EVENTS, BUFFER_SIZE_FOR_BINLOG_READER,
Heartbeat.HEARTBEAT_INTERVAL, Heartbeat.HEARTBEAT_TOPICS_PREFIX, EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE, INCONSISTENT_SCHEMA_HANDLING_MODE,

View File

@ -216,6 +216,10 @@ public boolean includeSchemaChangeRecords() {
return config.getBoolean(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES);
}
public boolean includeSqlQuery() {
return config.getBoolean(MySqlConnectorConfig.INCLUDE_SQL_QUERY);
}
public boolean isSnapshotAllowedWhenNeeded() {
return snapshotMode() == SnapshotMode.WHEN_NEEDED;
}

View File

@ -110,6 +110,7 @@ final class SourceInfo extends AbstractSourceInfo {
public static final String THREAD_KEY = "thread";
public static final String DB_NAME_KEY = "db";
public static final String TABLE_NAME_KEY = "table";
public static final String QUERY_KEY = "query";
/**
* A {@link Schema} definition for a {@link Struct} used to store the {@link #partition()} and {@link #offset()} information.
@ -127,6 +128,8 @@ final class SourceInfo extends AbstractSourceInfo {
.field(THREAD_KEY, Schema.OPTIONAL_INT64_SCHEMA)
.field(DB_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.field(TABLE_NAME_KEY, Schema.OPTIONAL_STRING_SCHEMA)
// TODO how to make this optional?
.field(QUERY_KEY, Schema.OPTIONAL_STRING_SCHEMA)
.build();
private String currentGtidSet;
@ -148,6 +151,7 @@ final class SourceInfo extends AbstractSourceInfo {
private Map<String, String> sourcePartition;
private boolean lastSnapshot = true;
private boolean nextSnapshot = false;
private String currentQuery = null;
public SourceInfo() {
super(Module.version());
@ -163,6 +167,22 @@ public void setServerName(String logicalId) {
sourcePartition = Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
}
/**
* Set the original SQL query.
*
* @param query the original SQL query that generated the event.
*/
public void setQuery(final String query) {
this.currentQuery = query;
}
/**
* @return the original SQL query that generated the event. NULL if no such query is associated.
*/
public String getQuery() {
return this.currentQuery;
}
/**
* Get the Kafka Connect detail about the source "partition", which describes the portion of the source that we are
* consuming. Since we're reading the binary log for a single database, the source partition specifies the
@ -325,6 +345,9 @@ public Struct struct(TableId tableId) {
result.put(DB_NAME_KEY, tableId.catalog());
result.put(TABLE_NAME_KEY, tableId.table());
}
if (currentQuery != null) {
result.put(QUERY_KEY, currentQuery);
}
return result;
}
@ -371,6 +394,7 @@ public void commitTransaction() {
this.restartRowsToSkip = 0;
this.restartEventsToSkip = 0;
this.inTransaction = false;
this.currentQuery = null;
}
/**

View File

@ -43,6 +43,7 @@ log_bin = mysql-bin
expire_logs_days = 3
binlog_format = row
# ----------------------------------------------
# Enable GTIDs on this master
# ----------------------------------------------

View File

@ -37,6 +37,7 @@ gtid_mode = on
enforce_gtid_consistency = on
log_slave_updates = on
# ----------------------------------------------
# Debezium ingest
# ----------------------------------------------

View File

@ -44,3 +44,4 @@ expire_logs_days = 1
binlog_format = row

View File

@ -109,7 +109,8 @@ protected Configuration.Builder simpleConfig() {
return DATABASE.defaultConfig()
.with(MySqlConnectorConfig.USER, "replicator")
.with(MySqlConnectorConfig.PASSWORD, "replpass")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, false);
}
@Test

View File

@ -17,7 +17,7 @@ public class ConnectionIT implements Testing {
@Ignore
@Test
public void shouldConnectToDefaulDatabase() throws SQLException {
public void shouldConnectToDefaultDatabase() throws SQLException {
try (MySQLConnection conn = MySQLConnection.forTestDatabase("mysql");) {
conn.connect();
}

View File

@ -51,6 +51,9 @@ public class MySqlConnectorIT extends AbstractConnectorTest {
private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", DATABASE)
.withDbHistoryPath(DB_HISTORY_PATH);
// Defines how many initial events are generated from loading the test databases.
private static final int INITIAL_EVENT_COUNT = 9 + 9 + 4 + 5 + 6;
private Configuration config;
@Before
@ -931,7 +934,7 @@ public void shouldConsumeEventsWithNoSnapshot() throws SQLException, Interrupted
// Consume the first records due to startup and initialization of the database ...
// Testing.Print.enable();
SourceRecords records = consumeRecordsByTopic(9 + 9 + 4 + 5 + 6); // 6 DDL changes
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
assertThat(recordsForTopicForRoProductsTable(records).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
assertThat(records.recordsForTopic(RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
@ -1027,7 +1030,7 @@ public void shouldEmitTombstoneOnDeleteByDefault() throws Exception {
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(9 + 9 + 4 + 5 + 6); // 6 DDL changes
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
assertThat(records.recordsForTopic(DATABASE.topicForTable("orders")).size()).isEqualTo(5);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
@ -1061,9 +1064,9 @@ public void shouldEmitTombstoneOnDeleteByDefault() throws Exception {
@FixFor("DBZ-582")
public void shouldEmitNoTombstoneOnDelete() throws Exception {
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.build();
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
@ -1071,7 +1074,7 @@ public void shouldEmitNoTombstoneOnDelete() throws Exception {
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
SourceRecords records = consumeRecordsByTopic(9 + 9 + 4 + 5 + 6); // 6 DDL changes
SourceRecords records = consumeRecordsByTopic(INITIAL_EVENT_COUNT); // 6 DDL changes
assertThat(records.recordsForTopic(DATABASE.topicForTable("orders")).size()).isEqualTo(5);
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName());) {
@ -1102,8 +1105,508 @@ public void shouldEmitNoTombstoneOnDelete() throws Exception {
stopConnector();
}
/**
* This test case validates that if you disable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an INSERT statement is NOT parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
// Define the table we want to watch events from.
final String tableName = "products";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Disable Query log option
connection.execute("SET binlog_rows_query_log_events=OFF");
// Execute insert statement.
connection.execute(insertSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
// Parse through the source record for the query value.
final SourceRecord sourceRecord = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
logger.info("Record: {}", sourceRecord);
// Should have been an insert with query parsed.
validate(sourceRecord);
assertInsert(sourceRecord, "id", 110);
assertQuery(sourceRecord, null);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events,
* but configure the connector to NOT include the query, it will not be included in the event.
*/
@Test
@FixFor("DBZ-706")
public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
// Define the table we want to watch events from.
final String tableName = "products";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector to NOT parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, false)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(insertSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
// Parse through the source record for the query value.
final SourceRecord sourceRecord = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
logger.info("Record: {}", sourceRecord);
// Should have been an insert with query parsed.
validate(sourceRecord);
assertInsert(sourceRecord, "id", 110);
assertQuery(sourceRecord, null);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an INSERT statement is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Exception {
// Define the table we want to watch events from.
final String tableName = "products";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(insertSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
// Parse through the source record for the query value.
final SourceRecord sourceRecord = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
logger.info("Record: {}", sourceRecord);
// Should have been an insert with query parsed.
validate(sourceRecord);
assertInsert(sourceRecord, "id", 110);
assertQuery(sourceRecord, insertSqlStatement);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the issue multiple INSERTs, the appropriate SQL statements are parsed into the resulting events.
*/
@Test
@FixFor("DBZ-706")
public void parseMultipleInsertStatements() throws Exception {
// Define the table we want to watch events from.
final String tableName = "products";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String insertSqlStatement1 = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
final String insertSqlStatement2 = "INSERT INTO products VALUES (default,'toaster','Toaster',3.33)";
logger.warn(DATABASE.getDatabaseName());
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(insertSqlStatement1);
connection.execute(insertSqlStatement2);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(2);
// Parse through the source record for the query value.
final SourceRecord sourceRecord1 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertQuery(sourceRecord1, insertSqlStatement1);
// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertQuery(sourceRecord2, insertSqlStatement2);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the issue single multi-row INSERT, the appropriate SQL statements are parsed into the resulting events.
*/
@Test
@FixFor("DBZ-706")
public void parseMultipleRowInsertStatement() throws Exception {
// Define the table we want to watch events from.
final String tableName = "products";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)";
logger.warn(DATABASE.getDatabaseName());
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(insertSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(2);
// Parse through the source record for the query value.
final SourceRecord sourceRecord1 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertQuery(sourceRecord1, insertSqlStatement);
// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertQuery(sourceRecord2, insertSqlStatement);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for a DELETE over a single row is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
public void parseDeleteQuery() throws Exception {
// Define the table we want to watch events from.
final String tableName = "orders";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String deleteSqlStatement = "DELETE FROM orders WHERE order_number=10001 LIMIT 1";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(deleteSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
// Parse through the source record for the query value.
final SourceRecord sourceRecord = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
// Should have been a delete with query parsed.
validate(sourceRecord);
assertDelete(sourceRecord, "order_number", 10001);
assertQuery(sourceRecord, deleteSqlStatement);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* issue a multi-row DELETE, the resulting events get the original SQL statement.
*/
@Test
@FixFor("DBZ-706")
public void parseMultiRowDeleteQuery() throws Exception {
// Define the table we want to watch events from.
final String tableName = "orders";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String deleteSqlStatement = "DELETE FROM orders WHERE purchaser=1002";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(deleteSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(2);
// Parse through the source record for the query value.
final SourceRecord sourceRecord1 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertDelete(sourceRecord1, "order_number", 10002);
assertQuery(sourceRecord1, deleteSqlStatement);
// Validate second event.
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
// Should have been a delete with query parsed.
validate(sourceRecord2);
assertDelete(sourceRecord2, "order_number", 10004);
assertQuery(sourceRecord2, deleteSqlStatement);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an UPDATE over a single row is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
public void parseUpdateQuery() throws Exception {
// Define the table we want to watch events from.
final String tableName = "products";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String updateSqlStatement = "UPDATE products set name='toaster' where id=109 LIMIT 1";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(updateSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(1);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
// Parse through the source record for the query value.
final SourceRecord sourceRecord = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
// Should have been a delete with query parsed.
validate(sourceRecord);
assertUpdate(sourceRecord, "id", 109);
assertQuery(sourceRecord, updateSqlStatement);
}
/**
* This test case validates that if you enable MySQL option binlog_rows_query_log_events, then
* the original SQL statement for an UPDATE over a single row is parsed into the resulting event.
*/
@Test
@FixFor("DBZ-706")
public void parseMultiRowUpdateQuery() throws Exception {
// Define the table we want to watch events from.
final String tableName = "orders";
config = DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
.with(MySqlConnectorConfig.TABLE_WHITELIST, DATABASE.qualifiedTableName(tableName))
// Explicitly configure connector TO parse query
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.build();
// Start the connector ...
start(MySqlConnector.class, config);
// Flush all existing records not related to the test.
consumeRecords(INITIAL_EVENT_COUNT, null);
// Define insert query we want to validate.
final String updateSqlStatement = "UPDATE orders set quantity=0 where order_number in (10001, 10004)";
// Connect to the DB and issue our insert statement to test.
try (MySQLConnection db = MySQLConnection.forTestDatabase(DATABASE.getDatabaseName())) {
try (JdbcConnection connection = db.connect()) {
// Enable Query log option
connection.execute("SET binlog_rows_query_log_events=ON");
// Execute insert statement.
connection.execute(updateSqlStatement);
}
}
// Lets see what gets produced?
final SourceRecords records = consumeRecordsByTopic(2);
assertThat(records.recordsForTopic(DATABASE.topicForTable(tableName)).size()).isEqualTo(2);
// Parse through the source record for the query value.
final SourceRecord sourceRecord1 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(0);
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertUpdate(sourceRecord1, "order_number", 10001);
assertQuery(sourceRecord1, updateSqlStatement);
// Validate second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);
// Should have been a delete with query parsed.
validate(sourceRecord2);
assertUpdate(sourceRecord2, "order_number", 10004);
assertQuery(sourceRecord2, updateSqlStatement);
}
private List<SourceRecord> recordsForTopicForRoProductsTable(SourceRecords records) {
final List<SourceRecord> uc = records.recordsForTopic(RO_DATABASE.topicForTable("Products"));
return uc != null ? uc : records.recordsForTopic(RO_DATABASE.topicForTable("products"));
}
}
}

View File

@ -41,6 +41,7 @@ public void shouldCreateTaskFromConfiguration() throws Exception {
assertThat(context.serverName()).isEqualTo(serverName);
assertThat("" + context.includeSchemaChangeRecords()).isEqualTo(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES.defaultValueAsString());
assertThat("" + context.includeSqlQuery()).isEqualTo(MySqlConnectorConfig.INCLUDE_SQL_QUERY.defaultValueAsString());
assertThat("" + context.getConnectorConfig().getMaxBatchSize()).isEqualTo(MySqlConnectorConfig.MAX_BATCH_SIZE.defaultValueAsString());
assertThat("" + context.getConnectorConfig().getMaxQueueSize()).isEqualTo(MySqlConnectorConfig.MAX_QUEUE_SIZE.defaultValueAsString());
assertThat("" + context.getConnectorConfig().getPollInterval().toMillis()).isEqualTo(MySqlConnectorConfig.POLL_INTERVAL_MS.defaultValueAsString());

View File

@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.data.Field;
@ -543,6 +544,10 @@ protected void assertDelete(SourceRecord record, String pkField, int pk) {
VerifyRecord.isValidDelete(record, pkField, pk);
}
protected void assertQuery(SourceRecord record, String query) {
assertValueField(record, "source/query", query);
}
protected void assertTombstone(SourceRecord record, String pkField, int pk) {
VerifyRecord.isValidTombstone(record, pkField, pk);
}