diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java index 4f106f5d4..b9145338b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/RecordsStreamProducer.java @@ -15,6 +15,7 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -40,7 +41,9 @@ import io.debezium.relational.TableEditor; import io.debezium.relational.TableId; import io.debezium.relational.TableSchema; +import io.debezium.util.Clock; import io.debezium.util.LoggingContext; +import io.debezium.util.Metronome; import io.debezium.util.Strings; import io.debezium.util.Threads; @@ -68,6 +71,9 @@ public class RecordsStreamProducer extends RecordsProducer { private PgConnection typeResolverConnection = null; private Long lastCompletelyProcessedLsn; + private final AtomicLong lastCommittedLsn = new AtomicLong(-1); + private final Metronome pauseNoMessage; + private final Heartbeat heartbeat; @FunctionalInterface @@ -94,6 +100,7 @@ public RecordsStreamProducer(PostgresTaskContext taskContext, heartbeat = Heartbeat.create(taskContext.config().getConfig(), taskContext.topicSelector().getHeartbeatTopic(), taskContext.config().getLogicalName()); + pauseNoMessage = Metronome.sleeper(taskContext.getConfig().getPollInterval(), Clock.SYSTEM); } @Override @@ -142,9 +149,13 @@ private void streamChanges(BlockingConsumer consumer, Consumer process(x, stream.lastReceivedLsn(), consumer)); - } catch (SQLException e) { + if (!stream.readPending(x -> process(x, stream.lastReceivedLsn(), consumer))) { + pauseNoMessage.pause(); + } + } + catch (SQLException e) { Throwable cause = e.getCause(); if (cause != null && (cause instanceof IOException)) { //TODO author=Horia Chiorean date=08/11/2016 description=this is because we can't safely close the stream atm @@ -154,7 +165,12 @@ private void streamChanges(BlockingConsumer consumer, Consumer consumer, Consumer= stream.getLastReceiveLSN().asLong()) { + return false; + } + deserializeMessages(read, processor); + return true; + } + private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException { lastReceivedLsn = stream.getLastReceiveLSN(); messageDecoder.processMessage(buffer, processor, typeRegistry); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java index 4d43cb2d6..c73b5e3cf 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/ReplicationStream.java @@ -33,6 +33,18 @@ public interface ReplicationMessageProcessor { */ void read(ReplicationMessageProcessor processor) throws SQLException, InterruptedException; + /** + * Attempts to read a replication message from a replication connection, returning that message if it's available or returning + * {@code null} if nothing is available. Once a message has been received, the value of the {@link #lastReceivedLsn() last received LSN} + * will also be updated accordingly. + * + * @param processor - a callback to which the arrived message is passed + * @return {@code true} if a message was received and processed + * @throws SQLException if anything unexpected fails + * @see PGReplicationStream#readPending() + */ + boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException; + /** * Sends a message to the server informing it about that latest position in the WAL that has successfully been * processed. Due to the internal buffering the messages sent to Kafka (and thus committed offsets) will usually