From 7a7d43c2371507b22bf2a6be8fce11023e5cc7b7 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Thu, 18 Jan 2018 14:03:07 +0100 Subject: [PATCH] DBZ-516 Returning control also from BlockingReader --- .../connector/mysql/BlockingReader.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java index 9b32764fd..11b7dd53b 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BlockingReader.java @@ -6,13 +6,16 @@ package io.debezium.connector.mysql; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.config.ConfigurationDefaults; +import io.debezium.util.Clock; +import io.debezium.util.Metronome; + /** * A component that blocks doing nothing until the connector task is stopped * @@ -23,22 +26,27 @@ public class BlockingReader implements Reader { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final AtomicReference uponCompletion = new AtomicReference<>(); private final AtomicReference state = new AtomicReference<>(); - - private final CountDownLatch latch = new CountDownLatch(1); + private final Metronome metronome; private final String name; public BlockingReader(String name) { this.name = name; + this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM); + } /** - * Waits indefinitely until the connector task is shut down + * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested. */ @Override public List poll() throws InterruptedException { - latch.await(); - state.set(State.STOPPING); + if (state.get() == State.STOPPED) { + return null; + } + + metronome.pause(); + state.compareAndSet(State.RUNNING, State.STOPPING); return null; } @@ -63,12 +71,7 @@ public void start() { @Override public void stop() { try { - try { - state.set(State.STOPPED); - } - finally { - latch.countDown(); - } + state.set(State.STOPPED); // Cleanup Resources Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once