diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/DebeziumEngineTestUtils.java b/debezium-embedded/src/test/java/io/debezium/embedded/DebeziumEngineTestUtils.java new file mode 100644 index 000000000..f9373bf02 --- /dev/null +++ b/debezium-embedded/src/test/java/io/debezium/embedded/DebeziumEngineTestUtils.java @@ -0,0 +1,132 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.debezium.embedded; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.StandardOpenOption; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.storage.OffsetBackingStore; +import org.apache.kafka.connect.util.Callback; + +import io.debezium.connector.simple.SimpleSourceConnector; +import io.debezium.util.Collect; + +/** + * Utility classes and functions useful for testing implementations of {@link DebeziumEngine}. + */ +public class DebeziumEngineTestUtils { + + public static int appendLinesToSource(File inputFile, int numberOfLines, int initLineNumber) throws IOException { + CharSequence[] lines = new CharSequence[numberOfLines]; + for (int i = 0; i != numberOfLines; ++i) { + lines[i] = generateLine(initLineNumber + i + 1); + } + java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF_8, StandardOpenOption.APPEND); + return numberOfLines; + } + + public static int appendLinesToSource(File inputFile, String linePrefix, int numberOfLines, int initLineNumber) throws IOException { + CharSequence[] lines = new CharSequence[numberOfLines]; + for (int i = 0; i != numberOfLines; ++i) { + lines[i] = generateLine(linePrefix, initLineNumber + i + 1); + } + java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF_8, StandardOpenOption.APPEND); + return numberOfLines; + } + + public static String generateLine(int lineNumber) { + return generateLine("Generated line number ", lineNumber); + } + + public static String generateLine(String linePrefix, int lineNumber) { + return linePrefix + lineNumber; + } +} + +class InterruptedConnector extends SimpleSourceConnector { + + @Override + public Class taskClass() { + return InterruptedTask.class; + } +} + +class InterruptedTask extends SimpleSourceConnector.SimpleConnectorTask { + + @Override + public List poll() throws InterruptedException { + throw new InterruptedException(); + } +} + +class InterruptingOffsetStore implements OffsetBackingStore { + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public Future> get(Collection collection) { + // called by the offset reader. return null for no offsets stored. + return new CompletableFuture>() { + @Override + public Map get(long timeout, TimeUnit unit) { + return new HashMap(); + } + + @Override + public Map get() { + return new HashMap(); + } + }; + } + + /** + * Implementation that throws InterruptedException when offset commits are called. + */ + @Override + public Future set(Map map, Callback callback) { + return new CompletableFuture() { + @Override + public Void get() throws InterruptedException { + throw new InterruptedException(); + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException { + throw new InterruptedException(); + } + }; + } + + @Override + public void configure(WorkerConfig workerConfig) { + } + + @Override + public Set> connectorPartitions(String connectorName) { + return null; + } +} diff --git a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java index cc1596b56..30b90ab9a 100644 --- a/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java +++ b/debezium-embedded/src/test/java/io/debezium/embedded/EmbeddedEngineTest.java @@ -13,36 +13,27 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.file.FileStreamSourceConnector; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.json.JsonDeserializer; -import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.source.SourceRecord; -import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; -import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.SafeObjectInputStream; import org.junit.Before; import org.junit.Test; @@ -62,7 +53,6 @@ import io.debezium.engine.format.Json; import io.debezium.engine.format.JsonByteArray; import io.debezium.engine.spi.OffsetCommitPolicy; -import io.debezium.util.Collect; import io.debezium.util.LoggingContext; import io.debezium.util.Testing; import io.debezium.util.Throwables; @@ -143,7 +133,7 @@ public void beforeEach() throws Exception { @Test public void verifyNonAsciiContentHandledCorrectly() throws Exception { - appendLinesToSource("Ñ ñ", NUMBER_OF_LINES); + linesAdded += DebeziumEngineTestUtils.appendLinesToSource(inputFile, "Ñ ñ", NUMBER_OF_LINES, linesAdded); final Properties props = new Properties(); props.setProperty("name", "debezium-engine"); @@ -829,35 +819,13 @@ public void connectorStarted() { } protected void appendLinesToSource(int numberOfLines) throws IOException { - CharSequence[] lines = new CharSequence[numberOfLines]; - for (int i = 0; i != numberOfLines; ++i) { - lines[i] = generateLine(linesAdded + i + 1); - } - java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND); - linesAdded += numberOfLines; - } - - protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException { - CharSequence[] lines = new CharSequence[numberOfLines]; - for (int i = 0; i != numberOfLines; ++i) { - lines[i] = generateLine(linePrefix, linesAdded + i + 1); - } - java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND); - linesAdded += numberOfLines; - } - - protected String generateLine(int lineNumber) { - return generateLine("Generated line number ", lineNumber); - } - - protected String generateLine(String linePrefix, int lineNumber) { - return linePrefix + lineNumber; + linesAdded += DebeziumEngineTestUtils.appendLinesToSource(inputFile, numberOfLines, linesAdded); } protected void consumeLines(int numberOfLines) throws InterruptedException { consumeRecords(numberOfLines, 3, record -> { String line = record.value().toString(); - assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber)); + assertThat(line).isEqualTo(DebeziumEngineTestUtils.generateLine(nextConsumedLineNumber)); ++nextConsumedLineNumber; }, false); @@ -890,73 +858,3 @@ public void close() { } } } - -class InterruptedConnector extends SimpleSourceConnector { - - @Override - public Class taskClass() { - return InterruptedTask.class; - } -} - -class InterruptedTask extends SimpleSourceConnector.SimpleConnectorTask { - - @Override - public List poll() throws InterruptedException { - throw new InterruptedException(); - } -} - -class InterruptingOffsetStore implements OffsetBackingStore { - - @Override - public void start() { - } - - @Override - public void stop() { - } - - @Override - public Future> get(Collection collection) { - // called by the offset reader. return null for no offsets stored. - return new CompletableFuture>() { - @Override - public Map get(long timeout, TimeUnit unit) { - return new HashMap(); - } - - @Override - public Map get() { - return new HashMap(); - } - }; - } - - /** - * Implementation that throws InterruptedException when offset commits are called. - */ - @Override - public Future set(Map map, Callback callback) { - return new CompletableFuture() { - @Override - public Void get() throws InterruptedException { - throw new InterruptedException(); - } - - @Override - public Void get(long timeout, TimeUnit unit) throws InterruptedException { - throw new InterruptedException(); - } - }; - } - - @Override - public void configure(WorkerConfig workerConfig) { - } - - @Override - public Set> connectorPartitions(String connectorName) { - return null; - } -}