DBZ-3676 When LOB support is disabled, use adaptive SCN mining windows
This commit is contained in:
parent
09fa730d98
commit
01e87c11ed
@ -185,7 +185,16 @@ public void execute(ChangeEventSourceContext context, OracleOffsetContext offset
|
||||
Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
|
||||
streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
|
||||
processor.processResult(rs);
|
||||
startScn = transactionalBuffer.updateOffsetContext(offsetContext, dispatcher);
|
||||
if (connectorConfig.isLobEnabled()) {
|
||||
startScn = transactionalBuffer.updateOffsetContext(offsetContext, dispatcher);
|
||||
}
|
||||
else {
|
||||
if (transactionalBuffer.isEmpty()) {
|
||||
LOGGER.debug("Buffer is empty, updating offset SCN to {}", endScn);
|
||||
offsetContext.setScn(endScn);
|
||||
}
|
||||
startScn = endScn;
|
||||
}
|
||||
}
|
||||
|
||||
streamingMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now()));
|
||||
|
@ -319,8 +319,10 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext,
|
||||
maxCommittedScn = lastCommittedScn;
|
||||
}
|
||||
|
||||
// cache recent transaction and commit scn for handling offset updates
|
||||
recentlyCommittedTransactionIds.add(new RecentlyCommittedTransaction(transaction, scn));
|
||||
if (connectorConfig.isLobEnabled()) {
|
||||
// cache recent transaction and commit scn for handling offset updates
|
||||
recentlyCommittedTransactionIds.add(new RecentlyCommittedTransaction(transaction, scn));
|
||||
}
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
LogMinerHelper.logError(streamingMetrics, "Commit interrupted", e);
|
||||
|
Loading…
Reference in New Issue
Block a user