DBZ-516 Returning control also from BlockingReader

This commit is contained in:
Gunnar Morling 2018-01-18 14:03:07 +01:00 committed by Jiri Pechanec
parent 0c4190c493
commit 7a7d43c237

View File

@ -6,13 +6,16 @@
package io.debezium.connector.mysql; package io.debezium.connector.mysql;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.config.ConfigurationDefaults;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
/** /**
* A component that blocks doing nothing until the connector task is stopped * A component that blocks doing nothing until the connector task is stopped
* *
@ -23,22 +26,27 @@ public class BlockingReader implements Reader {
protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Logger logger = LoggerFactory.getLogger(getClass());
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>(); private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final AtomicReference<State> state = new AtomicReference<>(); private final AtomicReference<State> state = new AtomicReference<>();
private final Metronome metronome;
private final CountDownLatch latch = new CountDownLatch(1);
private final String name; private final String name;
public BlockingReader(String name) { public BlockingReader(String name) {
this.name = name; this.name = name;
this.metronome = Metronome.parker(ConfigurationDefaults.RETURN_CONTROL_INTERVAL, Clock.SYSTEM);
} }
/** /**
* Waits indefinitely until the connector task is shut down * Does nothing until the connector task is shut down, but regularly returns control back to Connect in order for being paused if requested.
*/ */
@Override @Override
public List<SourceRecord> poll() throws InterruptedException { public List<SourceRecord> poll() throws InterruptedException {
latch.await(); if (state.get() == State.STOPPED) {
state.set(State.STOPPING); return null;
}
metronome.pause();
state.compareAndSet(State.RUNNING, State.STOPPING);
return null; return null;
} }
@ -62,13 +70,8 @@ public void start() {
@Override @Override
public void stop() { public void stop() {
try {
try { try {
state.set(State.STOPPED); state.set(State.STOPPED);
}
finally {
latch.countDown();
}
// Cleanup Resources // Cleanup Resources
Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once Runnable completionHandler = uponCompletion.getAndSet(null); // set to null so that we call it only once