DBZ-5090 Undo events across multiple mining sessions

This commit is contained in:
Chris Cranford 2022-05-06 08:57:38 -04:00 committed by Chris Cranford
parent 4396fc3ec3
commit 0d595025fd
5 changed files with 183 additions and 15 deletions

View File

@ -59,6 +59,7 @@
public abstract class AbstractLogMinerEventProcessor<T extends AbstractTransaction> implements LogMinerEventProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class);
private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
private final ChangeEventSourceContext context;
private final OracleConnectorConfig connectorConfig;
@ -524,12 +525,16 @@ else if (connectorConfig.getLogMiningUsernameExcludes().contains(transaction.get
*/
protected void handleRollback(LogMinerEventRow row) {
if (getTransactionCache().containsKey(row.getTransactionId())) {
LOGGER.trace("Transaction {} was rolled back.", row.getTransactionId());
finalizeTransactionRollback(row.getTransactionId(), row.getScn());
metrics.setActiveTransactions(getTransactionCache().size());
metrics.incrementRolledBackTransactions();
metrics.addRolledBackTransactionId(row.getTransactionId());
counters.rollbackCount++;
}
else {
LOGGER.trace("Could not rollback transaction {}, was not found in cache.", row.getTransactionId());
}
}
/**
@ -965,6 +970,38 @@ private class ParsedLobWriteSql {
*/
protected abstract Scn getTransactionCacheMinimumScn();
/**
* Returns whether the transaction id has no sequence number component.
*
* Oracle transaction identifiers are a composite of:
* <ol>
* <li>Undo segment number</li>
* <li>Slot numbber of the transaction that generated the change</li>
* <li>Sequence number of the transaction that generated the change</li>
* </ol>
*
* When Oracle LogMiner mines records, it is possible that when an undo operation is detected,
* often the product of a constraint violation, the LogMiner row will have the same explicit
* XID (transaction id) as the source operation that we should undo; however, if the record
* to be undone was mined in a prior iteration, Oracle LogMiner won't be able to make a link
* back to the full transaction's sequence number, therefore the XID value for the undo row
* will contain only the undo segment number and slot number, setting the sequence number to
* 4294967295 (aka -1 or 0xFFFFFFFF).
*
* This method explicitly checks if the provided transaction id has the no sequence sentinel
* value and if so, returns {@code true}; otherwise returns {@code false}.
*
* @param transactionId the transaction identifier to check, should not be {@code null}
* @return true if the transaction has no sequence reference, false if it does
*/
protected boolean isTransactionIdWithNoSequence(String transactionId) {
return transactionId.endsWith(NO_SEQUENCE_TRX_ID_SUFFIX);
}
protected String getTransactionIdPrefix(String transactionId) {
return transactionId.substring(0, 8);
}
/**
* Wrapper for all counter variables
*

View File

@ -90,18 +90,45 @@ protected InfinispanTransaction createTransaction(LogMinerEventRow row) {
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
List<String> eventKeys = getEventCache().keySet()
.stream()
.filter(k -> k.startsWith(row.getTransactionId() + "-"))
.collect(Collectors.toList());
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.trace("Undo applied for event {}.", event);
getEventCache().remove(eventKey);
List<String> eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-");
if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT.
final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
eventKeys = getTransactionKeysWithPrefix(transactionPrefix);
if (!eventKeys.isEmpty()) {
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.debug("Undo change '{}' applied to transaction '{}'", row, eventKey);
getEventCache().remove(eventKey);
return;
}
}
LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId());
}
else {
LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row);
}
}
else {
for (String eventKey : eventKeys) {
final LogMinerEvent event = getEventCache().get(eventKey);
if (event != null && event.getRowId().equals(row.getRowId())) {
LOGGER.trace("Undo applied for event {}.", event);
getEventCache().remove(eventKey);
return;
}
}
LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId());
}
}
private List<String> getTransactionKeysWithPrefix(String prefix) {
return getEventCache().keySet().stream().filter(k -> k.startsWith(prefix)).collect(Collectors.toList());
}
@Override

View File

@ -92,12 +92,36 @@ protected MemoryTransaction createTransaction(LogMinerEventRow row) {
@Override
protected void removeEventWithRowId(LogMinerEventRow row) {
final MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId());
MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId());
if (transaction == null) {
LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row);
if (isTransactionIdWithNoSequence(row.getTransactionId())) {
// This means that Oracle LogMiner found an event that should be undone but its corresponding
// undo entry was read in a prior mining session and the transaction's sequence could not be
// resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT.
final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId());
LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId());
LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix);
for (String transactionKey : getTransactionCache().keySet()) {
if (transactionKey.startsWith(transactionPrefix)) {
transaction = getTransactionCache().get(transactionKey);
if (transaction != null && transaction.removeEventWithRowId(row.getRowId())) {
// We successfully found a transaction with the same XISUSN and XIDSLT and that
// transaction included a change for the specified row id.
LOGGER.debug("Undo change '{}' applied to transaction '{}'", row, transactionKey);
return;
}
}
}
LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId());
}
else {
LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row);
}
}
else {
transaction.removeEventWithRowId(row.getRowId());
if (!transaction.removeEventWithRowId(row.getRowId())) {
LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId());
}
}
}

View File

@ -53,8 +53,8 @@ public List<LogMinerEvent> getEvents() {
return events;
}
public void removeEventWithRowId(String rowId) {
events.removeIf(event -> {
public boolean removeEventWithRowId(String rowId) {
return events.removeIf(event -> {
if (event.getRowId().equals(rowId)) {
LOGGER.trace("Undo applied for event {}.", event);
return true;

View File

@ -29,6 +29,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -2199,6 +2204,81 @@ public void shouldNotEmitEventsOnConstraintViolations() throws Exception {
}
}
@Test
@FixFor("DBZ-5090")
public void shouldNotEmitEventsOnConstraintViolationsAcrossSessions() throws Exception {
TestHelper.dropTable(connection, "dbz5090");
try {
connection.execute("CREATE TABLE dbz5090 (id number(9,0), data varchar2(50))");
connection.execute("CREATE UNIQUE INDEX uk_dbz5090 ON dbz5090 (id)");
TestHelper.streamTable(connection, "dbz5090");
Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5090")
.build();
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
// We require the use of an executor here so that the multiple threads cooperate with one
// another in a way that does not block the test moving forward in the various stages.
ExecutorService executorService = Executors.newFixedThreadPool(1);
try (OracleConnection connection2 = TestHelper.testConnection()) {
connection.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test1')");
// Task that creates in-progress transaction with second connection
final CountDownLatch latch = new CountDownLatch(1);
final Callable<Boolean> task = () -> {
try {
connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (2,'Test2')");
latch.countDown();
connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test2')");
return true;
}
catch (SQLException e) {
return false;
}
};
// Submit the blocking task on the executor service
Future<Boolean> future = executorService.submit(task);
// We wait until the latch has been triggered by the callable task
latch.await();
// Explicitly wait 5 seconds to guarantee that the thread has executed the SQL
Thread.sleep(5000);
connection.commit();
// Get the secondary thread blocking state, should return false due to constraint violation
assertThat(future.get()).isFalse();
connection2.commit();
}
final SourceRecords sourceRecords = consumeRecordsByTopic(2);
List<SourceRecord> records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5090");
assertThat(records).hasSize(2);
VerifyRecord.isValidInsert(records.get(0), "ID", 1);
final Struct after = (((Struct) records.get(0).value()).getStruct("after"));
assertThat(after.get("ID")).isEqualTo(1);
assertThat(after.get("DATA")).isEqualTo("Test1");
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz5090");
}
}
@Test
@FixFor("DBZ-3322")
public void shouldNotEmitEventsInRollbackTransaction() throws Exception {