diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java index 2940e71ae..e4b5fe12e 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java @@ -98,9 +98,17 @@ public void start() { @Override public void stop() { try { + // Emptying the queue so to make sure that enqueue() won't block indefinitely when adding records after + // poll() isn't called anymore but before the binlog reader is stopped; note there's still a tiny chance for + // this to happen if enough records are added again between here and the call to disconnect(); protecting + // against it seems not worth though it as shouldn't happen for any practical queue size + List unsent = new ArrayList<>(); + records.drainTo(unsent); + logger.info("Discarding {} unsent record(s) due to the connector shutting down", unsent.size()); doStop(); running.set(false); - } finally { + } + finally { if (failure.get() != null) { // We had a failure and it was propagated via poll(), after which Kafka Connect will stop // the connector, which will stop the task that will then stop this reader via this method.