DBZ-1400 Record with same flush LSN should not be skipped for the first record

This commit is contained in:
Jiri Pechanec 2019-07-22 13:15:47 +02:00 committed by Chris Cranford
parent 00843cb0fb
commit 6a13b0e0b5

View File

@ -192,11 +192,13 @@ public ReplicationStream startStreaming() throws SQLException, InterruptedExcept
@Override
public ReplicationStream startStreaming(Long offset) throws SQLException, InterruptedException {
boolean skipFirstFlushRecord = true;
initConnection();
connect();
if (offset == null || offset <= 0) {
offset = defaultStartingPos;
skipFirstFlushRecord = false;
}
LogSequenceNumber lsn = LogSequenceNumber.valueOf(offset);
if (LOGGER.isDebugEnabled()) {
@ -204,7 +206,7 @@ public ReplicationStream startStreaming(Long offset) throws SQLException, Interr
}
try {
return createReplicationStream(lsn);
return createReplicationStream(lsn, skipFirstFlushRecord);
}
catch(Exception e) {
throw new ConnectException("Failed to start replication stream at " + lsn, e);
@ -282,7 +284,7 @@ private SlotCreationResult parseSlotCreation(ResultSet rs) {
}
private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) throws SQLException, InterruptedException {
private ReplicationStream createReplicationStream(final LogSequenceNumber lsn, boolean skipFirstFlushRecord) throws SQLException, InterruptedException {
PGReplicationStream s;
try {
@ -348,8 +350,12 @@ else if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already
@Override
public void read(ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
ByteBuffer read = stream.read();
final long lastReceiveLsn = stream.getLastReceiveLSN().asLong();
LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn);
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (startingLsn >= stream.getLastReceiveLSN().asLong()) {
// but for the first record seen ever it is possible we received the same LSN as the one obtained from replication slot
if ((startingLsn > stream.getLastReceiveLSN().asLong()) || (startingLsn == stream.getLastReceiveLSN().asLong() && skipFirstFlushRecord)) {
LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startingLsn, lastReceiveLsn);
return;
}
deserializeMessages(read, processor);
@ -358,8 +364,15 @@ public void read(ReplicationMessageProcessor processor) throws SQLException, Int
@Override
public boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
ByteBuffer read = stream.readPending();
final long lastReceiveLsn = stream.getLastReceiveLSN().asLong();
if (read == null) {
return false;
}
LOGGER.trace("Streaming requested from LSN {}, received LSN {}", startingLsn, lastReceiveLsn);
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (read == null || startingLsn >= stream.getLastReceiveLSN().asLong()) {
// but for the first record seen ever it is possible we received the same LSN as the one obtained from replication slot
if ((startingLsn > stream.getLastReceiveLSN().asLong()) || (startingLsn == stream.getLastReceiveLSN().asLong() && skipFirstFlushRecord)) {
LOGGER.info("Streaming requested from LSN {} but received LSN {} that is same or smaller so skipping the message", startingLsn, lastReceiveLsn);
return false;
}
deserializeMessages(read, processor);