diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java index a0bdb1b9c..50f3087ca 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java @@ -17,10 +17,13 @@ import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; +import java.time.Duration; import java.time.Instant; import java.util.BitSet; +import java.util.Collections; import java.util.EnumMap; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -73,6 +76,7 @@ import io.debezium.relational.TableId; import io.debezium.util.Clock; import io.debezium.util.ElapsedTimeStrategy; +import io.debezium.util.Metronome; import io.debezium.util.Strings; import io.debezium.util.Threads; @@ -86,6 +90,7 @@ public class BinlogReader extends AbstractReader { private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5); private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1); + private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive"; private final boolean recordSchemaChangesInSourceRecords; private final RecordMakers recordMakers; @@ -111,6 +116,7 @@ public class BinlogReader extends AbstractReader { private Heartbeat heartbeat; private MySqlJdbcContext connectionContext; private final float heartbeatIntervalFactor = 0.8f; + private final Set<Thread> binaryLogClientThreads = Collections.synchronizedSet(new HashSet<>(2)); public static class BinlogPosition { final String filename; @@ -203,7 +209,8 @@ public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acce // Set up the log reader ... 
client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(), connectionContext.username(), connectionContext.password()); // BinaryLogClient will overwrite thread names later - client.setThreadFactory(Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false, false)); + client.setThreadFactory( + Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false, false, binaryLogClientThreads::add)); client.setServerId(serverId); client.setSSLMode(sslModeFor(connectionContext.sslMode())); if (connectionContext.sslModeEnabled()) { @@ -398,6 +405,24 @@ protected void doStart() { try { logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout); client.connect(timeout); + // Need to wait for keepalive thread to be running, otherwise it can be left orphaned + // The problem is with timing. When the close is called too early after connect then + // the keepalive thread is not terminated + if (client.isKeepAlive()) { + logger.info("Waiting for keepalive thread to start"); + final Metronome metronome = Metronome.parker(Duration.ofMillis(100), clock); + int waitAttempts = 50; + boolean keepAliveThreadRunning = false; + while (!keepAliveThreadRunning && waitAttempts-- > 0) { + for (Thread t : binaryLogClientThreads) { + if (t.getName().startsWith(KEEPALIVE_THREAD_NAME) && t.isAlive()) { + logger.info("Keepalive thread is running"); + keepAliveThreadRunning = true; + } + } + metronome.pause(); + } + } } catch (TimeoutException e) { // If the client thread is interrupted *before* the client could connect, the client throws a timeout exception diff --git a/debezium-core/src/main/java/io/debezium/util/Threads.java b/debezium-core/src/main/java/io/debezium/util/Threads.java index 2d545f4e0..fb707bd79 100644 --- a/debezium-core/src/main/java/io/debezium/util/Threads.java +++ 
b/debezium-core/src/main/java/io/debezium/util/Threads.java @@ -12,6 +12,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.function.LongSupplier; import org.apache.kafka.connect.source.SourceConnector; @@ -248,6 +249,23 @@ private Threads() { * @return the thread factory setting the correct name */ public static ThreadFactory threadFactory(Class<? extends SourceConnector> connector, String connectorId, String name, boolean indexed, boolean daemon) { + return threadFactory(connector, connectorId, name, indexed, daemon, null); + } + + /** + * Returns a thread factory that creates threads conforming to Debezium thread naming + * pattern {@code debezium-<connector class>-<connector-id>-<thread-name>}. + * + * @param connector - the source connector class + * @param connectorId - the identifier to differentiate between connector instances + * @param name - the name of the thread + * @param indexed - true if the thread name should be appended with an index + * @param daemon - true if the thread should be a daemon thread + * @param callback - a callback called on every thread created + * @return the thread factory setting the correct name + */ + public static ThreadFactory threadFactory(Class<? extends SourceConnector> connector, String connectorId, String name, boolean indexed, boolean daemon, + Consumer<Thread> callback) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Requested thread factory for connector {}, id = {} named = {}", connector.getSimpleName(), connectorId, name); } @@ -269,6 +287,9 @@ public Thread newThread(Runnable r) { LOGGER.info("Creating thread {}", threadName); final Thread t = new Thread(r, threadName.toString()); t.setDaemon(daemon); + if (callback != null) { + callback.accept(t); + } return t; } }; diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java index cec25a08d..a18cf4f98 100644 --- 
a/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/AbstractConnectorTest.java @@ -142,6 +142,7 @@ public void stopConnector(BooleanConsumer callback) { logger.info("Stopping the connector"); // Try to stop the connector ... if (engine != null && engine.isRunning()) { + logger.info("Stopping the engine"); engine.stop(); try { // Oracle connector needs longer time to complete shutdown @@ -153,6 +154,7 @@ public void stopConnector(BooleanConsumer callback) { } } if (executor != null) { + logger.info("Interrupting the engine"); List<Runnable> neverRunTasks = executor.shutdownNow(); assertThat(neverRunTasks).isEmpty(); try { @@ -166,6 +168,7 @@ public void stopConnector(BooleanConsumer callback) { } } if (engine != null && engine.isRunning()) { + logger.info("Waiting for engine to stop"); try { while (!engine.await(60, TimeUnit.SECONDS)) { // Wait for connector to stop completely ...