diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 8dc41a516..3f6205aad 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -59,6 +59,7 @@ public abstract class AbstractLogMinerEventProcessor implements LogMinerEventProcessor { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLogMinerEventProcessor.class); + private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff"; private final ChangeEventSourceContext context; private final OracleConnectorConfig connectorConfig; @@ -524,12 +525,16 @@ else if (connectorConfig.getLogMiningUsernameExcludes().contains(transaction.get */ protected void handleRollback(LogMinerEventRow row) { if (getTransactionCache().containsKey(row.getTransactionId())) { + LOGGER.trace("Transaction {} was rolled back.", row.getTransactionId()); finalizeTransactionRollback(row.getTransactionId(), row.getScn()); metrics.setActiveTransactions(getTransactionCache().size()); metrics.incrementRolledBackTransactions(); metrics.addRolledBackTransactionId(row.getTransactionId()); counters.rollbackCount++; } + else { + LOGGER.trace("Could not rollback transaction {}, was not found in cache.", row.getTransactionId()); + } } /** @@ -965,6 +970,38 @@ private class ParsedLobWriteSql { */ protected abstract Scn getTransactionCacheMinimumScn(); + /** + * Returns whether the transaction id has no sequence number component. + * + * Oracle transaction identifiers are a composite of: + *
    + *
  1. Undo segment number
  2. + *
  3. Slot numbber of the transaction that generated the change
  4. + *
  5. Sequence number of the transaction that generated the change
  6. + *
+ * + * When Oracle LogMiner mines records, it is possible that when an undo operation is detected, + * often the product of a constraint violation, the LogMiner row will have the same explicit + * XID (transaction id) as the source operation that we should undo; however, if the record + * to be undone was mined in a prior iteration, Oracle LogMiner won't be able to make a link + * back to the full transaction's sequence number, therefore the XID value for the undo row + * will contain only the undo segment number and slot number, setting the sequence number to + * 4294967295 (aka -1 or 0xFFFFFFFF). + * + * This method explicitly checks if the provided transaction id has the no sequence sentinel + * value and if so, returns {@code true}; otherwise returns {@code false}. + * + * @param transactionId the transaction identifier to check, should not be {@code null} + * @return true if the transaction has no sequence reference, false if it does + */ + protected boolean isTransactionIdWithNoSequence(String transactionId) { + return transactionId.endsWith(NO_SEQUENCE_TRX_ID_SUFFIX); + } + + protected String getTransactionIdPrefix(String transactionId) { + return transactionId.substring(0, 8); + } + /** * Wrapper for all counter variables * diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java index 157aca00a..3dce8387f 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/infinispan/AbstractInfinispanLogMinerEventProcessor.java @@ -90,18 +90,45 @@ protected InfinispanTransaction createTransaction(LogMinerEventRow row) { @Override protected void removeEventWithRowId(LogMinerEventRow row) { - List eventKeys = getEventCache().keySet() - .stream() - .filter(k -> k.startsWith(row.getTransactionId() + "-")) - .collect(Collectors.toList()); - - for (String eventKey : eventKeys) { - final LogMinerEvent event = getEventCache().get(eventKey); - if (event != null && event.getRowId().equals(row.getRowId())) { - LOGGER.trace("Undo applied for event {}.", event); - getEventCache().remove(eventKey); + List eventKeys = getTransactionKeysWithPrefix(row.getTransactionId() + "-"); + if (eventKeys.isEmpty() && isTransactionIdWithNoSequence(row.getTransactionId())) { + // This means that Oracle LogMiner found an event that should be undone but its corresponding + // undo entry was read in a prior mining session and the transaction's sequence could not be + // resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT. + final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId()); + LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); + LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix); + eventKeys = getTransactionKeysWithPrefix(transactionPrefix); + if (!eventKeys.isEmpty()) { + for (String eventKey : eventKeys) { + final LogMinerEvent event = getEventCache().get(eventKey); + if (event != null && event.getRowId().equals(row.getRowId())) { + LOGGER.debug("Undo change '{}' applied to transaction '{}'", row, eventKey); + getEventCache().remove(eventKey); + return; + } + } + LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId()); + } + else { + LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row); } } + else { + for (String eventKey : eventKeys) { + final LogMinerEvent event = getEventCache().get(eventKey); + if (event != null && event.getRowId().equals(row.getRowId())) { + LOGGER.trace("Undo applied for event {}.", event); + getEventCache().remove(eventKey); + return; + } + } + LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId()); + } + } + + private List getTransactionKeysWithPrefix(String prefix) { + return getEventCache().keySet().stream().filter(k -> k.startsWith(prefix)).collect(Collectors.toList()); } @Override diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java index bfaac2a1e..dac442e41 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryLogMinerEventProcessor.java @@ -92,12 +92,36 @@ protected MemoryTransaction createTransaction(LogMinerEventRow row) { @Override protected void removeEventWithRowId(LogMinerEventRow row) { - final MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId()); + MemoryTransaction transaction = getTransactionCache().get(row.getTransactionId()); if (transaction == null) { - LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row); + if (isTransactionIdWithNoSequence(row.getTransactionId())) { + // This means that Oracle LogMiner found an event that should be undone but its corresponding + // undo entry was read in a prior mining session and the transaction's sequence could not be + // resolved. In this case, lets locate the transaction based solely on XIDUSN and XIDSLT. + final String transactionPrefix = getTransactionIdPrefix(row.getTransactionId()); + LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", row.getTransactionId()); + LOGGER.debug("Checking all transactions with prefix '{}'", transactionPrefix); + for (String transactionKey : getTransactionCache().keySet()) { + if (transactionKey.startsWith(transactionPrefix)) { + transaction = getTransactionCache().get(transactionKey); + if (transaction != null && transaction.removeEventWithRowId(row.getRowId())) { + // We successfully found a transaction with the same XISUSN and XIDSLT and that + // transaction included a change for the specified row id. + LOGGER.debug("Undo change '{}' applied to transaction '{}'", row, transactionKey); + return; + } + } + } + LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId()); + } + else { + LOGGER.warn("Cannot undo change '{}' since transaction was not found.", row); + } } else { - transaction.removeEventWithRowId(row.getRowId()); + if (!transaction.removeEventWithRowId(row.getRowId())) { + LOGGER.warn("Cannot undo change '{}' since event with row-id {} was not found.", row, row.getRowId()); + } } } diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java index 7d0b11cf9..49fc73397 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/memory/MemoryTransaction.java @@ -53,8 +53,8 @@ public List getEvents() { return events; } - public void removeEventWithRowId(String rowId) { - events.removeIf(event -> { + public boolean removeEventWithRowId(String rowId) { + return events.removeIf(event -> { if (event.getRowId().equals(rowId)) { LOGGER.trace("Undo applied for event {}.", event); return true; diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java index 9ab7dc7ac..894564e69 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/OracleConnectorIT.java @@ -29,6 +29,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -2199,6 +2204,81 @@ public void shouldNotEmitEventsOnConstraintViolations() throws Exception { } } + @Test + @FixFor("DBZ-5090") + public void shouldNotEmitEventsOnConstraintViolationsAcrossSessions() throws Exception { + TestHelper.dropTable(connection, "dbz5090"); + try { + connection.execute("CREATE TABLE dbz5090 (id number(9,0), data varchar2(50))"); + connection.execute("CREATE UNIQUE INDEX uk_dbz5090 ON dbz5090 (id)"); + TestHelper.streamTable(connection, "dbz5090"); + + Configuration config = TestHelper.defaultConfig() + .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5090") + .build(); + + start(OracleConnector.class, config); + assertConnectorIsRunning(); + + waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME); + + // We require the use of an executor here so that the multiple threads cooperate with one + // another in a way that does not block the test moving forward in the various stages. + ExecutorService executorService = Executors.newFixedThreadPool(1); + + try (OracleConnection connection2 = TestHelper.testConnection()) { + + connection.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test1')"); + + // Task that creates in-progress transaction with second connection + final CountDownLatch latch = new CountDownLatch(1); + final Callable task = () -> { + try { + connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (2,'Test2')"); + + latch.countDown(); + connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test2')"); + return true; + } + catch (SQLException e) { + return false; + } + }; + + // Submit the blocking task on the executor service + Future future = executorService.submit(task); + + // We wait until the latch has been triggered by the callable task + latch.await(); + + // Explicitly wait 5 seconds to guarantee that the thread has executed the SQL + Thread.sleep(5000); + + connection.commit(); + + // Get the secondary thread blocking state, should return false due to constraint violation + assertThat(future.get()).isFalse(); + + connection2.commit(); + } + + final SourceRecords sourceRecords = consumeRecordsByTopic(2); + List records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5090"); + assertThat(records).hasSize(2); + + VerifyRecord.isValidInsert(records.get(0), "ID", 1); + + final Struct after = (((Struct) records.get(0).value()).getStruct("after")); + assertThat(after.get("ID")).isEqualTo(1); + assertThat(after.get("DATA")).isEqualTo("Test1"); + + assertNoRecordsToConsume(); + } + finally { + TestHelper.dropTable(connection, "dbz5090"); + } + } + @Test @FixFor("DBZ-3322") public void shouldNotEmitEventsInRollbackTransaction() throws Exception {