From 6a13b0e0b5aefe027d792ecf56fb731509cff307 Mon Sep 17 00:00:00 2001 From: Jiri Pechanec Date: Mon, 22 Jul 2019 13:15:47 +0200 Subject: [PATCH] DBZ-1400 Record with same flush LSN should not be skipped for the first record --- .../PostgresReplicationConnection.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index f082ae663..0a498d328 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -192,11 +192,13 @@ public ReplicationStream startStreaming() throws SQLException, InterruptedExcept @Override public ReplicationStream startStreaming(Long offset) throws SQLException, InterruptedException { + boolean skipFirstFlushRecord = true; initConnection(); connect(); if (offset == null || offset <= 0) { offset = defaultStartingPos; + skipFirstFlushRecord = false; } LogSequenceNumber lsn = LogSequenceNumber.valueOf(offset); if (LOGGER.isDebugEnabled()) { @@ -204,7 +206,7 @@ public ReplicationStream startStreaming(Long offset) throws SQLException, Interr } try { - return createReplicationStream(lsn); + return createReplicationStream(lsn, skipFirstFlushRecord); } catch(Exception e) { throw new ConnectException("Failed to start replication stream at " + lsn, e); @@ -282,7 +284,7 @@ private SlotCreationResult parseSlotCreation(ResultSet rs) { } - private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) throws SQLException, InterruptedException { + private ReplicationStream createReplicationStream(final LogSequenceNumber lsn, boolean skipFirstFlushRecord) throws SQLException, InterruptedException { PGReplicationStream s; try { @@ -348,8 +350,12 @@ else if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already @Override public void read(ReplicationMessageProcessor processor) throws SQLException, InterruptedException { ByteBuffer read = stream.read(); + final long lastReceiveLsn = stream.getLastReceiveLSN().asLong(); + LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn); // the lsn we started from is inclusive, so we need to avoid sending back the same message twice - if (startingLsn >= stream.getLastReceiveLSN().asLong()) { + // but for the first record seen ever it is possible we received the same LSN as the one obtained from replication slot + if ((startingLsn > stream.getLastReceiveLSN().asLong()) || (startingLsn == stream.getLastReceiveLSN().asLong() && skipFirstFlushRecord)) { + LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startingLsn, lastReceiveLsn); return; } deserializeMessages(read, processor); @@ -358,8 +364,15 @@ public void read(ReplicationMessageProcessor processor) throws SQLException, Int @Override public boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException { ByteBuffer read = stream.readPending(); + final long lastReceiveLsn = stream.getLastReceiveLSN().asLong(); + if (read == null) { + return false; + } + LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn); // the lsn we started from is inclusive, so we need to avoid sending back the same message twice - if (read == null || startingLsn >= stream.getLastReceiveLSN().asLong()) { + // but for the first record seen ever it is possible we received the same LSN as the one obtained from replication slot + if ((startingLsn > stream.getLastReceiveLSN().asLong()) || (startingLsn == stream.getLastReceiveLSN().asLong() && skipFirstFlushRecord)) { + LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startingLsn, lastReceiveLsn); return false; } deserializeMessages(read, processor);