DBZ-5530 Add support for predicates in debezium embedded engine.

This commit is contained in:
Jeremy Ford 2022-10-16 16:37:09 -04:00 committed by Jiri Pechanec
parent 370a6db733
commit 9072ea404f
6 changed files with 310 additions and 7 deletions

View File

@ -180,6 +180,17 @@ public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
.withDefault(io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName()) .withDefault(io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName())
.withValidation(Field::isClassName); .withValidation(Field::isClassName);
/**
 * Names of the predicates that transformations may refer to. Every name doubles as the
 * prefix under which the predicate's type and its remaining options are configured.
 */
public static final Field PREDICATES = Field.create("predicates")
        .withDisplayName("List of prefixes defining predicates.")
        .withDescription("Optional list of predicates that can be assigned to transformations. "
                + "The predicates are defined using '<predicate.prefix>.type' config option "
                + "and configured using options '<predicate.prefix>.<option>'")
        .withImportance(Importance.LOW)
        .withWidth(Width.MEDIUM)
        .withType(Type.STRING);
/** /**
* A list of SMTs to be applied on the messages generated by the engine. * A list of SMTs to be applied on the messages generated by the engine.
*/ */

View File

@ -0,0 +1,87 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
/**
 * Composite class representing predicate definitions.
 * <p>
 * The predicate names are read from the {@link EmbeddedEngine#PREDICATES} option as a comma-separated list.
 * For every name the implementation class is taken from the {@code <prefix>.type} option and the created
 * instance is configured with the options found under {@code <prefix>}, where the prefix is the name of the
 * {@link EmbeddedEngine#PREDICATES} field, a dot, and the predicate name.
 *
 * @author Jeremy Ford
 */
public class Predicates implements Closeable {

    private static final Logger LOGGER = LoggerFactory.getLogger(Predicates.class);

    private static final String TYPE_SUFFIX = ".type";

    private final Map<String, Predicate<SourceRecord>> predicates = new HashMap<>();

    /**
     * Instantiates and configures every predicate listed in the configuration.
     *
     * @param config the engine configuration; when the predicate list is absent no predicate is created
     * @throws DebeziumException if a listed predicate cannot be instantiated
     */
    public Predicates(Configuration config) {
        final String predicateList = config.getString(EmbeddedEngine.PREDICATES);
        if (predicateList == null) {
            return;
        }

        try {
            for (String predicateName : predicateList.split(",")) {
                predicateName = predicateName.trim();
                if (predicateName.isEmpty()) {
                    // An empty list or a stray comma must not be treated as a predicate named ''.
                    continue;
                }
                final Predicate<SourceRecord> predicate = createPredicate(config, predicateName);
                predicates.put(predicateName, predicate);
            }
        }
        catch (RuntimeException e) {
            // The caller never receives this instance, so release what was created so far.
            closeQuietly();
            throw e;
        }
    }

    /**
     * Looks up a predicate by the name used in the predicate list.
     *
     * @param name the predicate name
     * @return the predicate, or {@code null} when no predicate with that name was configured
     */
    public Predicate<SourceRecord> getPredicate(String name) {
        return this.predicates.get(name);
    }

    /**
     * Instantiates the class configured under {@code <prefix>.type} and configures it with the
     * options below {@code <prefix>}.
     */
    @SuppressWarnings("unchecked")
    private static Predicate<SourceRecord> createPredicate(Configuration config, String name) {
        Predicate<SourceRecord> predicate;

        String predicatePrefix = predicateConfigNamespace(name);

        try {
            predicate = config.getInstance(predicatePrefix + TYPE_SUFFIX, Predicate.class);
        }
        catch (Exception e) {
            throw new DebeziumException("Error while instantiating predicate '" + name + "'", e);
        }

        if (predicate == null) {
            throw new DebeziumException("Cannot instantiate predicate '" + name + "'");
        }

        predicate.configure(config.subset(predicatePrefix, true).asMap());

        return predicate;
    }

    /** Returns the configuration prefix of the predicate with the given name. */
    private static String predicateConfigNamespace(final String name) {
        return EmbeddedEngine.PREDICATES.name() + "." + name;
    }

    /**
     * Closes all predicates. A failure to close one predicate is logged and does not prevent
     * the remaining predicates from being closed.
     */
    @Override
    public void close() throws IOException {
        closeQuietly();
    }

    /** Closes every predicate, logging instead of propagating any failure. */
    private void closeQuietly() {
        for (Predicate<SourceRecord> p : predicates.values()) {
            try {
                p.close();
            }
            catch (Exception e) {
                LOGGER.warn("Error while closing predicate", e);
            }
        }
    }
}

View File

@ -9,20 +9,23 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import io.debezium.DebeziumException; import io.debezium.DebeziumException;
import io.debezium.annotation.VisibleForTesting;
import io.debezium.config.Configuration; import io.debezium.config.Configuration;
/** /**
* Composite class representing transformation chain. * Composite class representing transformation chain.
* *
* @author Jiri Pechanec * @author Jiri Pechanec
*
*/ */
public class Transformations implements Closeable { public class Transformations implements Closeable {
@ -30,11 +33,16 @@ public class Transformations implements Closeable {
private static final String TYPE_SUFFIX = ".type"; private static final String TYPE_SUFFIX = ".type";
private static final String PREDICATE_SUFFIX = ".predicate";
private static final String NEGATE_SUFFIX = ".negate";
private final Configuration config; private final Configuration config;
private final List<Transformation<SourceRecord>> transforms = new ArrayList<>(); private final List<Transformation<SourceRecord>> transforms = new ArrayList<>();
private final Predicates predicates;
public Transformations(Configuration config) { public Transformations(Configuration config) {
this.config = config; this.config = config;
this.predicates = new Predicates(config);
final String transformationList = config.getString(EmbeddedEngine.TRANSFORMS); final String transformationList = config.getString(EmbeddedEngine.TRANSFORMS);
if (transformationList == null) { if (transformationList == null) {
return; return;
@ -42,28 +50,38 @@ public Transformations(Configuration config) {
for (String transfName : transformationList.split(",")) { for (String transfName : transformationList.split(",")) {
transfName = transfName.trim(); transfName = transfName.trim();
final Transformation<SourceRecord> transformation = getTransformation(transfName); final Transformation<SourceRecord> transformation = getTransformation(transfName);
transformation.configure(config.subset(transformationConfigNamespace(transfName), true).asMap());
transforms.add(transformation); transforms.add(transformation);
} }
} }
// Returns the TRANSFORMS field name joined by a dot with the given transformation name; callers use the result as that transformation's configuration key prefix.
private String transformationConfigNamespace(final String name) { private static String transformationConfigNamespace(final String name) {
return EmbeddedEngine.TRANSFORMS.name() + "." + name; return EmbeddedEngine.TRANSFORMS.name() + "." + name;
} }
@VisibleForTesting
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Transformation<SourceRecord> getTransformation(String name) { Transformation<SourceRecord> getTransformation(String name) {
Transformation<SourceRecord> transformation = null; Transformation<SourceRecord> transformation;
String transformPrefix = transformationConfigNamespace(name);
try { try {
transformation = config.getInstance(EmbeddedEngine.TRANSFORMS.name() + "." + name + TYPE_SUFFIX, Transformation.class); transformation = config.getInstance(transformPrefix + TYPE_SUFFIX, Transformation.class);
} }
catch (Exception e) { catch (Exception e) {
throw new DebeziumException("Error while instantiating transformation '" + name + "'", e); throw new DebeziumException("Error while instantiating transformation '" + name + "'", e);
} }
if (transformation == null) { if (transformation == null) {
throw new DebeziumException("Cannot instatiate transformation '" + name + "'"); throw new DebeziumException("Cannot instantiate transformation '" + name + "'");
}
transformation.configure(config.subset(transformPrefix, true).asMap());
String predicateName = config.getString(transformPrefix + PREDICATE_SUFFIX);
if (predicateName != null) {
Boolean negate = config.getBoolean(transformPrefix + NEGATE_SUFFIX);
Predicate<SourceRecord> predicate = this.predicates.getPredicate(predicateName);
transformation = createPredicateTransformation(negate != null && negate, predicate, transformation);
} }
return transformation; return transformation;
@ -79,8 +97,44 @@ record = t.apply(record);
return record; return record;
} }
/**
 * Wraps a transformation so that it is applied only to the records selected by a predicate.
 *
 * @param negate when {@code true} the transformation is applied to the records the predicate rejects
 * @param predicate the predicate evaluated for every record; must not be {@code null}
 * @param transformation the already configured transformation to delegate to
 * @return a transformation that either delegates to {@code transformation} or returns the record unchanged
 * @throws IllegalArgumentException if {@code predicate} is {@code null}
 */
private static Transformation<SourceRecord> createPredicateTransformation(
        boolean negate, Predicate<SourceRecord> predicate,
        Transformation<SourceRecord> transformation) {

    if (predicate == null) {
        // A predicate name that was never defined is looked up as null; report it now
        // instead of failing with a NullPointerException on the first record.
        throw new IllegalArgumentException("Transformation refers to a predicate that is not defined");
    }

    return new Transformation<>() {
        @Override
        public SourceRecord apply(SourceRecord sourceRecord) {
            // XOR flips the predicate result when negation is requested
            if (negate ^ predicate.test(sourceRecord)) {
                return transformation.apply(sourceRecord);
            }
            return sourceRecord;
        }

        @Override
        public ConfigDef config() {
            return null;
        }

        @Override
        public void close() {
            // predicate will be closed via the Predicates class
            try {
                transformation.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void configure(Map<String, ?> map) {
            // nothing to do: the wrapped transformation is passed in already configured
        }
    };
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
for (Transformation<SourceRecord> t : transforms) { for (Transformation<SourceRecord> t : transforms) {
try { try {
t.close(); t.close();
@ -89,5 +143,7 @@ public void close() throws IOException {
LOGGER.warn("Error while closing transformation", e); LOGGER.warn("Error while closing transformation", e);
} }
} }
this.predicates.close();
} }
} }

View File

@ -37,6 +37,7 @@
import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.SafeObjectInputStream; import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.fest.assertions.Assertions; import org.fest.assertions.Assertions;
@ -96,6 +97,28 @@ public void close() {
} }
} }
/**
 * Test predicate that selects only the record whose value equals {@code "Generated line number 1"}.
 */
public static class FilterPredicate implements Predicate<SourceRecord> {

    @Override
    public ConfigDef config() {
        return null;
    }

    @Override
    public boolean test(SourceRecord sourceRecord) {
        // Literal-first comparison: a record with a null value is simply not selected
        // instead of causing a NullPointerException.
        return "Generated line number 1".equals(sourceRecord.value());
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
@Before @Before
public void beforeEach() throws Exception { public void beforeEach() throws Exception {
nextConsumedLineNumber = 1; nextConsumedLineNumber = 1;
@ -472,11 +495,14 @@ public void shouldExecuteSmt() throws Exception {
props.setProperty("offset.flush.interval.ms", "0"); props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString()); props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX"); props.setProperty("topic", "topicX");
props.setProperty("predicates", "filter");
props.setProperty("predicates.filter.type", FilterPredicate.class.getName());
props.setProperty("transforms", "filter, router"); props.setProperty("transforms", "filter, router");
props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter"); props.setProperty("transforms.router.type", "org.apache.kafka.connect.transforms.RegexRouter");
props.setProperty("transforms.router.regex", "(.*)"); props.setProperty("transforms.router.regex", "(.*)");
props.setProperty("transforms.router.replacement", "trf$1"); props.setProperty("transforms.router.replacement", "trf$1");
props.setProperty("transforms.filter.type", "io.debezium.embedded.EmbeddedEngineTest$FilterTransform"); props.setProperty("transforms.filter.type", "io.debezium.embedded.EmbeddedEngineTest$FilterTransform");
props.setProperty("transforms.filter.predicate", "filter");
CountDownLatch firstLatch = new CountDownLatch(1); CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(5); CountDownLatch allLatch = new CountDownLatch(5);

View File

@ -0,0 +1,51 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Properties;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.transforms.predicates.TopicNameMatches;
import org.junit.Test;
import io.debezium.config.Configuration;
/**
 * Checks that {@link Predicates} creates an instance of the configured type for each listed predicate.
 *
 * @author Jeremy Ford
 */
public class PredicatesTest {

    @Test
    public void test() throws IOException {
        final Properties props = new Properties();
        props.setProperty("predicates", "a,b");

        // predicate "a": topic name pattern
        props.setProperty("predicates.a.type", TopicNameMatches.class.getName());
        props.setProperty("predicates.a.pattern", "a-.*");

        // predicate "b": header presence
        props.setProperty("predicates.b.type", HasHeaderKey.class.getName());
        props.setProperty("predicates.b.name", "bob");

        try (Predicates registry = new Predicates(Configuration.from(props))) {
            assertRegistered(registry, "a", TopicNameMatches.class);
            assertRegistered(registry, "b", HasHeaderKey.class);
        }
    }

    /** Asserts that a predicate is registered under the name and is an instance of the expected type. */
    private static void assertRegistered(Predicates registry, String name, Class<?> expectedType) {
        final Predicate<SourceRecord> found = registry.getPredicate(name);
        assertNotNull(found);
        assertTrue(expectedType.isInstance(found));
    }
}

View File

@ -0,0 +1,72 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.embedded;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.InsertHeader;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.HasHeaderKey;
import org.junit.Test;
import io.debezium.config.Configuration;
/**
 * Checks that {@link Transformations} applies a transformation only to the records selected by its
 * predicate, including the negated case.
 *
 * @author Jeremy Ford
 */
public class TransformationsTest {

    @Test
    public void test() throws IOException {
        Properties properties = new Properties();
        properties.setProperty("predicates", "hasheader");
        properties.setProperty("predicates.hasheader.type", HasHeaderKey.class.getName());
        properties.setProperty("predicates.hasheader.name", "existingHeader");

        // "a" is applied when the record has the header
        properties.setProperty("transforms", "a,b");
        properties.setProperty("transforms.a.type", InsertHeader.class.getName());
        properties.setProperty("transforms.a.header", "h1");
        properties.setProperty("transforms.a.value.literal", "a");
        properties.setProperty("transforms.a.predicate", "hasheader");

        // "b" is applied when the record does NOT have the header
        properties.setProperty("transforms.b.type", InsertHeader.class.getName());
        properties.setProperty("transforms.b.header", "h1");
        properties.setProperty("transforms.b.value.literal", "b");
        properties.setProperty("transforms.b.predicate", "hasheader");
        properties.setProperty("transforms.b.negate", "true");

        Configuration configuration = Configuration.from(properties);

        SourceRecord updated;
        try (Transformations transformations = new Transformations(configuration)) {
            Transformation<SourceRecord> a = transformations.getTransformation("a");
            assertNotNull(a);

            // record has the header, predicate matches, transformation is applied
            updated = a.apply(recordWithHeader());
            assertNotNull(updated.headers().lastWithName("h1"));
            assertEquals("a", updated.headers().lastWithName("h1").value());

            // record does not have the header, so the record must pass through untouched
            SourceRecord untouchedByA = recordWithoutHeader();
            assertEquals(untouchedByA, a.apply(untouchedByA));

            // record does not have header, but transformation will still apply due to negation
            Transformation<SourceRecord> b = transformations.getTransformation("b");
            assertNotNull(b);
            updated = b.apply(recordWithoutHeader());
            assertNotNull(updated.headers().lastWithName("h1"));
            assertEquals("b", updated.headers().lastWithName("h1").value());

            // record has the header, negated predicate rejects it, record passes through untouched
            SourceRecord untouchedByB = recordWithHeader();
            assertEquals(untouchedByB, b.apply(untouchedByB));
        }
    }

    /** Creates a record carrying the header the predicate looks for. */
    private static SourceRecord recordWithHeader() {
        ConnectHeaders headers = new ConnectHeaders();
        headers.addString("existingHeader", "someValue");
        return newRecord(headers);
    }

    /** Creates a record without any header. */
    private static SourceRecord recordWithoutHeader() {
        return newRecord(new ConnectHeaders());
    }

    private static SourceRecord newRecord(ConnectHeaders headers) {
        return new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "t1", 1, null, "key", null, "value",
                System.currentTimeMillis(), headers);
    }
}