From edac20f658b19ba4a2f8c7bb77f522243e026142 Mon Sep 17 00:00:00 2001 From: jchipmunk Date: Sun, 11 Feb 2024 19:34:21 +0300 Subject: [PATCH] DBZ-7473 Defer transaction capture until the first DML event occurs Do not handle start event from V$LOGMNR_CONTENTS view if CLOB/BLOB support is disabled and memory buffer is used. --- .../oracle/logminer/LogMinerQueryBuilder.java | 8 +- .../AbstractLogMinerEventProcessor.java | 105 +++++----- .../logminer/LogMinerQueryBuilderTest.java | 27 ++- .../processor/AbstractProcessorUnitTest.java | 180 ++++++++++++++++++ 4 files changed, 267 insertions(+), 53 deletions(-) diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java index 89690e3ba..2196c28df 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java @@ -151,7 +151,13 @@ private static String getOperationCodePredicate(OracleConnectorConfig connectorC operationInClause.withValues(OPERATION_CODES_LOB); } else { - operationInClause.withValues(OPERATION_CODES_NO_LOB); + final List operationCodes = new ArrayList<>(OPERATION_CODES_NO_LOB); + // The transaction start event needs to be handled when a persistent buffer (Infinispan) is used + // because it is needed to reset the event id counter when re-mining transaction events. + if (connectorConfig.getLogMiningBufferType() == OracleConnectorConfig.LogMiningBufferType.MEMORY) { + operationCodes.removeIf(operationCode -> operationCode == 6); + } + operationInClause.withValues(operationCodes); } predicate.append("(").append(operationInClause.build()); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java index 37a88fd6b..df025dd6c 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/processor/AbstractLogMinerEventProcessor.java @@ -475,50 +475,45 @@ protected void handleCommit(OraclePartition partition, LogMinerEventRow row) thr final T transaction = getAndRemoveTransactionFromCache(transactionId); if (transaction == null) { + LOGGER.debug("Transaction {} not found in cache, no events to commit.", transactionId); handleCommitNotFoundInBuffer(row); - LOGGER.debug("Transaction {} not found, commit skipped.", transactionId); - return; - } - - // Calculate the smallest SCN that remains in the transaction cache - final Optional oldestTransaction = getOldestTransactionInCache(); - final Scn smallestScn; - if (oldestTransaction.isPresent()) { - smallestScn = oldestTransaction.get().getStartScn(); - metrics.setOldestScnDetails(smallestScn, oldestTransaction.get().getChangeTime()); - } - else { - smallestScn = Scn.NULL; - metrics.setOldestScnDetails(Scn.valueOf(-1), null); } + final Scn smallestScn = calculateSmallestScn(); final Scn commitScn = row.getScn(); if (offsetContext.getCommitScn().hasCommitAlreadyBeenHandled(row)) { - if (transaction.getNumberOfEvents() > 0) { - final Scn lastCommittedScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread()); - LOGGER.debug("Transaction {} has already been processed. " - + "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", - transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn); + if (transaction != null) { + if (transaction.getNumberOfEvents() > 0) { + final Scn lastCommittedScn = offsetContext.getCommitScn().getCommitScnForRedoThread(row.getThread()); + LOGGER.debug("Transaction {} has already been processed. " + + "Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", + transactionId, offsetContext.getCommitScn(), commitScn, lastCommittedScn); + } + cleanupAfterTransactionRemovedFromCache(transaction, false); + metrics.setActiveTransactionCount(getTransactionCache().size()); } - cleanupAfterTransactionRemovedFromCache(transaction, false); - metrics.setActiveTransactionCount(getTransactionCache().size()); return; } counters.commitCount++; - int numEvents = getTransactionEventCount(transaction); + int numEvents = (transaction == null) ? 0 : getTransactionEventCount(transaction); LOGGER.debug("Committing transaction {} with {} events (scn: {}, oldest buffer scn: {}): {}", transactionId, numEvents, row.getScn(), smallestScn, row); - final ZoneOffset databaseOffset = metrics.getDatabaseOffset(); + // When a COMMIT is received, regardless of the number of events it has, it still + // must be recorded in the commit scn for the node to guarantee updates to the + // offsets. This must be done prior to dispatching the transaction-commit or the + // heartbeat event that follows commit dispatch. + offsetContext.getCommitScn().recordCommit(row); - final boolean skipExcludedUserName = isTransactionUserExcluded(transaction); - TransactionCommitConsumer.Handler delegate = new TransactionCommitConsumer.Handler<>() { - private int numEvents = getTransactionEventCount(transaction); - - @Override - public void accept(LogMinerEvent event, long eventsProcessed) throws InterruptedException { + Instant start = Instant.now(); + boolean dispatchTransactionCommittedEvent = false; + if (numEvents > 0) { + final boolean skipExcludedUserName = isTransactionUserExcluded(transaction); + dispatchTransactionCommittedEvent = !skipExcludedUserName; + final ZoneOffset databaseOffset = metrics.getDatabaseOffset(); + TransactionCommitConsumer.Handler delegate = (event, eventsProcessed) -> { // Update SCN in offset context only if processed SCN less than SCN of other transactions if (smallestScn.isNull() || commitScn.compareTo(smallestScn) < 0) { offsetContext.setScn(event.getScn()); @@ -571,20 +566,9 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted // Clear redo SQL offsetContext.setRedoSql(null); - - } - }; - - // When a COMMIT is received, regardless of the number of events it has, it still - // must be recorded in the commit scn for the node to guarantee updates to the - // offsets. This must be done prior to dispatching the transaction-commit or the - // heartbeat event that follows commit dispatch. - offsetContext.getCommitScn().recordCommit(row); - - Instant start = Instant.now(); - int dispatchedEventCount = 0; - if (numEvents > 0) { + }; try (TransactionCommitConsumer commitConsumer = new TransactionCommitConsumer(delegate, connectorConfig, schema)) { + int dispatchedEventCount = 0; final Iterator iterator = getTransactionEventIterator(transaction); while (iterator.hasNext()) { if (!context.isRunning()) { @@ -601,7 +585,7 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted offsetContext.setEventScn(commitScn); offsetContext.setRsId(row.getRsId()); - if (getTransactionEventCount(transaction) > 0 && !skipExcludedUserName) { + if (dispatchTransactionCommittedEvent) { dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, transaction.getChangeTime()); } else { @@ -610,16 +594,37 @@ public void accept(LogMinerEvent event, long eventsProcessed) throws Interrupted metrics.calculateLagFromSource(row.getChangeTime()); - finalizeTransactionCommit(transactionId, commitScn); - cleanupAfterTransactionRemovedFromCache(transaction, false); + if (transaction != null) { + finalizeTransactionCommit(transactionId, commitScn); + cleanupAfterTransactionRemovedFromCache(transaction, false); + metrics.setActiveTransactionCount(getTransactionCache().size()); + } metrics.incrementCommittedTransactionCount(); - metrics.setActiveTransactionCount(getTransactionCache().size()); metrics.setCommitScn(commitScn); metrics.setOffsetScn(offsetContext.getScn()); metrics.setLastCommitDuration(Duration.between(start, Instant.now())); } + /** + * Calculate the smallest SCN that remains in the transaction cache. + * + * @return the smallest SCN + */ + private Scn calculateSmallestScn() { + final Optional oldestTransaction = getOldestTransactionInCache(); + final Scn smallestScn; + if (oldestTransaction.isPresent()) { + smallestScn = oldestTransaction.get().getStartScn(); + metrics.setOldestScnDetails(smallestScn, oldestTransaction.get().getChangeTime()); + } + else { + smallestScn = Scn.NULL; + metrics.setOldestScnDetails(Scn.valueOf(-1), null); + } + return smallestScn; + } + /** * Allow for post-processing of a transaction commit in the stream that was not found in the * transaction buffer, perhaps because it aged out due to retention policies. @@ -722,14 +727,14 @@ protected void handleRollback(LogMinerEventRow row) { LOGGER.debug("Transaction {} was rolled back.", row.getTransactionId()); finalizeTransactionRollback(row.getTransactionId(), row.getScn()); metrics.setActiveTransactionCount(getTransactionCache().size()); - metrics.incrementRolledBackTransactionCount(); - metrics.addRolledBackTransactionId(row.getTransactionId()); - counters.rollbackCount++; } else { - LOGGER.debug("Could not rollback transaction {}, was not found in cache.", row.getTransactionId()); + LOGGER.debug("Transaction {} not found in cache, no events to rollback.", row.getTransactionId()); handleRollbackNotFoundInBuffer(row); } + metrics.incrementRolledBackTransactionCount(); + metrics.addRolledBackTransactionId(row.getTransactionId()); + counters.rollbackCount++; } /** diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java index a7cb20b07..a9d5f5270 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilderTest.java @@ -7,6 +7,7 @@ import static io.debezium.config.CommonConnectorConfig.SIGNAL_DATA_COLLECTION; import static io.debezium.connector.oracle.OracleConnectorConfig.LOB_ENABLED; +import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_BUFFER_TYPE; import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE; import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST; import static io.debezium.connector.oracle.OracleConnectorConfig.LOG_MINING_USERNAME_INCLUDE_LIST; @@ -61,7 +62,8 @@ public class LogMinerQueryBuilderTest { private static final String PDB_PREDICATE = "SRC_CON_NAME = '${pdbName}'"; private static final String OPERATION_CODES_LOB_ENABLED = "1,2,3,6,7,9,10,11,29,34,36,68,70,71,255"; - private static final String OPERATION_CODES_LOB_DISABLED = "1,2,3,6,7,34,36,255"; + private static final String OPERATION_CODES_LOB_DISABLED = "1,2,3,7,34,36,255"; + private static final String OPERATION_CODES_LOB_DISABLED_AND_PERSISTENT_BUFFER = "1,2,3,6,7,34,36,255"; private static final String OPERATION_CODES_PREDICATE = "(OPERATION_CODE IN (${operationCodes})${operationDdl})"; @@ -96,6 +98,24 @@ public void testLogMinerQueryWithLobDisabled() { assertThat(result).isEqualTo(getQueryFromTemplate(connectorConfig)); } + @Test + @FixFor("DBZ-7473") + public void testLogMinerQueryWithLobDisabledAndPersistentBuffer() { + Configuration config = TestHelper.defaultConfig() + .with(LOG_MINING_BUFFER_TYPE, OracleConnectorConfig.LogMiningBufferType.INFINISPAN_EMBEDDED) + .build(); + OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config); + + String result = LogMinerQueryBuilder.build(connectorConfig); + assertThat(result).isEqualTo(getQueryFromTemplate(connectorConfig)); + + config = TestHelper.defaultConfig().with(PDB_NAME, "").build(); + connectorConfig = new OracleConnectorConfig(config); + + result = LogMinerQueryBuilder.build(connectorConfig); + assertThat(result).isEqualTo(getQueryFromTemplate(connectorConfig)); + } + @Test @FixFor("DBZ-5648") public void testLogMinerQueryWithLobEnabled() { @@ -191,7 +211,10 @@ private String getPdbPredicate(OracleConnectorConfig config) { } private String getOperationCodePredicate(OracleConnectorConfig config) { - final String codes = config.isLobEnabled() ? OPERATION_CODES_LOB_ENABLED : OPERATION_CODES_LOB_DISABLED; + final String codes = config.isLobEnabled() ? OPERATION_CODES_LOB_ENABLED + : (config.getLogMiningBufferType() == OracleConnectorConfig.LogMiningBufferType.MEMORY) + ? OPERATION_CODES_LOB_DISABLED + : OPERATION_CODES_LOB_DISABLED_AND_PERSISTENT_BUFFER; final String predicate = OPERATION_CODES_PREDICATE.replace("${operationCodes}", codes); return predicate.replace("${operationDdl}", config.storeOnlyCapturedTables() ? getOperationDdlPredicate() : ""); } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java index 7ce85e130..119c6040c 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/logminer/processor/AbstractProcessorUnitTest.java @@ -136,6 +136,16 @@ public void testCacheIsNotEmptyWhenTransactionIsAdded() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testCacheIsNotEmptyWhenTransactionIsAddedAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + assertThat(processor.getTransactionCache().isEmpty()).isFalse(); + } + } + @Test public void testCacheIsEmptyWhenTransactionIsCommitted() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -149,6 +159,18 @@ public void testCacheIsEmptyWhenTransactionIsCommitted() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testCacheIsEmptyWhenTransactionIsCommittedAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + final OraclePartition partition = new OraclePartition(config.getLogicalName(), config.getDatabaseName()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleCommit(partition, getCommitLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1)); + assertThat(processor.getTransactionCache().isEmpty()).isTrue(); + } + } + @Test public void testCacheIsEmptyWhenTransactionIsRolledBack() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -160,6 +182,17 @@ public void testCacheIsEmptyWhenTransactionIsRolledBack() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testCacheIsEmptyWhenTransactionIsRolledBackAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1)); + assertThat(processor.getTransactionCache().isEmpty()).isTrue(); + } + } + @Test public void testCacheIsNotEmptyWhenFirstTransactionIsRolledBack() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -175,6 +208,20 @@ public void testCacheIsNotEmptyWhenFirstTransactionIsRolledBack() throws Excepti } } + @Test + @FixFor("DBZ-7473") + public void testCacheIsNotEmptyWhenFirstTransactionIsRolledBackAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2)); + processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1)); + assertThat(processor.getTransactionCache().isEmpty()).isFalse(); + assertThat(metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_1)).isTrue(); + assertThat(metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_2)).isFalse(); + } + } + @Test public void testCacheIsNotEmptyWhenSecondTransactionIsRolledBack() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -190,6 +237,20 @@ public void testCacheIsNotEmptyWhenSecondTransactionIsRolledBack() throws Except } } + @Test + @FixFor("DBZ-7473") + public void testCacheIsNotEmptyWhenSecondTransactionIsRolledBackAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2)); + processor.handleRollback(getRollbackLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2)); + assertThat(processor.getTransactionCache().isEmpty()).isFalse(); + assertThat(metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_2)).isTrue(); + assertThat(metrics.getRolledBackTransactionIds().contains(TRANSACTION_ID_1)).isFalse(); + } + } + @Test public void testCalculateScnWhenTransactionIsCommitted() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -203,6 +264,19 @@ public void testCalculateScnWhenTransactionIsCommitted() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testCalculateScnWhenTransactionIsCommittedAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + final OraclePartition partition = new OraclePartition(config.getLogicalName(), config.getDatabaseName()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleCommit(partition, getCommitLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1)); + assertThat(metrics.getOldestScn()).isEqualTo(Scn.valueOf(1L).toString()); + assertThat(metrics.getRolledBackTransactionIds()).isEmpty(); + } + } + @Test public void testCalculateScnWhenFirstTransactionIsCommitted() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -222,6 +296,24 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testCalculateScnWhenFirstTransactionIsCommittedAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + final OraclePartition partition = new OraclePartition(config.getLogicalName(), config.getDatabaseName()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2)); + + processor.handleCommit(partition, getCommitLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_1)); + assertThat(metrics.getOldestScn()).isEqualTo(Scn.valueOf(2L).toString()); + assertThat(metrics.getRolledBackTransactionIds()).isEmpty(); + + processor.handleCommit(partition, getCommitLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_2)); + assertThat(metrics.getOldestScn()).isEqualTo(Scn.valueOf(2L).toString()); + } + } + @Test public void testCalculateScnWhenSecondTransactionIsCommitted() throws Exception { final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); @@ -238,6 +330,21 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Exception } } + @Test + @FixFor("DBZ-7473") + public void testCalculateScnWhenSecondTransactionIsCommittedAndStartEventIsNotHandled() throws Exception { + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + final OraclePartition partition = new OraclePartition(config.getLogicalName(), config.getDatabaseName()); + try (T processor = getProcessor(config)) { + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(1L), TRANSACTION_ID_1)); + processor.handleDataEvent(getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_2)); + + processor.handleCommit(partition, getCommitLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2)); + assertThat(metrics.getOldestScn()).isEqualTo(Scn.valueOf(1L).toString()); + assertThat(metrics.getRolledBackTransactionIds()).isEmpty(); + } + } + @Test @FixFor("DBZ-6679") public void testEmptyResultSetWithMineRangeAdvancesCorrectly() throws Exception { @@ -322,6 +429,25 @@ public void testAbandonOneTransaction() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testAbandonOneTransactionAndStartEventIsNotHandled() throws Exception { + if (!isTransactionAbandonmentSupported()) { + return; + } + + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + Mockito.when(offsetContext.getScn()).thenReturn(Scn.valueOf(1L)); + Mockito.when(offsetContext.getSnapshotScn()).thenReturn(Scn.NULL); + + Instant changeTime = Instant.now().minus(24, ChronoUnit.HOURS); + processor.processRow(partition, getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, changeTime)); + processor.abandonTransactions(Duration.ofHours(1L)); + assertThat(processor.getTransactionCache().isEmpty()).isTrue(); + } + } + @Test public void testAbandonTransactionHavingAnotherOne() throws Exception { if (!isTransactionAbandonmentSupported()) { @@ -345,6 +471,28 @@ public void testAbandonTransactionHavingAnotherOne() throws Exception { } } + @Test + @FixFor("DBZ-7473") + public void testAbandonTransactionHavingAnotherOneAndStartEventIsNotHandled() throws Exception { + if (!isTransactionAbandonmentSupported()) { + return; + } + + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + Mockito.when(offsetContext.getScn()).thenReturn(Scn.valueOf(1L)); + Mockito.when(offsetContext.getSnapshotScn()).thenReturn(Scn.NULL); + + Instant changeTime = Instant.now().minus(24, ChronoUnit.HOURS); + processor.processRow(partition, getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, changeTime)); + processor.processRow(partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2)); + processor.abandonTransactions(Duration.ofHours(1L)); + assertThat(processor.getTransactionCache().isEmpty()).isFalse(); + assertThat(processor.getTransactionCache().get(TRANSACTION_ID_1)).isNull(); + assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNotNull(); + } + } + @Test @FixFor("DBZ-6355") public void testAbandonTransactionsUsingFallbackBasedOnChangeTime() throws Exception { @@ -378,6 +526,38 @@ public void testAbandonTransactionsUsingFallbackBasedOnChangeTime() throws Excep assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNull(); assertThat(processor.getTransactionCache().get(TRANSACTION_ID_3)).isNotNull(); } + } + + @Test + @FixFor("DBZ-7473") + public void testAbandonTransactionsUsingFallbackBasedOnChangeTimeAndStartEventIsNotHandled() throws Exception { + if (!isTransactionAbandonmentSupported()) { + return; + } + + // re-create some mocked objects + this.schema.close(); + + connection = createOracleConnection(true); + schema = createOracleDatabaseSchema(); + metrics = createMetrics(schema); + + final OracleConnectorConfig config = new OracleConnectorConfig(getConfig().build()); + try (T processor = getProcessor(config)) { + Mockito.when(offsetContext.getScn()).thenReturn(Scn.valueOf(1L)); + Mockito.when(offsetContext.getSnapshotScn()).thenReturn(Scn.NULL); + + Instant changeTime1 = Instant.now().minus(24, ChronoUnit.HOURS); + Instant changeTime2 = Instant.now().minus(23, ChronoUnit.HOURS); + processor.processRow(partition, getInsertLogMinerEventRow(Scn.valueOf(2L), TRANSACTION_ID_1, changeTime1)); + processor.processRow(partition, getInsertLogMinerEventRow(Scn.valueOf(3L), TRANSACTION_ID_2, changeTime2)); + processor.processRow(partition, getInsertLogMinerEventRow(Scn.valueOf(4L), TRANSACTION_ID_3)); + processor.abandonTransactions(Duration.ofHours(1L)); + assertThat(processor.getTransactionCache().isEmpty()).isFalse(); + assertThat(processor.getTransactionCache().get(TRANSACTION_ID_1)).isNull(); + assertThat(processor.getTransactionCache().get(TRANSACTION_ID_2)).isNull(); + assertThat(processor.getTransactionCache().get(TRANSACTION_ID_3)).isNotNull(); + } }