DBZ-4501 Null out incremental snapshot event's query field

Authored by Kate on 2021-12-30 21:16:04 -05:00; committed by Jiri Pechanec
parent edcd133ec0
commit 5d81466ec1
4 changed files with 64 additions and 5 deletions

Changed file 1 of 4

@@ -218,6 +218,15 @@ protected void emitWindowClose() throws InterruptedException {
         }
     }
 
+    @Override
+    protected void sendEvent(Partition partition, EventDispatcher<T> dispatcher, OffsetContext offsetContext, Object[] row) throws InterruptedException {
+        SourceInfo sourceInfo = ((MySqlOffsetContext) offsetContext).getSource();
+        String query = sourceInfo.getQuery();
+        sourceInfo.setQuery(null);
+        super.sendEvent(partition, dispatcher, offsetContext, row);
+        sourceInfo.setQuery(query);
+    }
+
     public void rereadChunk() throws InterruptedException {
         if (context == null) {
             return;
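The override added above saves the SQL statement currently recorded on the MySQL SourceInfo, clears it while an incremental-snapshot row is dispatched, and restores it afterwards so subsequent streaming events keep their original query. Below is a minimal, self-contained sketch of that save/null/restore pattern, not Debezium code: QuerySource, dispatch and sendSnapshotRow are illustrative stand-ins, and the try/finally guard is extra caution that the actual change does not use.

// Hypothetical stand-ins to illustrate the pattern; not Debezium API.
public class QueryMaskingSketch {

    /** Minimal stand-in for the query-carrying part of SourceInfo. */
    static class QuerySource {
        private String query;

        String getQuery() {
            return query;
        }

        void setQuery(String query) {
            this.query = query;
        }
    }

    /** Stand-in for super.sendEvent(...): emits whatever query is currently set. */
    static void dispatch(QuerySource source, Object[] row) {
        System.out.println("emit row " + java.util.Arrays.toString(row)
                + " with source.query=" + source.getQuery());
    }

    /** Save the streaming query, null it for the snapshot row, then restore it. */
    static void sendSnapshotRow(QuerySource source, Object[] row) {
        String query = source.getQuery();
        source.setQuery(null);
        try {
            dispatch(source, row);
        }
        finally {
            source.setQuery(query); // later streaming events keep their statement
        }
    }

    public static void main(String[] args) {
        QuerySource source = new QuerySource();
        source.setQuery("UPDATE t SET aa = aa + 2000 WHERE pk <= 10");
        sendSnapshotRow(source, new Object[]{ 1, 2000 });
        System.out.println("restored query: " + source.getQuery());
    }
}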

Changed file 2 of 4

@@ -6,10 +6,18 @@
 package io.debezium.connector.mysql;
 
-import java.sql.SQLException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.sql.SQLException;
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
 import io.debezium.config.Configuration;
 import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
@@ -43,6 +51,7 @@ public void after() {
     protected Configuration.Builder config() {
         return DATABASE.defaultConfig()
+                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                 .with(MySqlConnectorConfig.USER, "mysqluser")
                 .with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
                 .with(MySqlConnectorConfig.SNAPSHOT_MODE, SnapshotMode.SCHEMA_ONLY.getValue())
@@ -121,4 +130,44 @@ protected void executeRenameTable(JdbcConnection connection, String newTable) th
     protected String createTableStatement(String newTable, String copyTable) {
         return String.format("CREATE TABLE %s LIKE %s", newTable, copyTable);
     }
+
+    @Test
+    public void updates() throws Exception {
+        // Testing.Print.enable();
+
+        populateTable();
+        startConnector();
+
+        sendAdHocSnapshotSignal();
+
+        final int batchSize = 10;
+        try (JdbcConnection connection = databaseConnection()) {
+            connection.setAutoCommit(false);
+            connection.execute("SET binlog_rows_query_log_events=ON");
+            for (int i = 0; i < ROW_COUNT; i++) {
+                connection.executeWithoutCommitting(
+                        String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", tableName(),
+                                i * batchSize, (i + 1) * batchSize));
+                connection.commit();
+            }
+        }
+
+        final int expectedRecordCount = ROW_COUNT;
+        final Map<Integer, SourceRecord> dbChanges = consumeRecordsMixedWithIncrementalSnapshot(expectedRecordCount,
+                x -> ((Struct) x.getValue().value()).getStruct("after").getInt32(valueFieldName()) >= 2000, null);
+        for (int i = 0; i < expectedRecordCount; i++) {
+            SourceRecord record = dbChanges.get(i + 1);
+            final int value = ((Struct) record.value()).getStruct("after").getInt32(valueFieldName());
+            assertEquals(i + 2000, value);
+            Object query = ((Struct) record.value()).getStruct("source").get("query");
+            String snapshot = ((Struct) record.value()).getStruct("source").get("snapshot").toString();
+            if (snapshot.equals("false")) {
+                assertNotNull(query);
+            }
+            else {
+                assertNull(query);
+                assertEquals("incremental", snapshot);
+            }
+        }
+    }
 }
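The assertions in the new updates() test split records by their source.snapshot value: streamed updates ("false") are expected to carry the originating statement in source.query, while incremental-snapshot reads ("incremental") must have a null query after this change. As an illustration only, a helper along these lines could express the same check outside the test; it relies solely on the Kafka Connect Struct accessors already used above, and the class and method names are invented for this sketch.

import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/** Hypothetical helper mirroring the test's source.query / source.snapshot check. */
public final class IncrementalSnapshotQueryCheck {

    private IncrementalSnapshotQueryCheck() {
    }

    /**
     * Returns true when the record's source block is consistent with this commit:
     * incremental snapshot events ("snapshot" == "incremental") must not carry a
     * query, while ordinary streamed events may.
     */
    public static boolean hasExpectedQueryField(SourceRecord record) {
        Struct source = ((Struct) record.value()).getStruct("source");
        Object query = source.get("query");
        String snapshot = String.valueOf(source.get("snapshot"));
        if ("incremental".equals(snapshot)) {
            return query == null;
        }
        return true; // non-snapshot events may or may not include a query
    }
}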

Changed file 3 of 4

@@ -78,6 +78,7 @@ protected Configuration.Builder config() {
                 .with(MySqlConnectorConfig.READ_ONLY_CONNECTION, true)
                 .with(KafkaSignalThread.SIGNAL_TOPIC, getSignalsTopic())
                 .with(KafkaSignalThread.BOOTSTRAP_SERVERS, kafka.brokerList())
+                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                 .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, String.format("%s:%s", DATABASE.qualifiedTableName("a42"), "pk1,pk2,pk3,pk4"));
     }
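Both test configurations now enable MySqlConnectorConfig.INCLUDE_SQL_QUERY, which maps to the connector's include.query option; the updates() test also issues SET binlog_rows_query_log_events=ON so the MySQL server records the statements in the first place. A minimal sketch of building such a configuration with Debezium's Configuration API follows, with placeholder credentials rather than values taken from this commit.

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;

public class IncludeQueryConfigSketch {

    public static void main(String[] args) {
        // Placeholder connection settings; INCLUDE_SQL_QUERY is the relevant flag.
        Configuration config = Configuration.create()
                .with(MySqlConnectorConfig.USER, "mysqluser")
                .with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();

        System.out.println("include.query enabled: "
                + config.getBoolean(MySqlConnectorConfig.INCLUDE_SQL_QUERY));
    }
}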

Changed file 4 of 4

@@ -46,7 +46,7 @@ public abstract class AbstractIncrementalSnapshotWithSchemaChangesSupportTest<T
     @Test
     public void schemaChanges() throws Exception {
-        Print.enable();
+        // Testing.Print.enable();
         populateTable();
         startConnector();
@@ -87,7 +87,7 @@ public void schemaChanges() throws Exception {
     @Test
     public void renameTable() throws Exception {
-        Print.enable();
+        // Testing.Print.enable();
         populateTable();
         final String newTable = "new_table";
@@ -141,7 +141,7 @@ public void renameTable() throws Exception {
     @Test
     public void columnNullabilityChanges() throws Exception {
-        Print.enable();
+        // Testing.Print.enable();
         populateTable();
         final Configuration config = config().build();
@@ -186,7 +186,7 @@ public void columnNullabilityChanges() throws Exception {
     @Test
     public void columnDefaultChanges() throws Exception {
-        Print.enable();
+        // Testing.Print.enable();
         populateTable();
         final Configuration config = config().build();