DBZ-1347 Switch to non-blocking stream read

Jiri Pechanec 2019-06-21 13:07:02 +02:00 committed by Gunnar Morling
parent e05ef934a0
commit e72d7edd2f
3 changed files with 62 additions and 17 deletions
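In short, commit() no longer flushes the LSN to the server itself; it only records it in the new lastCommittedLsn AtomicLong, while the streaming loop swaps the blocking stream.read(...) for readPending(...) and pauses via the new Metronome when nothing is pending. A minimal sketch of the resulting loop, assuming the fields and helpers introduced in the diff below (pollLoop is only an illustrative name, not code from this commit):

    private void pollLoop(ReplicationStream stream, BlockingConsumer<ChangeEvent> consumer) throws SQLException, InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            // apply any LSN recorded by commit(), so flushes happen on the streaming thread
            flushLatestCommittedLsn(stream);
            // non-blocking read: returns false when no replication message is pending
            if (!stream.readPending(x -> process(x, stream.lastReceivedLsn(), consumer))) {
                // nothing to read right now; sleep for the configured poll interval instead of busy-looping
                pauseNoMessage.pause();
            }
        }
    }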


@@ -15,6 +15,7 @@
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -40,7 +41,9 @@
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchema;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
@@ -68,6 +71,9 @@ public class RecordsStreamProducer extends RecordsProducer {
private PgConnection typeResolverConnection = null;
private Long lastCompletelyProcessedLsn;
private final AtomicLong lastCommittedLsn = new AtomicLong(-1);
private final Metronome pauseNoMessage;
private final Heartbeat heartbeat;
@FunctionalInterface
@@ -94,6 +100,7 @@ public RecordsStreamProducer(PostgresTaskContext taskContext,
heartbeat = Heartbeat.create(taskContext.config().getConfig(), taskContext.topicSelector().getHeartbeatTopic(),
taskContext.config().getLogicalName());
pauseNoMessage = Metronome.sleeper(taskContext.getConfig().getPollInterval(), Clock.SYSTEM);
}
@Override
@@ -142,9 +149,13 @@ private void streamChanges(BlockingConsumer<ChangeEvent> consumer, Consumer<Thro
// run while we haven't been requested to stop
while (!Thread.currentThread().isInterrupted()) {
try {
flushLatestCommittedLsn(stream);
// this will block until a message is available
stream.read(x -> process(x, stream.lastReceivedLsn(), consumer));
} catch (SQLException e) {
if (!stream.readPending(x -> process(x, stream.lastReceivedLsn(), consumer))) {
pauseNoMessage.pause();
}
}
catch (SQLException e) {
Throwable cause = e.getCause();
if (cause != null && (cause instanceof IOException)) {
//TODO author=Horia Chiorean date=08/11/2016 description=this is because we can't safely close the stream atm
@@ -154,7 +165,12 @@ private void streamChanges(BlockingConsumer<ChangeEvent> consumer, Consumer<Thro
}
failureConsumer.accept(e);
throw new ConnectException(e);
} catch (Throwable e) {
}
catch (InterruptedException e) {
logger.info("Interrupted from sleep, producer termination requested");
Thread.currentThread().interrupt();
}
catch (Throwable e) {
logger.error("unexpected exception while streaming logical changes", e);
failureConsumer.accept(e);
throw new ConnectException(e);
@@ -162,25 +178,25 @@ private void streamChanges(BlockingConsumer<ChangeEvent> consumer, Consumer<Thro
}
}
private void flushLatestCommittedLsn(ReplicationStream stream) throws SQLException {
final long newLsn = lastCommittedLsn.getAndSet(-1);
if (newLsn != -1) {
if (logger.isDebugEnabled()) {
logger.debug("Flushing LSN to server: {}", LogSequenceNumber.valueOf(newLsn));
}
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
stream.flushLsn(newLsn);
}
}
@Override
protected synchronized void commit(long lsn) {
LoggingContext.PreviousContext previousContext = taskContext.configureLoggingContext(CONTEXT_NAME);
try {
ReplicationStream replicationStream = this.replicationStream.get();
if (replicationStream != null) {
if (logger.isDebugEnabled()) {
logger.debug("Flushing LSN to server: {}", LogSequenceNumber.valueOf(lsn));
}
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
replicationStream.flushLsn(lsn);
if (logger.isDebugEnabled()) {
logger.debug("Flushing of LSN '{}' requested", LogSequenceNumber.valueOf(lsn));
}
else {
logger.debug("Streaming has already stopped, ignoring commit callback...");
}
}
catch (SQLException e) {
throw new ConnectException(e);
lastCommittedLsn.set(lsn);
}
finally {
previousContext.restore();
@@ -200,6 +216,12 @@ protected synchronized void stop() {
ReplicationStream stream = this.replicationStream.get();
// if we have a stream, ensure that it has been stopped
if (stream != null) {
try {
flushLatestCommittedLsn(stream);
}
catch (SQLException e) {
logger.error("Failed to execute the final LSN flush", e);
}
stream.stopKeepAlive();
}


@@ -269,6 +269,17 @@ public void read(ReplicationMessageProcessor processor) throws SQLException, Int
deserializeMessages(read, processor);
}
@Override
public boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
ByteBuffer read = stream.readPending();
// the lsn we started from is inclusive, so we need to avoid sending back the same message twice
if (read == null || lsnLong >= stream.getLastReceiveLSN().asLong()) {
return false;
}
deserializeMessages(read, processor);
return true;
}
private void deserializeMessages(ByteBuffer buffer, ReplicationMessageProcessor processor) throws SQLException, InterruptedException {
lastReceivedLsn = stream.getLastReceiveLSN();
messageDecoder.processMessage(buffer, processor, typeRegistry);


@@ -33,6 +33,18 @@ public interface ReplicationMessageProcessor {
*/
void read(ReplicationMessageProcessor processor) throws SQLException, InterruptedException;
/**
* Attempts to read a replication message from a replication connection without blocking; if a message is pending it is
* passed to the given processor, otherwise the method returns immediately. Once a message has been received, the value of
* the {@link #lastReceivedLsn() last received LSN} will also be updated accordingly.
*
* @param processor - a callback to which the arrived message is passed
* @return {@code true} if a message was received and processed
* @throws SQLException if anything unexpected fails
* @see PGReplicationStream#readPending()
*/
boolean readPending(ReplicationMessageProcessor processor) throws SQLException, InterruptedException;
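// Illustrative caller pattern (not part of this diff; "pause" is a placeholder for any back-off mechanism):
// the boolean return tells the caller whether a pending message was actually handed to the processor,
// so a false result is the cue to sleep briefly before polling again.
//
//     while (!Thread.currentThread().isInterrupted()) {
//         if (!stream.readPending(msg -> handle(msg, stream.lastReceivedLsn()))) {
//             pause.pause();   // nothing pending (or the message was at/before the starting LSN)
//         }
//     }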
/**
* Sends a message to the server informing it about the latest position in the WAL that has successfully been
* processed. Due to the internal buffering the messages sent to Kafka (and thus committed offsets) will usually