DBZ-6026 Don't update last offsets in statistics

This commit is contained in:
Jiri Pechanec 2023-01-23 15:02:36 +01:00
parent a8716fcb61
commit 8f3ca36e23
2 changed files with 4 additions and 1 deletions

View File

@ -406,6 +406,7 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;
LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
if (replicationStream != null && lsn != null) {
if (!lsnFlushingAllowed) {
LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn);

View File

@ -188,7 +188,6 @@ protected void logStatistics(final List<SourceRecord> records) {
}
SourceRecord lastRecord = records.get(batchSize - 1);
updateLastOffset(lastRecord.sourcePartition(), lastRecord.sourceOffset());
previousOutputBatchSize += batchSize;
if (pollOutputDelay.hasElapsed()) {
// We want to record the status ...
@ -313,6 +312,8 @@ private void stop(boolean restart) {
@Override
public void commitRecord(SourceRecord record) throws InterruptedException {
LOGGER.trace("Committing record {}", record);
Map<String, ?> currentOffset = record.sourceOffset();
if (currentOffset != null) {
updateLastOffset(record.sourcePartition(), currentOffset);
@ -331,6 +332,7 @@ public void commit() throws InterruptedException {
Map<String, ?> partition = iterator.next();
Map<String, ?> lastOffset = lastOffsets.get(partition);
LOGGER.debug("Committing offset '{}' for partition '{}'", partition, lastOffset);
coordinator.commitOffset(partition, lastOffset);
iterator.remove();
}