DBZ-2221 Wait for keepalive thread initialization
This commit is contained in:
parent
8a4a9dfaa6
commit
87d93d44fd
@ -17,10 +17,13 @@
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumMap;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -73,6 +76,7 @@
|
||||
import io.debezium.relational.TableId;
|
||||
import io.debezium.util.Clock;
|
||||
import io.debezium.util.ElapsedTimeStrategy;
|
||||
import io.debezium.util.Metronome;
|
||||
import io.debezium.util.Strings;
|
||||
import io.debezium.util.Threads;
|
||||
|
||||
@ -86,6 +90,7 @@ public class BinlogReader extends AbstractReader {
|
||||
|
||||
private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
|
||||
private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);
|
||||
private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
|
||||
|
||||
private final boolean recordSchemaChangesInSourceRecords;
|
||||
private final RecordMakers recordMakers;
|
||||
@ -111,6 +116,7 @@ public class BinlogReader extends AbstractReader {
|
||||
private Heartbeat heartbeat;
|
||||
private MySqlJdbcContext connectionContext;
|
||||
private final float heartbeatIntervalFactor = 0.8f;
|
||||
private final Set<Thread> binaryLogClientThreads = Collections.synchronizedSet(new HashSet<>(2));
|
||||
|
||||
public static class BinlogPosition {
|
||||
final String filename;
|
||||
@ -203,7 +209,8 @@ public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acce
|
||||
// Set up the log reader ...
|
||||
client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(), connectionContext.username(), connectionContext.password());
|
||||
// BinaryLogClient will overwrite thread names later
|
||||
client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false, false));
|
||||
client.setThreadFactory(
|
||||
Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false, false, binaryLogClientThreads::add));
|
||||
client.setServerId(serverId);
|
||||
client.setSSLMode(sslModeFor(connectionContext.sslMode()));
|
||||
if (connectionContext.sslModeEnabled()) {
|
||||
@ -398,6 +405,24 @@ protected void doStart() {
|
||||
try {
|
||||
logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
|
||||
client.connect(timeout);
|
||||
// Need to wait for keepalive thread to be running, otherwise it can be left orphaned
|
||||
// The problem is with timing. When the close is called too early after connect then
|
||||
// the keepalive thread is not terminated
|
||||
if (client.isKeepAlive()) {
|
||||
logger.info("Waiting for keepalive thread to start");
|
||||
final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock);
|
||||
int waitAttempts = 50;
|
||||
boolean keepAliveThreadRunning = false;
|
||||
while (!keepAliveThreadRunning && waitAttempts-- > 0) {
|
||||
for (Thread t : binaryLogClientThreads) {
|
||||
if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) {
|
||||
logger.info("Keepalive thread is running");
|
||||
keepAliveThreadRunning = true;
|
||||
}
|
||||
}
|
||||
metronome.pause();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (TimeoutException e) {
|
||||
// If the client thread is interrupted *before* the client could connect, the client throws a timeout exception
|
||||
|
@ -12,6 +12,7 @@
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import org.apache.kafka.connect.source.SourceConnector;
|
||||
@ -248,6 +249,23 @@ private Threads() {
|
||||
* @return the thread factory setting the correct name
|
||||
*/
|
||||
public static ThreadFactory threadFactory(Class<? extends SourceConnector> connector, String connectorId, String name, boolean indexed, boolean daemon) {
|
||||
return threadFactory(connector, connectorId, name, indexed, daemon, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a thread factory that creates threads conforming to Debezium thread naming
|
||||
* pattern {@code debezium-<connector class>-<connector-id>-<thread-name>}.
|
||||
*
|
||||
* @param connector - the source connector class
|
||||
* @param connectorId - the identifier to differentiate between connector instances
|
||||
* @param name - the name of the thread
|
||||
* @param indexed - true if the thread name should be appended with an index
|
||||
* @param daemon - true if the thread should be a daemon thread
|
||||
* @param callback - a callback called on every thread created
|
||||
* @return the thread factory setting the correct name
|
||||
*/
|
||||
public static ThreadFactory threadFactory(Class<? extends SourceConnector> connector, String connectorId, String name, boolean indexed, boolean daemon,
|
||||
Consumer<Thread> callback) {
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
LOGGER.info("Requested thread factory for connector {}, id = {} named = {}", connector.getSimpleName(), connectorId, name);
|
||||
}
|
||||
@ -269,6 +287,9 @@ public Thread newThread(Runnable r) {
|
||||
LOGGER.info("Creating thread {}", threadName);
|
||||
final Thread t = new Thread(r, threadName.toString());
|
||||
t.setDaemon(daemon);
|
||||
if (callback != null) {
|
||||
callback.accept(t);
|
||||
}
|
||||
return t;
|
||||
}
|
||||
};
|
||||
|
@ -142,6 +142,7 @@ public void stopConnector(BooleanConsumer callback) {
|
||||
logger.info("Stopping the connector");
|
||||
// Try to stop the connector ...
|
||||
if (engine != null && engine.isRunning()) {
|
||||
logger.info("Stopping the engine");
|
||||
engine.stop();
|
||||
try {
|
||||
// Oracle connector needs longer time to complete shutdown
|
||||
@ -153,6 +154,7 @@ public void stopConnector(BooleanConsumer callback) {
|
||||
}
|
||||
}
|
||||
if (executor != null) {
|
||||
logger.info("Interrupting the engine");
|
||||
List<Runnable> neverRunTasks = executor.shutdownNow();
|
||||
assertThat(neverRunTasks).isEmpty();
|
||||
try {
|
||||
@ -166,6 +168,7 @@ public void stopConnector(BooleanConsumer callback) {
|
||||
}
|
||||
}
|
||||
if (engine != null && engine.isRunning()) {
|
||||
logger.info("Waiting for engine to stop");
|
||||
try {
|
||||
while (!engine.await(60, TimeUnit.SECONDS)) {
|
||||
// Wait for connector to stop completely ...
|
||||
|
Loading…
Reference in New Issue
Block a user