DBZ-7830 Add test, optimize flush when delete is not enabled

DLT1412 2024-05-02 10:40:12 +07:00 committed by Chris Cranford
parent d67b28ff62
commit 73df7d114c
2 changed files with 38 additions and 7 deletions
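
The commit makes two related changes. In the sink's execute() loop, the flush of the per-table update buffer is moved behind the config.isDeleteEnabled() check: when deletes are disabled, an incoming delete record is now skipped outright instead of first forcing a flush of the pending updates for its table. A new integration test, testShouldFlushUpdateBufferWhenDelete, covers the enabled path, verifying that buffered updates are written out before a delete for the same key is processed.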


@@ -115,18 +115,18 @@ public void execute(Collection<SinkRecord> records) {
             if (sinkRecordDescriptor.isDelete()) {
-                if (updateBufferByTable.get(tableId) != null && !updateBufferByTable.get(tableId).isEmpty()) {
-                    // When a delete arrives, the update buffer must be flushed to avoid losing a
-                    // delete for the same record after its update.
-                    flushBuffer(tableId, updateBufferByTable.get(tableId).flush());
-                }
                 if (!config.isDeleteEnabled()) {
                     LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", sinkRecordDescriptor.getTopicName());
                     continue;
                 }
+                if (updateBufferByTable.get(tableId) != null && !updateBufferByTable.get(tableId).isEmpty()) {
+                    // When a delete arrives, the update buffer must be flushed to avoid losing a
+                    // delete for the same record after its update.
+                    flushBuffer(tableId, updateBufferByTable.get(tableId).flush());
+                }
                 RecordBuffer tableIdBuffer = deleteBufferByTable.computeIfAbsent(tableId, k -> new RecordBuffer(config));
                 List<SinkRecordDescriptor> toFlush = tableIdBuffer.add(sinkRecordDescriptor);
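
To make the ordering concern concrete, below is a minimal, self-contained sketch of the two-buffer pattern, assuming simplified stand-ins for the connector's types: Event, SimpleBuffer, and applyBatch are hypothetical, corresponding only loosely to SinkRecordDescriptor, RecordBuffer, and flushBuffer, and the mirrored flush on the non-delete path is likewise an assumption, not taken from this diff. What it shows is why the reordering is safe and cheaper: the delete-enabled check short-circuits before any flush, and when deletes are enabled, pending updates for the table are written out before the delete is buffered, so a key cannot have an old update applied after its own delete.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FlushOnDeleteSketch {

    // Hypothetical change event: a key plus a delete flag.
    record Event(int key, boolean delete) {
    }

    // Hypothetical per-table buffer that collects events until flushed.
    static class SimpleBuffer {
        private final List<Event> events = new ArrayList<>();

        void add(Event e) {
            events.add(e);
        }

        boolean isEmpty() {
            return events.isEmpty();
        }

        List<Event> flush() {
            List<Event> out = new ArrayList<>(events);
            events.clear();
            return out;
        }
    }

    private final Map<String, SimpleBuffer> updateBufferByTable = new HashMap<>();
    private final Map<String, SimpleBuffer> deleteBufferByTable = new HashMap<>();
    private final boolean deleteEnabled;

    FlushOnDeleteSketch(boolean deleteEnabled) {
        this.deleteEnabled = deleteEnabled;
    }

    void execute(String tableId, List<Event> batch) {
        for (Event event : batch) {
            if (event.delete()) {
                // Cheap check first: a disabled delete is dropped, so the
                // pending updates for this table need not be flushed at all.
                // (Before DBZ-7830, the flush below ran even on this path.)
                if (!deleteEnabled) {
                    continue;
                }
                // A delete is about to be buffered; write out pending updates
                // for the table first, so an earlier update of the same key
                // cannot be applied after the delete.
                SimpleBuffer updates = updateBufferByTable.get(tableId);
                if (updates != null && !updates.isEmpty()) {
                    applyBatch(updates.flush());
                }
                deleteBufferByTable.computeIfAbsent(tableId, k -> new SimpleBuffer()).add(event);
            }
            else {
                // Mirror image (assumed here): flush pending deletes before
                // buffering an insert/update, so a re-insert of a just-deleted
                // key does not hit a primary-key conflict.
                SimpleBuffer deletes = deleteBufferByTable.get(tableId);
                if (deletes != null && !deletes.isEmpty()) {
                    applyBatch(deletes.flush());
                }
                updateBufferByTable.computeIfAbsent(tableId, k -> new SimpleBuffer()).add(event);
            }
        }
        // A real sink would also flush whatever remains at the end of the
        // batch; omitted here for brevity.
    }

    // Stand-in for executing a batch of statements against the target table.
    private void applyBatch(List<Event> events) {
        System.out.println("flushing " + events.size() + " event(s)");
    }

    public static void main(String[] args) {
        // Same sequence as the new integration test: insert key 2, insert
        // key 1, delete key 1, insert key 1 again.
        new FlushOnDeleteSketch(true).execute("t1", List.of(
                new Event(2, false),
                new Event(1, false),
                new Event(1, true),
                new Event(1, false)));
    }
}

With deleteEnabled set to false, the same sequence passes through without a single early flush, which is the optimization in the commit title. The second changed file adds an integration test that drives the enabled path end to end.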


@@ -5,6 +5,8 @@
  */
 package io.debezium.connector.jdbc.integration;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -242,4 +244,33 @@ public void testShouldHandleTruncateRecord(SinkRecordFactory factory) {
         tableAssert.exists().hasNumberOfRows(0).hasNumberOfColumns(3);
     }
 
+    @ParameterizedTest
+    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
+    @FixFor("DBZ-7830")
+    public void testShouldFlushUpdateBufferWhenDelete(SinkRecordFactory factory) {
+        final Map<String, String> properties = getDefaultSinkConfig();
+        properties.put(JdbcSinkConnectorConfig.SCHEMA_EVOLUTION, SchemaEvolutionMode.BASIC.getValue());
+        properties.put(JdbcSinkConnectorConfig.PRIMARY_KEY_MODE, PrimaryKeyMode.RECORD_KEY.getValue());
+        properties.put(JdbcSinkConnectorConfig.DELETE_ENABLED, "true");
+        properties.put(JdbcSinkConnectorConfig.BATCH_SIZE, "500");
+
+        startSinkConnector(properties);
+        assertSinkConnectorIsRunning();
+
+        final String tableName = randomTableName();
+        final String topicName = topicName("server1", "schema", tableName);
+
+        final SinkRecord deleteRecord = factory.deleteRecord(topicName);
+        List<SinkRecord> records = new ArrayList<>();
+        records.add(factory.createRecord(topicName, (byte) 2));
+        records.add(factory.createRecord(topicName, (byte) 1));
+        records.add(deleteRecord);
+        // should insert successfully (must not violate the primary key constraint)
+        records.add(factory.createRecord(topicName, (byte) 1));
+        consume(records);
+
+        final TableAssert tableAssert = TestHelper.assertTable(dataSource(), destinationTableName(deleteRecord));
+        tableAssert.exists().hasNumberOfRows(2).hasNumberOfColumns(3);
+    }
 }
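
A note on the test parameters: BATCH_SIZE is 500, far above the four records consumed, so no size-based flush can occur and the only flush before the batch completes is the one triggered by the delete's arrival. The final assertion of two rows matches one surviving row per key: key 2 from the first insert, and key 1 from the re-insert that follows the delete.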