DBZ-3090 Cleanup LogMiner TransactionBuffer commit handler

This commit is contained in:
Chris Cranford 2021-02-17 20:53:45 -05:00 committed by Gunnar Morling
parent 4dbd9f3e4c
commit 304f38e15a
3 changed files with 5 additions and 88 deletions

View File

@ -186,12 +186,11 @@ public void execute(ChangeEventSourceContext context) {
logMinerMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
processor.processResult(rs);
updateStartScn(transactionalBuffer);
startScn = endScn;
if (transactionalBuffer.isEmpty()) {
LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
offsetContext.setScn(startScn);
transactionalBuffer.resetLargestScn(null);
}
}
}
@ -237,22 +236,10 @@ private void abandonOldTransactionsIfExist(Connection connection, TransactionalB
lastScnToAbandonTransactions.ifPresent(thresholdScn -> {
transactionalBuffer.abandonLongTransactions(thresholdScn, offsetContext);
offsetContext.setScn(thresholdScn);
updateStartScn(transactionalBuffer);
startScn = endScn;
});
}
// TODO computing the largest scn in the buffer is a left-over from previous incarnations, remove it.
// TODO We don't need to keep largestScn in the buffer at all. clean it
private void updateStartScn(TransactionalBuffer transactionalBuffer) {
long nextStartScn = transactionalBuffer.getLargestScn().equals(Scn.ZERO) ? endScn : transactionalBuffer.getLargestScn().longValue();
if (nextStartScn <= startScn) {
LOGGER.trace("Resetting largest SCN in transaction buffer to {}, nextStartScn={}, startScn={}", endScn, nextStartScn, startScn);
// When system is idle, largest SCN may stay unchanged, move it forward then
transactionalBuffer.resetLargestScn(endScn);
}
startScn = endScn;
}
private void initializeRedoLogsForMining(Connection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
if (!postEndMiningSession) {
if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {

View File

@ -45,9 +45,6 @@ public final class TransactionalBuffer implements AutoCloseable {
private final Set<String> rolledBackTransactionIds;
private final TransactionalBufferMetrics metrics;
// It holds the latest captured SCN.
// This number tracks starting point for the next mining cycle.
private Scn largestScn;
private Scn lastCommittedScn;
/**
@ -59,7 +56,6 @@ public final class TransactionalBuffer implements AutoCloseable {
TransactionalBuffer(OracleTaskContext taskContext, ErrorHandler errorHandler) {
this.transactions = new HashMap<>();
this.errorHandler = errorHandler;
this.largestScn = Scn.ZERO;
this.lastCommittedScn = Scn.ZERO;
this.abandonedTransactionIds = new HashSet<>();
this.rolledBackTransactionIds = new HashSet<>();
@ -76,13 +72,6 @@ TransactionalBufferMetrics getMetrics() {
return metrics;
}
/**
* @return largest last SCN in the buffer among all transactions
*/
Scn getLargestScn() {
return largestScn;
}
/**
* @return rolled back transactions
*/
@ -90,18 +79,6 @@ Set<String> getRolledBackTransactionIds() {
return new HashSet<>(rolledBackTransactionIds);
}
/**
* Reset Largest SCN
*/
void resetLargestScn(Long value) {
if (value != null) {
largestScn = Scn.fromLong(value);
}
else {
largestScn = Scn.ZERO;
}
}
/**
* Registers callback to execute when transaction commits.
*
@ -131,10 +108,6 @@ void registerCommitCallback(String transactionId, Scn scn, Instant changeTime, C
if (transaction != null) {
transaction.commitCallbacks.add(callback);
}
if (scn.compareTo(largestScn) > 0) {
largestScn = scn;
}
}
/**
@ -153,15 +126,12 @@ void registerCommitCallback(String transactionId, Scn scn, Instant changeTime, C
boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, Timestamp timestamp,
ChangeEventSource.ChangeEventSourceContext context, String debugMessage, EventDispatcher dispatcher) {
Transaction transaction = transactions.get(transactionId);
Instant start = Instant.now();
Transaction transaction = transactions.remove(transactionId);
if (transaction == null) {
return false;
}
Instant start = Instant.now();
calculateLargestScn();
transaction = transactions.remove(transactionId);
Scn smallestScn = calculateSmallestScn();
abandonedTransactionIds.remove(transactionId);
@ -177,7 +147,7 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
}
List<CommitCallback> commitCallbacks = transaction.commitCallbacks;
LOGGER.trace("COMMIT, {}, smallest SCN: {}, largest SCN {}", debugMessage, smallestScn, largestScn);
LOGGER.trace("COMMIT, {}, smallest SCN: {}", debugMessage, smallestScn);
commit(context, offsetContext, start, commitCallbacks, timestamp, smallestScn, scn, dispatcher);
return true;
@ -230,9 +200,7 @@ boolean rollback(String transactionId, String debugMessage) {
if (transaction != null) {
LOGGER.debug("Transaction rolled back: {}", debugMessage);
calculateLargestScn(); // in case if largest SCN was in this transaction
transactions.remove(transactionId);
abandonedTransactionIds.remove(transactionId);
rolledBackTransactionIds.add(transactionId);
@ -275,7 +243,6 @@ void abandonLongTransactions(Long thresholdScn, OracleOffsetContext offsetContex
transaction.getValue().toString());
abandonedTransactionIds.add(transaction.getKey());
iter.remove();
calculateLargestScn();
metrics.addAbandonedTransactionId(transaction.getKey());
metrics.setActiveTransactions(transactions.size());
@ -298,15 +265,6 @@ private Scn calculateSmallestScn() {
return scn;
}
private void calculateLargestScn() {
largestScn = transactions.isEmpty() ? Scn.ZERO
: transactions.values()
.stream()
.map(transaction -> transaction.lastScn)
.max(Scn::compareTo)
.orElseThrow(() -> new DataException("Cannot calculate largest SCN"));
}
/**
* Returns {@code true} if buffer is empty, otherwise {@code false}.
*

View File

@ -132,7 +132,6 @@ public void testIsEmptyWhenTransactionIsRolledBack() {
});
transactionalBuffer.rollback(TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN);
}
@Test
@ -143,7 +142,6 @@ public void testNonEmptyFirstTransactionIsRolledBack() {
});
transactionalBuffer.rollback(TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isTrue();
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isFalse();
}
@ -156,7 +154,6 @@ public void testNonEmptySecondTransactionIsRolledBack() {
});
transactionalBuffer.rollback(OTHER_TRANSACTION_ID, "");
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(TRANSACTION_ID)).isFalse();
assertThat(transactionalBuffer.getRolledBackTransactionIds().contains(OTHER_TRANSACTION_ID)).isTrue();
}
@ -169,11 +166,9 @@ public void testCalculateScnWhenTransactionIsCommitted() throws InterruptedExcep
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // before commit
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
commitLatch.await();
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(SCN); // after commit
assertThat(smallestScnContainer.get()).isNull();
@ -190,12 +185,10 @@ public void testCalculateScnWhenFirstTransactionIsCommitted() throws Interrupted
});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), null, false, true, new TransactionContext());
transactionalBuffer.commit(TRANSACTION_ID, SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
commitLatch.await();
// after commit, it stays the same because OTHER_TRANSACTION_ID is not committed yet
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
assertThat(smallestScnContainer.get()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
@ -211,33 +204,14 @@ public void testCalculateScnWhenSecondTransactionIsCommitted() throws Interrupte
smallestScnContainer.set(smallestScn);
commitLatch.countDown();
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true, new TransactionContext());
transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN.add(Scn.ONE), offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
commitLatch.await();
assertThat(smallestScnContainer.get()).isEqualTo(SCN);
// after committing OTHER_TRANSACTION_ID
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
assertThat(transactionalBuffer.getRolledBackTransactionIds().isEmpty()).isTrue();
}
@Test
public void testResetLargestScn() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
});
transactionalBuffer.registerCommitCallback(OTHER_TRANSACTION_ID, OTHER_SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
});
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // before commit
offsetContext = new OracleOffsetContext(connectorConfig, OTHER_SCN.longValue(), OTHER_SCN.longValue(), null, false, true, new TransactionContext());
transactionalBuffer.commit(OTHER_TRANSACTION_ID, OTHER_SCN, offsetContext, TIMESTAMP, () -> true, MESSAGE, dispatcher);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN); // after commit
transactionalBuffer.resetLargestScn(null);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(Scn.ZERO);
transactionalBuffer.resetLargestScn(OTHER_SCN.longValue());
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}
@Test
public void testAbandoningOneTransaction() {
transactionalBuffer.registerCommitCallback(TRANSACTION_ID, SCN, Instant.now(), (timestamp, smallestScn, commitScn, counter) -> {
@ -245,7 +219,6 @@ public void testAbandoningOneTransaction() {
offsetContext = new OracleOffsetContext(connectorConfig, SCN.longValue(), SCN.longValue(), (LcrPosition) null, false, true, new TransactionContext());
transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext);
assertThat(transactionalBuffer.isEmpty()).isEqualTo(true);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(Scn.ZERO);
}
@Test
@ -256,7 +229,6 @@ public void testAbandoningTransactionHavingAnotherOne() {
});
transactionalBuffer.abandonLongTransactions(SCN.longValue(), offsetContext);
assertThat(transactionalBuffer.isEmpty()).isEqualTo(false);
assertThat(transactionalBuffer.getLargestScn()).isEqualTo(OTHER_SCN);
}
@Test