DBZ-1214 Misc. clean-up:

* Using parker instead of Thread#sleep()
* Using flag for executor cancelation
* Using unique context name in thread name
This commit is contained in:
Gunnar Morling 2019-05-02 08:16:03 +02:00 committed by Jiri Pechanec
parent e1570dd330
commit 6960c880ca
2 changed files with 19 additions and 8 deletions

View File

@ -113,7 +113,7 @@ protected synchronized void start(BlockingConsumer<ChangeEvent> eventConsumer, C
// for large databases with many tables, we can timeout the slot while refreshing schema
// so we need to start a background thread that just responds to keep alive
replicationStream.get().startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, taskContext.config().getLogicalName(), CONTEXT_NAME));
replicationStream.get().startKeepAlive(Threads.newSingleThreadExecutor(PostgresConnector.class, taskContext.config().getLogicalName(), CONTEXT_NAME + "-keep-alive"));
// refresh the schema so we have a latest view of the DB tables
taskContext.refreshSchema(true);
@ -192,7 +192,7 @@ protected synchronized void stop() {
}
ReplicationStream stream = this.replicationStream.get();
// if we have a stream, ensure that it hs been stopped
// if we have a stream, ensure that it has been stopped
if (stream != null) {
stream.stopKeepAlive();
}

View File

@ -6,6 +6,8 @@
package io.debezium.connector.postgresql.connection;
import static java.lang.Math.toIntExact;
import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -15,9 +17,9 @@
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static java.lang.Math.toIntExact;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.jdbc.PgConnection;
@ -33,6 +35,8 @@
import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcConnectionException;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
/**
* Implementation of a {@link ReplicationConnection} for Postgresql. Note that replication connections in PG cannot execute
@ -41,6 +45,7 @@
* @author Horia Chiorean (hchiorea@redhat.com)
*/
public class PostgresReplicationConnection extends JdbcConnection implements ReplicationConnection {
private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);
private final String slotName;
@ -61,7 +66,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
* @param slotName the name of the DB slot for logical replication; may not be null
* @param plugin decoder matching the server side plug-in used for streaming changes; may not be null
* @param dropSlotOnClose whether the replication slot should be dropped once the connection is closed
* @param statusUpdateInterval the number of milli-seconds at which the replication connection should periodically send status
* @param statusUpdateInterval the interval at which the replication connection should periodically send status
* @param typeRegistry registry with PostgreSQL types
*
* updates to the server
@ -242,10 +247,14 @@ else if (e.getMessage().matches("(?s)ERROR: requested WAL segment .* has already
final PGReplicationStream stream = s;
final long lsnLong = lsn.asLong();
return new ReplicationStream() {
private static final int CHECK_WARNINGS_AFTER_COUNT = 100;
private int warningCheckCounter = CHECK_WARNINGS_AFTER_COUNT;
private ExecutorService keepAliveExecutor = null;
private AtomicBoolean keepAliveRunning;
private final Metronome metronome = Metronome.sleeper(statusUpdateInterval, Clock.SYSTEM);
// make sure this is volatile since multiple threads may be interested in this value
private volatile LogSequenceNumber lastReceivedLsn;
@ -312,11 +321,14 @@ public Long lastReceivedLsn() {
public void startKeepAlive(ExecutorService service) {
if (keepAliveExecutor == null) {
keepAliveExecutor = service;
keepAliveRunning = new AtomicBoolean(true);
keepAliveExecutor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
while (keepAliveRunning.get()) {
try {
LOGGER.trace("Forcing status update with replication stream");
stream.forceUpdateStatus();
Thread.sleep(statusUpdateInterval.toMillis());
metronome.pause();
}
catch (Exception exp) {
throw new RuntimeException("received unexpected exception will perform keep alive", exp);
@ -324,13 +336,12 @@ public void startKeepAlive(ExecutorService service) {
}
});
}
}
@Override
public void stopKeepAlive() {
if (keepAliveExecutor != null) {
keepAliveRunning.set(false);
keepAliveExecutor.shutdownNow();
keepAliveExecutor = null;
}