DBZ-6963 Do not merge events if table has no LOB columns

There was a possible situation where if a long transaction consisted of
updating and inserting into the same table with identical keys with a
given sequence that the commit handler would merge several events for a
table without LOB columns, resulting in a difference in expected events
in the Kafka topic vs what was seen in LogMiner.
This commit is contained in:
Chris Cranford 2023-10-26 02:19:14 -04:00 committed by Jiri Pechanec
parent 468b6e431e
commit b69dbd53e6
2 changed files with 149 additions and 0 deletions

View File

@ -142,6 +142,22 @@ private void acceptDmlEvent(DmlEvent event) throws InterruptedException {
String rowId = rowIdFromEvent(table, event); String rowId = rowIdFromEvent(table, event);
RowState rowState = rows.get(rowId); RowState rowState = rows.get(rowId);
DmlEvent accumulatorEvent = (null == rowState) ? null : rowState.event; DmlEvent accumulatorEvent = (null == rowState) ? null : rowState.event;
// DBZ-6963
// This short-circuits the commit consumer's accumulation logic by assessing whether the
// table has any LOB columns (clob, blob, or xml). If the table does not, then there is
// no need to perform any of these steps as it should never be eligible for merging.
final List<Column> lobColumns = schema.getLobColumnsForTable(table.id());
if (lobColumns.isEmpty()) {
// There should never be a use case where the accumulator event is not null in this code
// path because given that the table has no LOB columns, it won't ever be added to the
// queue with the logic below. Therefore, there is no need to attempt to dispatch the
// accumulator as it should be null.
LOGGER.debug("\tEvent for table {} has no LOB columns, dispatching.", table.id());
dispatchChangeEvent(event);
return;
}
if (!tryMerge(accumulatorEvent, event)) { if (!tryMerge(accumulatorEvent, event)) {
prepareAndDispatch(accumulatorEvent); prepareAndDispatch(accumulatorEvent);
if (rowId.equals(currentLobDetails.rowId)) { if (rowId.equals(currentLobDetails.rowId)) {

View File

@ -0,0 +1,133 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.oracle.logminer;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Chris Cranford
*/
public class TransactionCommitConsumerIT extends AbstractConnectorTest {
private static OracleConnection connection;
@BeforeClass
public static void beforeClass() throws SQLException {
connection = TestHelper.testConnection();
TestHelper.dropAllTables();
}
@AfterClass
public static void afterClass() throws SQLException {
if (connection != null) {
connection.close();
}
}
@Test
@FixFor("DBZ-6963")
public void testShouldNotConsolidateEventsWhenTableHasNoLobColumns() throws Exception {
try {
connection.execute("CREATE TABLE addresses (ID numeric(9,0) primary key, person_id numeric(9,0))");
connection.execute("CREATE TABLE email (ID numeric(9,0) primary key, person_id numeric(9,0))");
connection.execute("CREATE TABLE phone (ID numeric(9,0) primary key, person_id numeric(9,0))");
// Seed data
connection.execute("INSERT INTO addresses values (-1,-1)");
connection.execute("INSERT INTO email values (-1,-1)");
connection.execute("INSERT INTO phone values (-1,-1)");
TestHelper.streamTable(connection, "addresses");
TestHelper.streamTable(connection, "email");
TestHelper.streamTable(connection, "phone");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.ADDRESSES,DEBEZIUM\\.EMAIL,DEBEZIUM\\.PHONE")
.with(OracleConnectorConfig.LOB_ENABLED, "true")
.with(OracleConnectorConfig.SNAPSHOT_MODE, "schema_only")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// Perform several iterations to create a large transaction with specific sequences of changes against
// tables that will be viewed by the TransactionCommitConsumer as eligible for merging since LOB is
// enabled but that shouldn't be because the tables have no LOB columns.
connection.setAutoCommit(false);
final int ITERATIONS = 25;
for (int i = 0; i < ITERATIONS; i++) {
connection.executeWithoutCommitting("INSERT INTO addresses (ID,PERSON_ID) values (" + i + ",-1)");
connection.executeWithoutCommitting("UPDATE email SET person_id = " + i + " WHERE id = -1");
connection.executeWithoutCommitting("INSERT INTO email (ID,PERSON_ID) values (" + i + ",-1)");
connection.executeWithoutCommitting("UPDATE email SET person_id = " + (i + 999) + " WHERE id = " + i);
connection.executeWithoutCommitting("UPDATE email SET person_id = " + (i + 1000) + " WHERE id = -1");
connection.executeWithoutCommitting("UPDATE phone SET person_id = " + i + " WHERE id = -1");
connection.executeWithoutCommitting("INSERT INTO phone (ID,PERSON_ID) values (" + i + ",-1)");
connection.executeWithoutCommitting("UPDATE phone SET person_id = " + i + " WHERE id = " + i);
connection.executeWithoutCommitting("UPDATE phone SET person_id = -1 WHERE id = -1");
connection.executeWithoutCommitting("UPDATE addresses SET person_id = " + i + " WHERE id = -1");
}
connection.commit();
SourceRecords records = consumeRecordsByTopic(ITERATIONS * 10);
final List<SourceRecord> addresses = records.recordsForTopic("server1.DEBEZIUM.ADDRESSES");
assertThat(addresses).hasSize(2 * ITERATIONS);
for (int i = 0, k = 0; i < addresses.size(); i += 2, k++) {
VerifyRecord.isValidInsert(addresses.get(i), "ID", k);
VerifyRecord.isValidUpdate(addresses.get(i + 1), "ID", -1);
}
final List<SourceRecord> phones = records.recordsForTopic("server1.DEBEZIUM.PHONE");
assertThat(phones).hasSize(4 * ITERATIONS);
for (int i = 0, k = 0; i < phones.size(); i += 4, k++) {
VerifyRecord.isValidUpdate(phones.get(i), "ID", -1);
VerifyRecord.isValidInsert(phones.get(i + 1), "ID", k);
VerifyRecord.isValidUpdate(phones.get(i + 2), "ID", k);
VerifyRecord.isValidUpdate(phones.get(i + 3), "ID", -1);
}
final List<SourceRecord> emails = records.recordsForTopic("server1.DEBEZIUM.EMAIL");
assertThat(emails).hasSize(4 * ITERATIONS);
for (int i = 0, k = 0; i < emails.size(); i += 4, k++) {
VerifyRecord.isValidUpdate(emails.get(i), "ID", -1);
VerifyRecord.isValidInsert(emails.get(i + 1), "ID", k);
VerifyRecord.isValidUpdate(emails.get(i + 2), "ID", 0);
VerifyRecord.isValidUpdate(emails.get(i + 3), "ID", -1);
}
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "phone");
TestHelper.dropTable(connection, "email");
TestHelper.dropTable(connection, "addresses");
}
}
}