From 018a879d4023ea431a08f735a011e23b5c045e16 Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Mon, 29 Jan 2018 19:13:16 +0100 Subject: [PATCH] DBZ-583 Extracting a builder from ChainedReader, avoiding mutable state within its implementation --- .../connector/mysql/ChainedReader.java | 59 +++++++++++-------- .../connector/mysql/MySqlConnectorTask.java | 17 +++--- .../connector/mysql/ChainedReaderTest.java | 35 ++++++----- 3 files changed, 65 insertions(+), 46 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java index 3b891953b..b0a360e0d 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/ChainedReader.java @@ -6,6 +6,7 @@ package io.debezium.connector.mysql; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -22,34 +23,54 @@ * This reader ensures that all records generated by one of its contained {@link Reader}s are all passed through to callers * via {@link #poll() polling} before the next reader is started. And, when this reader is {@link #stop() stopped}, this * class ensures that current reader is stopped and that no additional readers will be started. - * + * * @author Randall Hauch */ @ThreadSafe public final class ChainedReader implements Reader { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final List readers = new ArrayList<>(); + private final List readers; + private final String completionMessage; private final LinkedList remainingReaders = new LinkedList<>(); private final AtomicBoolean running = new AtomicBoolean(); private final AtomicBoolean completed = new AtomicBoolean(true); private final AtomicReference currentReader = new AtomicReference<>(); private final AtomicReference uponCompletion = new AtomicReference<>(); - private final AtomicReference completionMessage = new AtomicReference<>(); + + public static class Builder { + + private final List readers = new ArrayList<>(); + private String completionMessage; + + public Builder addReader(Reader reader) { + readers.add(reader); + return this; + } + + /** + * Set the message that should be logged when all of the readers have completed their work. + */ + public Builder completionMessage(String message) { + this.completionMessage = message; + return this; + } + + public ChainedReader build() { + return new ChainedReader(readers, completionMessage); + } + } /** * Create a new chained reader. */ - public ChainedReader() { - } + private ChainedReader(List readers, String completionMessage) { + this.readers = Collections.unmodifiableList(readers); + this.completionMessage = completionMessage; - /** - * Set the message that should be logged when all of the readers have completed their work. - * - * @param msg the message; may be null - */ - public void uponCompletion(String msg) { - completionMessage.set(msg); + for (Reader reader : this.readers) { + reader.uponCompletion(this::readerCompletedPolling); + } } @Override @@ -57,13 +78,6 @@ public void uponCompletion(Runnable handler) { uponCompletion.set(handler); } - protected ChainedReader add(Reader reader) { - assert reader != null; - reader.uponCompletion(this::readerCompletedPolling); - readers.add(reader); - return this; - } - @Override public void initialize() { // initialize all of the readers ... @@ -144,9 +158,8 @@ private synchronized void readerCompletedPolling() { handler.run(); } // and output our message ... - String msg = completionMessage.get(); - if (msg != null) { - logger.info(msg); + if (completionMessage != null) { + logger.info(completionMessage); } } } finally { @@ -159,7 +172,7 @@ private synchronized void readerCompletedPolling() { /** * Start the next reader. - * + * * @return {@code true} if the next reader was started, or {@code false} if there are no more readers */ private boolean startNextReader() { diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java index 3b16c904f..e2c6efd49 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java @@ -161,28 +161,28 @@ public synchronized void start(Map props) { // Check whether the row-level binlog is enabled ... final boolean rowBinlogEnabled = isRowBinlogEnabled(); + ChainedReader.Builder chainedReaderBuilder = new ChainedReader.Builder(); + // Set up the readers, with a callback to `completeReaders` so that we know when it is finished ... - readers = new ChainedReader(); - readers.uponCompletion(this::completeReaders); BinlogReader binlogReader = new BinlogReader("binlog", taskContext); if (startWithSnapshot) { // We're supposed to start with a snapshot, so set that up ... SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext); snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking()); if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents(); - readers.add(snapshotReader); + chainedReaderBuilder.addReader(snapshotReader); if (taskContext.isInitialSnapshotOnly()) { logger.warn("This connector will only perform a snapshot, and will stop after that completes."); - readers.add(new BlockingReader("blocker")); - readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate."); + chainedReaderBuilder.addReader(new BlockingReader("blocker")); + chainedReaderBuilder.completionMessage("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate."); } else { if (!rowBinlogEnabled) { throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is " + "required for this connector to work properly. Change the MySQL configuration to use a " + "row-level binlog and restart the connector."); } - readers.add(binlogReader); + chainedReaderBuilder.addReader(binlogReader); } } else { if (!rowBinlogEnabled) { @@ -190,9 +190,12 @@ public synchronized void start(Map props) { "The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector."); } // We're going to start by reading the binlog ... - readers.add(binlogReader); + chainedReaderBuilder.addReader(binlogReader); } + readers = chainedReaderBuilder.build(); + readers.uponCompletion(this::completeReaders); + // And finally initialize and start the chain of readers ... this.readers.initialize(); this.readers.start(); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ChainedReaderTest.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ChainedReaderTest.java index 34e3627a3..d8ea41c38 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ChainedReaderTest.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/ChainedReaderTest.java @@ -15,7 +15,6 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import io.debezium.config.ConfigurationDefaults; @@ -52,13 +51,9 @@ private static SourceRecord record() { private ChainedReader reader; - @Before - public void beforeEach() { - reader = new ChainedReader(); - } - @Test public void shouldNotStartWithoutReaders() throws InterruptedException { + reader = new ChainedReader.Builder().build(); assertThat(reader.state()).isEqualTo(State.STOPPED); reader.start(); assertThat(reader.state()).isEqualTo(State.STOPPED); @@ -67,8 +62,10 @@ public void shouldNotStartWithoutReaders() throws InterruptedException { @Test public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws InterruptedException { - reader.add(new MockReader("r1", records())); - reader.uponCompletion("Stopped the r1 reader"); + reader = new ChainedReader.Builder() + .addReader(new MockReader("r1", records())) + .completionMessage("Stopped the r1 reader") + .build(); reader.start(); assertThat(reader.state()).isEqualTo(State.RUNNING); assertThat(reader.poll()).isSameAs(RL1); @@ -84,8 +81,10 @@ public void shouldStartAndStopSingleReaderBeforeReaderStopsItself() throws Inter @Test public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedException { - reader.add(new MockReader("r2", records())); - reader.uponCompletion("Stopped the r2 reader"); + reader = new ChainedReader.Builder() + .addReader(new MockReader("r2", records())) + .completionMessage("Stopped the r2 reader") + .build(); reader.start(); assertThat(reader.state()).isEqualTo(State.RUNNING); assertThat(reader.poll()).isSameAs(RL1); @@ -100,9 +99,11 @@ public void shouldStartSingleReaderThatStopsAutomatically() throws InterruptedEx @Test public void shouldStartAndStopMultipleReaders() throws InterruptedException { - reader.add(new MockReader("r3", records())); - reader.add(new MockReader("r4", records())); - reader.uponCompletion("Stopped the r3+r4 reader"); + reader = new ChainedReader.Builder() + .addReader(new MockReader("r3", records())) + .addReader(new MockReader("r4", records())) + .completionMessage("Stopped the r3+r4 reader") + .build(); reader.start(); assertThat(reader.state()).isEqualTo(State.RUNNING); assertThat(reader.poll()).isSameAs(RL1); @@ -132,8 +133,10 @@ public void shouldStartAndStopMultipleReaders() throws InterruptedException { @Test public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingStopped() throws InterruptedException { - reader.add(new CompletingMockReader("r5", records())); - reader.uponCompletion("Stopped the r5 reader"); + reader = new ChainedReader.Builder() + .addReader(new CompletingMockReader("r5", records())) + .completionMessage("Stopped the r5 reader") + .build(); reader.start(); assertThat(reader.state()).isEqualTo(State.RUNNING); assertThat(reader.poll()).isSameAs(RL1); @@ -150,7 +153,7 @@ public void shouldStartAndStopReaderThatContinuesProducingItsRecordsAfterBeingSt assertThat(reader.state()).isEqualTo(State.STOPPED); assertPollReturnsNoMoreRecords(); } - + protected void assertPollReturnsNoMoreRecords() throws InterruptedException { for (int i=0;i!=10; ++i) { assertThat(reader.poll()).isNull();