DBZ-1930 Support for SMTs in embedded engine

Jiri Pechanec 2020-04-02 18:44:55 +02:00 committed by Gunnar Morling
parent 65c8564d66
commit 2c1e51b657
3 changed files with 169 additions and 0 deletions

@@ -21,8 +21,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.RetriableException;
@@ -176,6 +180,17 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
.withDescription("The Converter class that should be used to serialize and deserialize value data for offsets.")
.withDefault(JsonConverter.class.getName());
/**
* A list of SMTs to be applied to the messages generated by the engine.
*/
public static final Field TRANSFORMS = Field.create("transforms")
.withDisplayName("List of prefixes defining transformations.")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.LOW)
.withDescription("Optional list of single message transformations applied on the messages. "
+ "The transforms are defined using '<transform.prefix>.type' config option and configured using options '<transform.prefix>.<option>'");
/**
* The array of fields that are required by each connector.
*/
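In practice the format described above is the standard Connect worker syntax: the value of 'transforms' lists prefixes, '<transform.prefix>.type' names each implementation, and '<transform.prefix>.<option>' configures it. A minimal sketch with one illustrative 'router' transform (the same settings the new test in this commit uses):

import java.util.Properties;

Properties props = new Properties();
props.setProperty("transforms", "router");
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
props.setProperty("transforms.router.regex", "(.*)");
props.setProperty("transforms.router.replacement", "trf$1");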
@@ -532,6 +547,7 @@ public static Builder create() {
private OffsetCommitPolicy offsetCommitPolicy;
private SourceTask task;
private final Transformations transformations;
private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock clock, DebeziumEngine.ChangeConsumer<SourceRecord> handler,
DebeziumEngine.CompletionCallback completionCallback, DebeziumEngine.ConnectorCallback connectorCallback,
@@ -563,6 +579,8 @@ private EmbeddedEngine(Configuration config, ClassLoader classLoader, Clock cloc
}
valueConverter.configure(valueConverterConfig.subset(INTERNAL_VALUE_CONVERTER_CLASS.name() + ".", true).asMap(), false);
transformations = new Transformations(config);
// Create the worker config, adding extra fields that are required for validation of a worker config
// but that are not used within the embedded engine (since the source records are never serialized) ...
Map<String, String> embeddedConfig = config.asMap(ALL_FIELDS);
@@ -763,6 +781,11 @@ public Map<String, String> configs() {
try {
if (changeRecords != null && !changeRecords.isEmpty()) {
logger.debug("Received {} records from the task", changeRecords.size());
changeRecords = changeRecords.stream().map(transformations::transform).filter(x -> x != null).collect(Collectors.toList());
}
if (changeRecords != null && !changeRecords.isEmpty()) {
logger.debug("Received {} transformed records from the task", changeRecords.size());
try {
handler.handleBatch(changeRecords, committer);
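Worth noting: the emptiness check is repeated after the transformation step because an SMT may return null for a record and thereby drop it; filtering those nulls out can leave the batch empty, in which case handler.handleBatch(...) is never invoked.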

@@ -0,0 +1,84 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
/**
* Composite class representing a transformation chain.
*
* @author Jiri Pechanec
*
*/
public class Transformations implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(Transformations.class);
private static final String TYPE_SUFFIX = ".type";
private final Configuration config;
private final List<Transformation<SourceRecord>> transforms = new ArrayList<>();
public Transformations(Configuration config) {
this.config = config;
final String transformationList = config.getString(EmbeddedEngine.TRANSFORMS);
if (transformationList == null) {
return;
}
for (String transfName : transformationList.split(",")) {
transfName = transfName.trim();
final Transformation<SourceRecord> transformation = getTransformation(transfName);
transformation.configure(config.subset(transformationConfigNamespace(transfName), true).asMap());
transforms.add(transformation);
}
}
private String transformationConfigNamespace(final String name) {
return EmbeddedEngine.TRANSFORMS.name() + "." + name;
}
@SuppressWarnings("unchecked")
private Transformation<SourceRecord> getTransformation(String name) {
final Transformation<SourceRecord> transformation = config.getInstance(EmbeddedEngine.TRANSFORMS.name() + "." + name + TYPE_SUFFIX, Transformation.class);
if (transformation == null) {
throw new DebeziumException("Cannot instatiate transformation '" + name + "'");
}
return transformation;
}
public SourceRecord transform(SourceRecord record) {
for (Transformation<SourceRecord> t : transforms) {
record = t.apply(record);
if (record == null) {
break;
}
}
return record;
}
@Override
public void close() throws IOException {
for (Transformation<SourceRecord> t : transforms) {
try {
t.close();
}
catch (Exception e) {
LOGGER.warn("Error while closing transformation", e);
}
}
}
}
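A minimal usage sketch for the new class (not part of this commit; the class name TransformationsSketch, the 'router' prefix, and the sample record are illustrative, and debezium-embedded plus Kafka's connect-transforms are assumed on the classpath):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.config.Configuration;
import io.debezium.embedded.Transformations;

public class TransformationsSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = Configuration.create()
                .with("transforms", "router")
                .with("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter")
                .with("transforms.router.regex", "(.*)")
                .with("transforms.router.replacement", "trf$1")
                .build();
        SourceRecord record = new SourceRecord(null, null, "topicX", Schema.STRING_SCHEMA, "a line");
        // close() walks the chain and closes every SMT, so try-with-resources fits here
        try (Transformations transformations = new Transformations(config)) {
            SourceRecord transformed = transformations.transform(record);
            // a null result would mean an SMT dropped the record; RegexRouter only
            // rewrites the topic, so this prints "trftopicX"
            System.out.println(transformed.topic());
        }
    }
}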

@@ -208,6 +208,68 @@ public void shouldRunDebeziumEngine() throws Exception {
stopConnector();
}
@Test
public void shouldExecuteSmt() throws Exception {
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);
final Properties props = new Properties();
props.setProperty("name", "debezium-engine");
props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX");
props.setProperty("transforms", "router");
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
props.setProperty("transforms.router.regex", "(.*)");
props.setProperty("transforms.router.replacement", "trf$1");
CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
// create an engine with our custom class
final DebeziumEngine<SourceRecord> engine = DebeziumEngine.create(Connect.class)
.using(props)
.notifying((records, committer) -> {
assertThat(records.size()).isGreaterThanOrEqualTo(NUMBER_OF_LINES);
records.forEach(r -> assertThat(r.topic()).isEqualTo("trftopicX"));
Integer groupCount = records.size() / NUMBER_OF_LINES;
for (SourceRecord r : records) {
committer.markProcessed(r);
}
committer.markBatchFinished();
firstLatch.countDown();
for (int i = 0; i < groupCount; i++) {
allLatch.countDown();
}
})
.using(this.getClass().getClassLoader())
.build();
ExecutorService exec = Executors.newFixedThreadPool(1);
exec.execute(() -> {
LoggingContext.forConnector(getClass().getSimpleName(), "", "engine");
engine.run();
});
firstLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(firstLatch.getCount()).isEqualTo(0);
for (int i = 0; i < 5; i++) {
// Add a few more lines, and then verify they are consumed ...
appendLinesToSource(NUMBER_OF_LINES);
Thread.sleep(10);
}
allLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);
// Stop the connector ...
stopConnector();
}
protected void appendLinesToSource(int numberOfLines) throws IOException {
CharSequence[] lines = new CharSequence[numberOfLines];
for (int i = 0; i != numberOfLines; ++i) {
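As a footnote to the test above, the 'trftopicX' topic it asserts can be reproduced by running RegexRouter in isolation; a hedged sketch (the class name and record contents are placeholders):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.RegexRouter;

public class RegexRouterSketch {
    public static void main(String[] args) {
        RegexRouter<SourceRecord> router = new RegexRouter<>();
        Map<String, String> smtConfig = new HashMap<>();
        smtConfig.put("regex", "(.*)");
        smtConfig.put("replacement", "trf$1");
        router.configure(smtConfig);
        // "topicX" matches "(.*)", so the replacement "trf$1" yields "trftopicX"
        SourceRecord routed = router.apply(
                new SourceRecord(null, null, "topicX", Schema.STRING_SCHEMA, "a line"));
        System.out.println(routed.topic()); // prints: trftopicX
        router.close();
    }
}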