diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index afeb366b8..df5f40c22 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -397,16 +397,17 @@ private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) { @Override public void commitOffset(Map offset) { try { - if (!lsnFlushingAllowed) { - return; - } - ReplicationStream replicationStream = this.replicationStream.get(); final Lsn commitLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)); final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; if (replicationStream != null && lsn != null) { + if (!lsnFlushingAllowed) { + LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn); + return; + } + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing LSN to server: {}", lsn); }