diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java index 2217244f5..467f29f84 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java @@ -185,7 +185,16 @@ public void execute(ChangeEventSourceContext context, OracleOffsetContext offset Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal(); streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing); processor.processResult(rs); - startScn = transactionalBuffer.updateOffsetContext(offsetContext, dispatcher); + if (connectorConfig.isLobEnabled()) { + startScn = transactionalBuffer.updateOffsetContext(offsetContext, dispatcher); + } + else { + if (transactionalBuffer.isEmpty()) { + LOGGER.debug("Buffer is empty, updating offset SCN to {}", endScn); + offsetContext.setScn(endScn); + } + startScn = endScn; + } } streamingMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now())); diff --git a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java index 7177827a6..4e97a4137 100644 --- a/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java +++ b/debezium-connector-oracle/src/main/java/io/debezium/connector/oracle/logminer/TransactionalBuffer.java @@ -319,8 +319,10 @@ boolean commit(String transactionId, Scn scn, OracleOffsetContext offsetContext, maxCommittedScn = lastCommittedScn; } - // cache recent transaction and commit scn for handling offset updates - recentlyCommittedTransactionIds.add(new RecentlyCommittedTransaction(transaction, scn)); + if (connectorConfig.isLobEnabled()) { + // cache recent transaction and commit scn for handling offset updates + recentlyCommittedTransactionIds.add(new RecentlyCommittedTransaction(transaction, scn)); + } } catch (InterruptedException e) { LogMinerHelper.logError(streamingMetrics, "Commit interrupted", e);