From 3e288b8385e166045b6271cac9766cf5c08738e4 Mon Sep 17 00:00:00 2001 From: Oskar Polak Date: Fri, 17 Jun 2022 14:31:01 +0200 Subject: [PATCH] DBZ-5031 Additional logging when offset flushing is not allowed --- .../postgresql/PostgresStreamingChangeEventSource.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index afeb366b8..df5f40c22 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -397,16 +397,17 @@ private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) { @Override public void commitOffset(Map offset) { try { - if (!lsnFlushingAllowed) { - return; - } - ReplicationStream replicationStream = this.replicationStream.get(); final Lsn commitLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)); final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; if (replicationStream != null && lsn != null) { + if (!lsnFlushingAllowed) { + LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn); + return; + } + if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing LSN to server: {}", lsn); }