diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 9918d87a4..d54e240ca 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -121,14 +121,12 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio final Long lsn = stream.lastReceivedLsn(); if (message == null) { LOGGER.trace("Received empty message"); - lastCompletelyProcessedLsn = lsn; - offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, null, null, null, taskContext.getSlotXmin(connection)); - maybeWarnAboutGrowingWalBacklog(false); - dispatcher.dispatchHeartbeatEvent(offsetContext); + skipMessage(lsn); return; } if (message.isTransactionalMessage() && !connectorConfig.shouldProvideTransactionMetadata()) { LOGGER.trace("Received transactional message {}", message); + skipMessage(lsn); return; } if (message.isLastEventForLsn()) { @@ -205,6 +203,13 @@ else if (message.getOperation() == Operation.COMMIT) { } } + private void skipMessage(final Long lsn) throws SQLException, InterruptedException { + lastCompletelyProcessedLsn = lsn; + offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, null, null, null, taskContext.getSlotXmin(connection)); + maybeWarnAboutGrowingWalBacklog(false); + dispatcher.dispatchHeartbeatEvent(offsetContext); + } + /** * If we receive change events but all of them get filtered out, we cannot * commit any new offset with Apache Kafka. This in turn means no LSN is ever