DBZ-7024 Move reusable testing functions for DebeziumEngine into common util class

This commit is contained in:
Vojtech Juranek 2023-11-30 16:14:52 +01:00 committed by Jiri Pechanec
parent b8e16ee89f
commit 7eaf0fc288
2 changed files with 135 additions and 105 deletions


@@ -0,0 +1,132 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;

import io.debezium.connector.simple.SimpleSourceConnector;
import io.debezium.util.Collect;
/**
 * Utility classes and functions useful for testing implementations of {@link DebeziumEngine}.
 */
public class DebeziumEngineTestUtils {

    public static int appendLinesToSource(File inputFile, int numberOfLines, int initLineNumber) throws IOException {
        CharSequence[] lines = new CharSequence[numberOfLines];
        for (int i = 0; i != numberOfLines; ++i) {
            lines[i] = generateLine(initLineNumber + i + 1);
        }
        java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF_8, StandardOpenOption.APPEND);
        return numberOfLines;
    }

    public static int appendLinesToSource(File inputFile, String linePrefix, int numberOfLines, int initLineNumber) throws IOException {
        CharSequence[] lines = new CharSequence[numberOfLines];
        for (int i = 0; i != numberOfLines; ++i) {
            lines[i] = generateLine(linePrefix, initLineNumber + i + 1);
        }
        java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF_8, StandardOpenOption.APPEND);
        return numberOfLines;
    }

    public static String generateLine(int lineNumber) {
        return generateLine("Generated line number ", lineNumber);
    }

    public static String generateLine(String linePrefix, int lineNumber) {
        return linePrefix + lineNumber;
    }
}
class InterruptedConnector extends SimpleSourceConnector {
    @Override
    public Class<? extends Task> taskClass() {
        return InterruptedTask.class;
    }
}

class InterruptedTask extends SimpleSourceConnector.SimpleConnectorTask {
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        throw new InterruptedException();
    }
}

class InterruptingOffsetStore implements OffsetBackingStore {

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }

    @Override
    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> collection) {
        // Called by the offset reader; an empty map means no offsets are stored.
        return new CompletableFuture<Map<ByteBuffer, ByteBuffer>>() {
            @Override
            public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) {
                return new HashMap<ByteBuffer, ByteBuffer>();
            }

            @Override
            public Map<ByteBuffer, ByteBuffer> get() {
                return new HashMap<ByteBuffer, ByteBuffer>();
            }
        };
    }

    /**
     * Implementation that throws InterruptedException when offset commits are called.
     */
    @Override
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> map, Callback<Void> callback) {
        return new CompletableFuture<Void>() {
            @Override
            public Void get() throws InterruptedException {
                throw new InterruptedException();
            }

            @Override
            public Void get(long timeout, TimeUnit unit) throws InterruptedException {
                throw new InterruptedException();
            }
        };
    }

    @Override
    public void configure(WorkerConfig workerConfig) {
    }

    @Override
    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
        return null;
    }
}
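For orientation, a hypothetical caller of the new utility class might look like the sketch below. It is not part of this commit; the class name, temp-file handling, and printouts are illustrative assumptions.

// Hypothetical usage sketch (not from this commit): drives the moved helpers end to end.
import java.io.File;
import java.nio.file.Files;

import io.debezium.embedded.DebeziumEngineTestUtils;

public class DebeziumEngineTestUtilsSketch {
    public static void main(String[] args) throws Exception {
        // Create an empty source file; appendLinesToSource() opens it with StandardOpenOption.APPEND.
        File inputFile = Files.createTempFile("engine-source", ".txt").toFile();
        inputFile.deleteOnExit();

        // Track the running line count the way the engine tests do with their linesAdded field.
        int linesAdded = 0;
        linesAdded += DebeziumEngineTestUtils.appendLinesToSource(inputFile, 5, linesAdded);
        linesAdded += DebeziumEngineTestUtils.appendLinesToSource(inputFile, "Ñ ñ", 5, linesAdded);

        // generateLine() reproduces the expected file content for assertions;
        // lines are numbered initLineNumber + i + 1, so a fresh file starts at line 1.
        System.out.println(DebeziumEngineTestUtils.generateLine(1)); // prints: Generated line number 1
        System.out.println(linesAdded); // prints: 10
    }
}

Returning the number of appended lines, rather than mutating shared state, is what lets each caller keep its own linesAdded counter, as the updated test below does.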


@@ -13,36 +13,27 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.junit.Before;
import org.junit.Test;
@@ -62,7 +53,6 @@
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.spi.OffsetCommitPolicy;
import io.debezium.util.Collect;
import io.debezium.util.LoggingContext;
import io.debezium.util.Testing;
import io.debezium.util.Throwables;
@@ -143,7 +133,7 @@ public void beforeEach() throws Exception {
    @Test
    public void verifyNonAsciiContentHandledCorrectly() throws Exception {
-       appendLinesToSource("Ñ ñ", NUMBER_OF_LINES);
+       linesAdded += DebeziumEngineTestUtils.appendLinesToSource(inputFile, "Ñ ñ", NUMBER_OF_LINES, linesAdded);

        final Properties props = new Properties();
        props.setProperty("name", "debezium-engine");
@@ -829,35 +819,13 @@ public void connectorStarted() {
    }

    protected void appendLinesToSource(int numberOfLines) throws IOException {
-       CharSequence[] lines = new CharSequence[numberOfLines];
-       for (int i = 0; i != numberOfLines; ++i) {
-           lines[i] = generateLine(linesAdded + i + 1);
-       }
-       java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
-       linesAdded += numberOfLines;
-   }
-
-   protected void appendLinesToSource(String linePrefix, int numberOfLines) throws IOException {
-       CharSequence[] lines = new CharSequence[numberOfLines];
-       for (int i = 0; i != numberOfLines; ++i) {
-           lines[i] = generateLine(linePrefix, linesAdded + i + 1);
-       }
-       java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), UTF8, StandardOpenOption.APPEND);
-       linesAdded += numberOfLines;
-   }
-
-   protected String generateLine(int lineNumber) {
-       return generateLine("Generated line number ", lineNumber);
-   }
-
-   protected String generateLine(String linePrefix, int lineNumber) {
-       return linePrefix + lineNumber;
+       linesAdded += DebeziumEngineTestUtils.appendLinesToSource(inputFile, numberOfLines, linesAdded);
    }
    protected void consumeLines(int numberOfLines) throws InterruptedException {
        consumeRecords(numberOfLines, 3, record -> {
            String line = record.value().toString();
-           assertThat(line).isEqualTo(generateLine(nextConsumedLineNumber));
+           assertThat(line).isEqualTo(DebeziumEngineTestUtils.generateLine(nextConsumedLineNumber));
            ++nextConsumedLineNumber;
        },
                false);
@@ -890,73 +858,3 @@ public void close() {
        }
    }
}
-
-class InterruptedConnector extends SimpleSourceConnector {
-    @Override
-    public Class<? extends Task> taskClass() {
-        return InterruptedTask.class;
-    }
-}
-
-class InterruptedTask extends SimpleSourceConnector.SimpleConnectorTask {
-    @Override
-    public List<SourceRecord> poll() throws InterruptedException {
-        throw new InterruptedException();
-    }
-}
-
-class InterruptingOffsetStore implements OffsetBackingStore {
-    @Override
-    public void start() {
-    }
-
-    @Override
-    public void stop() {
-    }
-
-    @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> collection) {
-        // called by the offset reader. return null for no offsets stored.
-        return new CompletableFuture<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public Map<ByteBuffer, ByteBuffer> get(long timeout, TimeUnit unit) {
-                return new HashMap<ByteBuffer, ByteBuffer>();
-            }
-
-            @Override
-            public Map<ByteBuffer, ByteBuffer> get() {
-                return new HashMap<ByteBuffer, ByteBuffer>();
-            }
-        };
-    }
-
-    /**
-     * Implementation that throws InterruptedException when offset commits are called.
-     */
-    @Override
-    public Future<Void> set(Map<ByteBuffer, ByteBuffer> map, Callback<Void> callback) {
-        return new CompletableFuture<Void>() {
-            @Override
-            public Void get() throws InterruptedException {
-                throw new InterruptedException();
-            }
-
-            @Override
-            public Void get(long timeout, TimeUnit unit) throws InterruptedException {
-                throw new InterruptedException();
-            }
-        };
-    }
-
-    @Override
-    public void configure(WorkerConfig workerConfig) {
-    }
-
-    @Override
-    public Set<Map<String, Object>> connectorPartitions(String connectorName) {
-        return null;
-    }
-}
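The three fixtures moved out of this test exist to force InterruptedException into an engine's poll and offset-commit paths. A minimal sketch of how a test might wire them into the embedded engine follows; it is not part of this commit, it must sit in the io.debezium.embedded package because the fixtures are package-private, and the property set is abbreviated, so a real run may need more configuration.

// Hypothetical wiring sketch (not from this commit): runs the embedded engine
// against the interrupt-injecting fixtures.
package io.debezium.embedded;

import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class InterruptionSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("name", "interruption-sketch");
        // InterruptedTask.poll() throws InterruptedException on its first call...
        props.setProperty("connector.class", InterruptedConnector.class.getName());
        // ...and the custom store interrupts offset commits as well.
        props.setProperty("offset.storage", InterruptingOffsetStore.class.getName());
        props.setProperty("offset.flush.interval.ms", "0");

        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> System.out.println(record))
                .build()) {
            // The engine should shut down cleanly once the interruption propagates,
            // which is the behavior the engine tests assert.
            engine.run();
        }
    }
}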