DBZ-583 Extracting a builder from ChainedReader, avoiding mutable state within its implementation

This commit is contained in:
Gunnar Morling 2018-01-29 19:13:16 +01:00 committed by Jiri Pechanec
parent 93cfb4f4cb
commit 018a879d40
3 changed files with 65 additions and 46 deletions

View File

@ -6,6 +6,7 @@
package io.debezium.connector.mysql;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -22,34 +23,54 @@
* This reader ensures that all records generated by one of its contained {@link Reader}s are all passed through to callers
* via {@link #poll() polling} before the next reader is started. And, when this reader is {@link #stop() stopped}, this
* class ensures that current reader is stopped and that no additional readers will be started.
*
*
* @author Randall Hauch
*/
@ThreadSafe
public final class ChainedReader implements Reader {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final List<Reader> readers = new ArrayList<>();
private final List<Reader> readers;
private final String completionMessage;
private final LinkedList<Reader> remainingReaders = new LinkedList<>();
private final AtomicBoolean running = new AtomicBoolean();
private final AtomicBoolean completed = new AtomicBoolean(true);
private final AtomicReference<Reader> currentReader = new AtomicReference<>();
private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
private final AtomicReference<String> completionMessage = new AtomicReference<>();
public static class Builder {
private final List<Reader> readers = new ArrayList<>();
private String completionMessage;
public Builder addReader(Reader reader) {
readers.add(reader);
return this;
}
/**
* Set the message that should be logged when all of the readers have completed their work.
*/
public Builder completionMessage(String message) {
this.completionMessage = message;
return this;
}
public ChainedReader build() {
return new ChainedReader(readers, completionMessage);
}
}
/**
* Create a new chained reader.
*/
public ChainedReader() {
}
private ChainedReader(List<Reader> readers, String completionMessage) {
this.readers = Collections.unmodifiableList(readers);
this.completionMessage = completionMessage;
/**
* Set the message that should be logged when all of the readers have completed their work.
*
* @param msg the message; may be null
*/
public void uponCompletion(String msg) {
completionMessage.set(msg);
for (Reader reader : this.readers) {
reader.uponCompletion(this::readerCompletedPolling);
}
}
@Override
@ -57,13 +78,6 @@ public void uponCompletion(Runnable handler) {
uponCompletion.set(handler);
}
protected ChainedReader add(Reader reader) {
assert reader != null;
reader.uponCompletion(this::readerCompletedPolling);
readers.add(reader);
return this;
}
@Override
public void initialize() {
// initialize all of the readers ...
@ -144,9 +158,8 @@ private synchronized void readerCompletedPolling() {
handler.run();
}
// and output our message ...
String msg = completionMessage.get();
if (msg != null) {
logger.info(msg);
if (completionMessage != null) {
logger.info(completionMessage);
}
}
} finally {
@ -159,7 +172,7 @@ private synchronized void readerCompletedPolling() {
/**
* Start the next reader.
*
*
* @return {@code true} if the next reader was started, or {@code false} if there are no more readers
*/
private boolean startNextReader() {

View File

@ -161,28 +161,28 @@ public synchronized void start(Map<String, String> props) {
// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();
ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder();
// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
readers.add(snapshotReader);
chainedReaderBuilder.addReader(snapshotReader);
if (taskContext.isInitialSnapshotOnly()) {
logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
readers.add(new BlockingReader("blocker"));
readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
chainedReaderBuilder.addReader(new BlockingReader("blocker"));
chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
} else {
if (!rowBinlogEnabled) {
throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "row-level binlog and restart the connector.");
}
readers.add(binlogReader);
chainedReaderBuilder.addReader(binlogReader);
}
} else {
if (!rowBinlogEnabled) {
@ -190,9 +190,12 @@ public synchronized void start(Map<String, String> props) {
"The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
}
// We're going to start by reading the binlog ...
readers.add(binlogReader);
chainedReaderBuilder.addReader(binlogReader);
}
readers = chainedReaderBuilder.build();
readers.uponCompletion(this::completeReaders);
// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();

View File

@ -15,7 +15,6 @@
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import io.debezium.config.ConfigurationDefaults;
@ -52,13 +51,9 @@ private static SourceRecord record() {
private ChainedReader reader;
@Before
public void beforeEach() {
reader = new ChainedReader();
}
@Test
public void shouldNotStartWithoutReaders() throws InterruptedException {
reader = new ChainedReader.Builder().build();
assertThat(reader.state()).isEqualTo(State.STOPPED);
reader.start();
assertThat(reader.state()).isEqualTo(State.STOPPED);
@ -67,8 +62,10 @@ public void shouldNotStartWithoutReaders() throws InterruptedException {
@Test
public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws InterruptedException {
reader.add(new MockReader("r1", records()));
reader.uponCompletion("Stopped the r1 reader");
reader = new ChainedReader.Builder()
.addReader(new MockReader("r1", records()))
.completionMessage("Stopped the r1 reader")
.build();
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
@ -84,8 +81,10 @@ public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws Inter
@Test
public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedException {
reader.add(new MockReader("r2", records()));
reader.uponCompletion("Stopped the r2 reader");
reader = new ChainedReader.Builder()
.addReader(new MockReader("r2", records()))
.completionMessage("Stopped the r2 reader")
.build();
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
@ -100,9 +99,11 @@ public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedEx
@Test
public void shouldStartAndStopMultipleReaders() throws InterruptedException {
reader.add(new MockReader("r3", records()));
reader.add(new MockReader("r4", records()));
reader.uponCompletion("Stopped the r3+r4 reader");
reader = new ChainedReader.Builder()
.addReader(new MockReader("r3", records()))
.addReader(new MockReader("r4", records()))
.completionMessage("Stopped the r3+r4 reader")
.build();
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
@ -132,8 +133,10 @@ public void shouldStartAndStopMultipleReaders() throws InterruptedException {
@Test
public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingStopped() throws InterruptedException {
reader.add(new CompletingMockReader("r5", records()));
reader.uponCompletion("Stopped the r5 reader");
reader = new ChainedReader.Builder()
.addReader(new CompletingMockReader("r5", records()))
.completionMessage("Stopped the r5 reader")
.build();
reader.start();
assertThat(reader.state()).isEqualTo(State.RUNNING);
assertThat(reader.poll()).isSameAs(RL1);
@ -150,7 +153,7 @@ public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingSt
assertThat(reader.state()).isEqualTo(State.STOPPED);
assertPollReturnsNoMoreRecords();
}
protected void assertPollReturnsNoMoreRecords() throws InterruptedException {
for (int i=0;i!=10; ++i) {
assertThat(reader.poll()).isNull();