diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java index bd00a2161..4353d1566 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlReadOnlyIncrementalSnapshotChangeEventSource.java @@ -218,6 +218,15 @@ protected void emitWindowClose() throws InterruptedException { } } + @Override + protected void sendEvent(Partition partition, EventDispatcher dispatcher, OffsetContext offsetContext, Object[] row) throws InterruptedException { + SourceInfo sourceInfo = ((MySqlOffsetContext) offsetContext).getSource(); + String query = sourceInfo.getQuery(); + sourceInfo.setQuery(null); + super.sendEvent(partition, dispatcher, offsetContext, row); + sourceInfo.setQuery(query); + } + public void rereadChunk() throws InterruptedException { if (context == null) { return; diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java index 2155f8776..ad6f7e130 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/IncrementalSnapshotIT.java @@ -6,10 +6,18 @@ package io.debezium.connector.mysql; -import java.sql.SQLException; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import java.sql.SQLException; +import java.util.Map; + +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; import org.junit.After; import org.junit.Before; +import org.junit.Test; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode; @@ -43,6 +51,7 @@ public void after() { protected Configuration.Builder config() { return DATABASE.defaultConfig() + .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) .with(MySqlConnectorConfig.USER, "mysqluser") .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY.getValue()) @@ -121,4 +130,44 @@ protected void executeRenameTable(JdbcConnection connection, String newTable) th protected String createTableStatement(String newTable, String copyTable) { return String.format("CREATE TABLE %s LIKE %s", newTable, copyTable); } + + @Test + public void updates() throws Exception { + // Testing.Print.enable(); + + populateTable(); + startConnector(); + + sendAdHocSnapshotSignal(); + + final int batchSize = 10; + try (JdbcConnection connection = databaseConnection()) { + connection.setAutoCommit(false); + connection.execute("SET binlog_rows_query_log_events=ON"); + for (int i = 0; i < ROW_COUNT; i++) { + connection.executeWithoutCommitting( + String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(), + i * batchSize, (i + 1) * batchSize)); + connection.commit(); + } + } + + final int expectedRecordCount = ROW_COUNT; + final Map dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount, + x -> ((Struct) x.getValue().value()).getStruct("after").getInt32(valueFieldName()) >= 2000, null); + for (int i = 0; i < expectedRecordCount; i++) { + SourceRecord record = dbChanges.get(i + 1); + final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName()); + assertEquals(i + 2000, value); + Object query = ((Struct) record.value()).getStruct("source").get("query"); + String snapshot = ((Struct) record.value()).getStruct("source").get("snapshot").toString(); + if (snapshot.equals("false")) { + assertNotNull(query); + } + else { + assertNull(query); + assertEquals("incremental", snapshot); + } + } + } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java index 8a093feb4..b85a4e52f 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ReadOnlyIncrementalSnapshotIT.java @@ -78,6 +78,7 @@ protected Configuration.Builder config() { .with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true) .with(KafkaSignalThread.SIGNAL_TOPIC, getSignalsTopic()) .with(KafkaSignalThread.BOOTSTRAP_SERVERS, kafka.brokerList()) + .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", DATABASE.qualifiedTableName("a42"), "pk1,pk2,pk3,pk4")); } diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotWithSchemaChangesSupportTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotWithSchemaChangesSupportTest.java index 2dda7fd99..10f506016 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotWithSchemaChangesSupportTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/source/snapshot/incremental/AbstractIncrementalSnapshotWithSchemaChangesSupportTest.java @@ -46,7 +46,7 @@ public abstract class AbstractIncrementalSnapshotWithSchemaChangesSupportTest